001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.kaha.impl.async;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.io.RandomAccessFile;
022 import java.nio.channels.FileLock;
023 import java.nio.channels.OverlappingFileLockException;
024
025 import org.apache.activemq.util.ByteSequence;
026 import org.apache.activemq.util.IOExceptionSupport;
027
028 /**
029 * Use to reliably store fixed sized state data. It stores the state in record
030 * that is versioned and repeated twice in the file so that a failure in the
031 * middle of the write of the first or second record do not not result in an
032 * unknown state.
033 *
034 *
035 */
036 public final class ControlFile {
037
038 private static final boolean DISABLE_FILE_LOCK = "true".equals(System.getProperty("java.nio.channels.FileLock.broken", "false"));
039 private final File file;
040
041 /** The File that holds the control data. */
042 private final RandomAccessFile randomAccessFile;
043 private final int maxRecordSize;
044 private final int firstRecordStart;
045 private final int secondRecordStart;
046 private final int firstRecordEnd;
047 private final int secondRecordEnd;
048
049 private long version;
050 private FileLock lock;
051 private boolean disposed;
052
053 public ControlFile(File file, int recordSize) throws IOException {
054 this.file = file;
055 this.maxRecordSize = recordSize + 4;
056
057 // Calculate where the records start and end.
058 this.firstRecordStart = 8;
059 this.secondRecordStart = 8 + maxRecordSize + 8 + 8;
060 this.firstRecordEnd = firstRecordStart+maxRecordSize;
061 this.secondRecordEnd = secondRecordStart+maxRecordSize;
062
063 randomAccessFile = new RandomAccessFile(file, "rw");
064 }
065
066 /**
067 * Locks the control file.
068 *
069 * @throws IOException
070 */
071 public void lock() throws IOException {
072 if (DISABLE_FILE_LOCK) {
073 return;
074 }
075
076 if (lock == null) {
077 try {
078 lock = randomAccessFile.getChannel().tryLock(0, Math.max(1, randomAccessFile.getChannel().size()), false);
079 } catch (OverlappingFileLockException e) {
080 throw IOExceptionSupport.create("Control file '" + file + "' could not be locked.",e);
081 }
082 if (lock == null) {
083 throw new IOException("Control file '" + file + "' could not be locked.");
084 }
085 }
086 }
087
088 /**
089 * Un locks the control file.
090 *
091 * @throws IOException
092 */
093 public void unlock() throws IOException {
094 if (DISABLE_FILE_LOCK) {
095 return;
096 }
097
098 if (lock != null) {
099 lock.release();
100 lock = null;
101 }
102 }
103
104 public void dispose() {
105 if (disposed) {
106 return;
107 }
108 disposed = true;
109 try {
110 unlock();
111 } catch (IOException ignore) {
112 }
113 try {
114 randomAccessFile.close();
115 } catch (IOException ignore) {
116 }
117 }
118
119 public synchronized ByteSequence load() throws IOException {
120 long l = randomAccessFile.length();
121 if (l < maxRecordSize) {
122 return null;
123 }
124
125 randomAccessFile.seek(firstRecordStart-8);
126 long v1 = randomAccessFile.readLong();
127 randomAccessFile.seek(firstRecordEnd);
128 long v1check = randomAccessFile.readLong();
129
130 randomAccessFile.seek(secondRecordStart - 8);
131 long v2 = randomAccessFile.readLong();
132 randomAccessFile.seek(secondRecordEnd);
133 long v2check = randomAccessFile.readLong();
134
135 byte[] data = null;
136 if (v2 == v2check) {
137 version = v2;
138 randomAccessFile.seek(secondRecordStart);
139 int size = randomAccessFile.readInt();
140 data = new byte[size];
141 randomAccessFile.readFully(data);
142 } else if (v1 == v1check) {
143 version = v1;
144 randomAccessFile.seek(firstRecordStart);
145 int size = randomAccessFile.readInt();
146 data = new byte[size];
147 randomAccessFile.readFully(data);
148 } else {
149 // Bummer.. Both checks are screwed. we don't know
150 // if any of the two buffer are ok. This should
151 // only happen is data got corrupted.
152 throw new IOException("Control data corrupted.");
153 }
154 return new ByteSequence(data, 0, data.length);
155 }
156
157 public void store(ByteSequence data, boolean sync) throws IOException {
158
159 version++;
160 randomAccessFile.setLength((maxRecordSize * 2) + 32);
161 randomAccessFile.seek(0);
162
163 // Write the first copy of the control data.
164 randomAccessFile.writeLong(version);
165 randomAccessFile.writeInt(data.getLength());
166 randomAccessFile.write(data.getData());
167 randomAccessFile.seek(firstRecordEnd);
168 randomAccessFile.writeLong(version);
169
170 // Write the second copy of the control data.
171 randomAccessFile.writeLong(version);
172 randomAccessFile.writeInt(data.getLength());
173 randomAccessFile.write(data.getData());
174 randomAccessFile.seek(secondRecordEnd);
175 randomAccessFile.writeLong(version);
176
177 if (sync) {
178 randomAccessFile.getFD().sync();
179 }
180 }
181
182 public boolean isDisposed() {
183 return disposed;
184 }
185
186 }