001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.kaha.impl.data;
018
019 import java.io.File;
020 import java.io.FilenameFilter;
021 import java.io.IOException;
022 import java.util.ArrayList;
023 import java.util.HashMap;
024 import java.util.Iterator;
025 import java.util.List;
026 import java.util.Map;
027 import java.util.concurrent.atomic.AtomicLong;
028
029 import org.apache.activemq.kaha.Marshaller;
030 import org.apache.activemq.kaha.StoreLocation;
031 import org.apache.activemq.kaha.impl.DataManager;
032 import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
033 import org.apache.activemq.util.IOExceptionSupport;
034 import org.apache.activemq.util.IOHelper;
035 import org.slf4j.Logger;
036 import org.slf4j.LoggerFactory;
037
038 /**
039 * Manages DataFiles
040 *
041 *
042 */
043 public final class DataManagerImpl implements DataManager {
044
045 public static final int ITEM_HEAD_SIZE = 5; // type + length
046 public static final byte DATA_ITEM_TYPE = 1;
047 public static final byte REDO_ITEM_TYPE = 2;
048 public static final long MAX_FILE_LENGTH = 1024 * 1024 * 32;
049
050 private static final Logger LOG = LoggerFactory.getLogger(DataManagerImpl.class);
051 private static final String NAME_PREFIX = "data-";
052
053 private final File directory;
054 private final String name;
055 private SyncDataFileReader reader;
056 private SyncDataFileWriter writer;
057 private DataFile currentWriteFile;
058 private long maxFileLength = MAX_FILE_LENGTH;
059 private Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
060 private Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
061 private String dataFilePrefix;
062 private final AtomicLong storeSize;
063
064 public DataManagerImpl(File dir, final String name,AtomicLong storeSize) {
065 this.directory = dir;
066 this.name = name;
067 this.storeSize=storeSize;
068
069 dataFilePrefix = IOHelper.toFileSystemSafeName(NAME_PREFIX + name + "-");
070 // build up list of current dataFiles
071 File[] files = dir.listFiles(new FilenameFilter() {
072 public boolean accept(File dir, String n) {
073 return dir.equals(directory) && n.startsWith(dataFilePrefix);
074 }
075 });
076 if (files != null) {
077 for (int i = 0; i < files.length; i++) {
078 File file = files[i];
079 String n = file.getName();
080 String numStr = n.substring(dataFilePrefix.length(), n.length());
081 int num = Integer.parseInt(numStr);
082 DataFile dataFile = new DataFile(file, num);
083 storeSize.addAndGet(dataFile.getLength());
084 fileMap.put(dataFile.getNumber(), dataFile);
085 if (currentWriteFile == null || currentWriteFile.getNumber().intValue() < num) {
086 currentWriteFile = dataFile;
087 }
088 }
089 }
090 }
091
092 private DataFile createAndAddDataFile(int num) {
093 String fileName = dataFilePrefix + num;
094 File file = new File(directory, fileName);
095 DataFile result = new DataFile(file, num);
096 fileMap.put(result.getNumber(), result);
097 return result;
098 }
099
100 /*
101 * (non-Javadoc)
102 *
103 * @see org.apache.activemq.kaha.impl.data.IDataManager#getName()
104 */
105 public String getName() {
106 return name;
107 }
108
109 synchronized DataFile findSpaceForData(DataItem item) throws IOException {
110 if (currentWriteFile == null || ((currentWriteFile.getLength() + item.getSize()) > maxFileLength)) {
111 int nextNum = currentWriteFile != null ? currentWriteFile.getNumber().intValue() + 1 : 1;
112 if (currentWriteFile != null && currentWriteFile.isUnused()) {
113 removeDataFile(currentWriteFile);
114 }
115 currentWriteFile = createAndAddDataFile(nextNum);
116 }
117 item.setOffset(currentWriteFile.getLength());
118 item.setFile(currentWriteFile.getNumber().intValue());
119 int len = item.getSize() + ITEM_HEAD_SIZE;
120 currentWriteFile.incrementLength(len);
121 storeSize.addAndGet(len);
122 return currentWriteFile;
123 }
124
125 DataFile getDataFile(StoreLocation item) throws IOException {
126 Integer key = Integer.valueOf(item.getFile());
127 DataFile dataFile = fileMap.get(key);
128 if (dataFile == null) {
129 LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
130 throw new IOException("Could not locate data file " + NAME_PREFIX + name + "-" + item.getFile());
131 }
132 return dataFile;
133 }
134
135 /*
136 * (non-Javadoc)
137 *
138 * @see org.apache.activemq.kaha.impl.data.IDataManager#readItem(org.apache.activemq.kaha.Marshaller,
139 * org.apache.activemq.kaha.StoreLocation)
140 */
141 public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException {
142 return getReader().readItem(marshaller, item);
143 }
144
145 /*
146 * (non-Javadoc)
147 *
148 * @see org.apache.activemq.kaha.impl.data.IDataManager#storeDataItem(org.apache.activemq.kaha.Marshaller,
149 * java.lang.Object)
150 */
151 public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException {
152 return getWriter().storeItem(marshaller, payload, DATA_ITEM_TYPE);
153 }
154
155 /*
156 * (non-Javadoc)
157 *
158 * @see org.apache.activemq.kaha.impl.data.IDataManager#storeRedoItem(java.lang.Object)
159 */
160 public synchronized StoreLocation storeRedoItem(Object payload) throws IOException {
161 return getWriter().storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
162 }
163
164 /*
165 * (non-Javadoc)
166 *
167 * @see org.apache.activemq.kaha.impl.data.IDataManager#updateItem(org.apache.activemq.kaha.StoreLocation,
168 * org.apache.activemq.kaha.Marshaller, java.lang.Object)
169 */
170 public synchronized void updateItem(StoreLocation location, Marshaller marshaller, Object payload)
171 throws IOException {
172 getWriter().updateItem((DataItem)location, marshaller, payload, DATA_ITEM_TYPE);
173 }
174
175 /*
176 * (non-Javadoc)
177 *
178 * @see org.apache.activemq.kaha.impl.data.IDataManager#recoverRedoItems(org.apache.activemq.kaha.impl.data.RedoListener)
179 */
180 public synchronized void recoverRedoItems(RedoListener listener) throws IOException {
181
182 // Nothing to recover if there is no current file.
183 if (currentWriteFile == null) {
184 return;
185 }
186
187 DataItem item = new DataItem();
188 item.setFile(currentWriteFile.getNumber().intValue());
189 item.setOffset(0);
190 while (true) {
191 byte type;
192 try {
193 type = getReader().readDataItemSize(item);
194 } catch (IOException ignore) {
195 LOG.trace("End of data file reached at (header was invalid): " + item);
196 return;
197 }
198 if (type == REDO_ITEM_TYPE) {
199 // Un-marshal the redo item
200 Object object;
201 try {
202 object = readItem(redoMarshaller, item);
203 } catch (IOException e1) {
204 LOG.trace("End of data file reached at (payload was invalid): " + item);
205 return;
206 }
207 try {
208
209 listener.onRedoItem(item, object);
210 // in case the listener is holding on to item references,
211 // copy it
212 // so we don't change it behind the listener's back.
213 item = item.copy();
214
215 } catch (Exception e) {
216 throw IOExceptionSupport.create("Recovery handler failed: " + e, e);
217 }
218 }
219 // Move to the next item.
220 item.setOffset(item.getOffset() + ITEM_HEAD_SIZE + item.getSize());
221 }
222 }
223
224 /*
225 * (non-Javadoc)
226 *
227 * @see org.apache.activemq.kaha.impl.data.IDataManager#close()
228 */
229 public synchronized void close() throws IOException {
230 getWriter().close();
231 for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
232 DataFile dataFile = i.next();
233 getWriter().force(dataFile);
234 dataFile.close();
235 }
236 fileMap.clear();
237 }
238
239 /*
240 * (non-Javadoc)
241 *
242 * @see org.apache.activemq.kaha.impl.data.IDataManager#force()
243 */
244 public synchronized void force() throws IOException {
245 for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
246 DataFile dataFile = i.next();
247 getWriter().force(dataFile);
248 }
249 }
250
251 /*
252 * (non-Javadoc)
253 *
254 * @see org.apache.activemq.kaha.impl.data.IDataManager#delete()
255 */
256 public synchronized boolean delete() throws IOException {
257 boolean result = true;
258 for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
259 DataFile dataFile = i.next();
260 storeSize.addAndGet(-dataFile.getLength());
261 result &= dataFile.delete();
262 }
263 fileMap.clear();
264 return result;
265 }
266
267 /*
268 * (non-Javadoc)
269 *
270 * @see org.apache.activemq.kaha.impl.data.IDataManager#addInterestInFile(int)
271 */
272 public synchronized void addInterestInFile(int file) throws IOException {
273 if (file >= 0) {
274 Integer key = Integer.valueOf(file);
275 DataFile dataFile = fileMap.get(key);
276 if (dataFile == null) {
277 dataFile = createAndAddDataFile(file);
278 }
279 addInterestInFile(dataFile);
280 }
281 }
282
283 synchronized void addInterestInFile(DataFile dataFile) {
284 if (dataFile != null) {
285 dataFile.increment();
286 }
287 }
288
289 /*
290 * (non-Javadoc)
291 *
292 * @see org.apache.activemq.kaha.impl.data.IDataManager#removeInterestInFile(int)
293 */
294 public synchronized void removeInterestInFile(int file) throws IOException {
295 if (file >= 0) {
296 Integer key = Integer.valueOf(file);
297 DataFile dataFile = fileMap.get(key);
298 removeInterestInFile(dataFile);
299 }
300 }
301
302 synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
303 if (dataFile != null) {
304
305 if (dataFile.decrement() <= 0) {
306 if (dataFile != currentWriteFile) {
307 removeDataFile(dataFile);
308 }
309 }
310 }
311 }
312
313 /*
314 * (non-Javadoc)
315 *
316 * @see org.apache.activemq.kaha.impl.data.IDataManager#consolidateDataFiles()
317 */
318 public synchronized void consolidateDataFiles() throws IOException {
319 List<DataFile> purgeList = new ArrayList<DataFile>();
320 for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
321 DataFile dataFile = i.next();
322 if (dataFile.isUnused() && dataFile != currentWriteFile) {
323 purgeList.add(dataFile);
324 }
325 }
326 for (int i = 0; i < purgeList.size(); i++) {
327 DataFile dataFile = purgeList.get(i);
328 removeDataFile(dataFile);
329 }
330 }
331
332 private void removeDataFile(DataFile dataFile) throws IOException {
333 fileMap.remove(dataFile.getNumber());
334 if (writer != null) {
335 writer.force(dataFile);
336 }
337 storeSize.addAndGet(-dataFile.getLength());
338 boolean result = dataFile.delete();
339 LOG.debug("discarding data file " + dataFile + (result ? "successful " : "failed"));
340 }
341
342 /*
343 * (non-Javadoc)
344 *
345 * @see org.apache.activemq.kaha.impl.data.IDataManager#getRedoMarshaller()
346 */
347 public Marshaller getRedoMarshaller() {
348 return redoMarshaller;
349 }
350
351 /*
352 * (non-Javadoc)
353 *
354 * @see org.apache.activemq.kaha.impl.data.IDataManager#setRedoMarshaller(org.apache.activemq.kaha.Marshaller)
355 */
356 public void setRedoMarshaller(Marshaller redoMarshaller) {
357 this.redoMarshaller = redoMarshaller;
358 }
359
360 /**
361 * @return the maxFileLength
362 */
363 public long getMaxFileLength() {
364 return maxFileLength;
365 }
366
367 /**
368 * @param maxFileLength the maxFileLength to set
369 */
370 public void setMaxFileLength(long maxFileLength) {
371 this.maxFileLength = maxFileLength;
372 }
373
374 public String toString() {
375 return "DataManager:(" + NAME_PREFIX + name + ")";
376 }
377
378 public synchronized SyncDataFileReader getReader() {
379 if (reader == null) {
380 reader = createReader();
381 }
382 return reader;
383 }
384
385 protected synchronized SyncDataFileReader createReader() {
386 return new SyncDataFileReader(this);
387 }
388
389 public synchronized void setReader(SyncDataFileReader reader) {
390 this.reader = reader;
391 }
392
393 public synchronized SyncDataFileWriter getWriter() {
394 if (writer == null) {
395 writer = createWriter();
396 }
397 return writer;
398 }
399
400 private SyncDataFileWriter createWriter() {
401 return new SyncDataFileWriter(this);
402 }
403
404 public synchronized void setWriter(SyncDataFileWriter writer) {
405 this.writer = writer;
406 }
407
408 }