001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.kaha.impl.async;
018
019 import java.io.ByteArrayInputStream;
020 import java.io.ByteArrayOutputStream;
021 import java.io.DataInputStream;
022 import java.io.DataOutputStream;
023 import java.io.File;
024 import java.io.FilenameFilter;
025 import java.io.IOException;
026 import java.util.ArrayList;
027 import java.util.Collections;
028 import java.util.HashMap;
029 import java.util.HashSet;
030 import java.util.Iterator;
031 import java.util.LinkedHashMap;
032 import java.util.List;
033 import java.util.Map;
034 import java.util.Set;
035 import java.util.concurrent.ConcurrentHashMap;
036 import java.util.concurrent.atomic.AtomicLong;
037 import java.util.concurrent.atomic.AtomicReference;
038 import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
039 import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
040 import org.apache.activemq.thread.Scheduler;
041 import org.apache.activemq.util.ByteSequence;
042 import org.apache.activemq.util.IOHelper;
043 import org.slf4j.Logger;
044 import org.slf4j.LoggerFactory;
045
046
047
048 /**
049 * Manages DataFiles
050 *
051 *
052 */
053 public class AsyncDataManager {
054
/** Maximum record length handed to the {@link ControlFile} created by {@link #lock()}. */
public static final int CONTROL_RECORD_MAX_LENGTH = 1024;
/** Number of reserved bytes counted into every item header (see {@link #ITEM_HEAD_SPACE}). */
public static final int ITEM_HEAD_RESERVED_SPACE = 21;
// ITEM_HEAD_SPACE = length (4) + type (1) + reserved space + SOR marker (3)
public static final int ITEM_HEAD_SPACE = 4 + 1 + ITEM_HEAD_RESERVED_SPACE + 3;
/** Offset from the start of an item header to its 3-byte SOR marker. */
public static final int ITEM_HEAD_OFFSET_TO_SOR = ITEM_HEAD_SPACE - 3;
public static final int ITEM_FOOT_SPACE = 3; // EOR marker

/** Total per-item overhead: header plus footer. */
public static final int ITEM_HEAD_FOOT_SPACE = ITEM_HEAD_SPACE + ITEM_FOOT_SPACE;

public static final byte[] ITEM_HEAD_SOR = new byte[] {'S', 'O', 'R'}; // marker bytes; presumably "start of record"
public static final byte[] ITEM_HEAD_EOR = new byte[] {'E', 'O', 'R'}; // marker bytes; presumably "end of record"

public static final byte DATA_ITEM_TYPE = 1;
public static final byte REDO_ITEM_TYPE = 2;
/** Default value of {@link #directory}. */
public static final String DEFAULT_DIRECTORY = "data";
/** Default value of {@link #directoryArchive}. */
public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
/** Default value of {@link #filePrefix}; data files are named prefix + numeric id. */
public static final String DEFAULT_FILE_PREFIX = "data-";
/** Default value of {@link #maxFileLength}. */
public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
/** Period passed to the scheduler for the cleanup task (presumably milliseconds - confirm against Scheduler). */
public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
/** Gap kept between the maximum and the preferred data file length (see start()). */
public static final int PREFERED_DIFF = 1024 * 512;

private static final Logger LOG = LoggerFactory.getLogger(AsyncDataManager.class);
/** Runs {@link #cleanupTask} periodically; created in start() and stopped in close(). */
protected Scheduler scheduler;

/** Map of in-flight writes, handed out unchanged by {@link #getInflightWrites()}. */
protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();

/** Directory that holds the data files and the control file. */
protected File directory = new File(DEFAULT_DIRECTORY);
/** Directory removed data files are moved to when {@link #archiveDataLogs} is set. */
protected File directoryArchive = new File (DEFAULT_ARCHIVE_DIRECTORY);
/** Name prefix shared by the data files and the control file. */
protected String filePrefix = DEFAULT_FILE_PREFIX;
/** Control file used for locking and for persisting the marshalled state. */
protected ControlFile controlFile;
/** True between a successful entry into start() and the end of close(). */
protected boolean started;
/** Selects NIODataFileAppender (true) or DataFileAppender (false). */
protected boolean useNio = true;

/** Length limit that makes allocateLocation() roll over to a new data file. */
protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
/** Length passed to each new DataFile; recomputed from maxFileLength in start(). */
protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;

protected DataFileAppender appender;
protected DataFileAccessorPool accessorPool;

/** Known data files keyed by their numeric id. */
protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
/** The same data files keyed by their File, in insertion order. */
protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
/** Data file currently receiving appended records; null when there is none. */
protected DataFile currentWriteFile;

/** Mark location; persisted by storeState() and restored by unmarshallState(). */
protected Location mark;
/** Last append location; persisted and restored together with the mark. */
protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
/** Periodic task that calls cleanup(). */
protected Runnable cleanupTask;
/** Running byte count, adjusted whenever data is allocated or a data file is removed. */
protected final AtomicLong storeSize;
/** When true, removed data files are moved to the archive directory instead of deleted. */
protected boolean archiveDataLogs;
103
/**
 * Creates a manager that reports its size changes into the supplied counter.
 *
 * @param storeSize counter that is adjusted as data is allocated and removed
 */
public AsyncDataManager(AtomicLong storeSize) {
    this.storeSize = storeSize;
}
107
/**
 * Creates a manager with its own, private size counter.
 */
public AsyncDataManager() {
    this(new AtomicLong());
}
111
112 @SuppressWarnings("unchecked")
113 public synchronized void start() throws IOException {
114 if (started) {
115 return;
116 }
117
118 started = true;
119 preferedFileLength=Math.max(PREFERED_DIFF, getMaxFileLength()-PREFERED_DIFF);
120 lock();
121
122 accessorPool = new DataFileAccessorPool(this);
123 ByteSequence sequence = controlFile.load();
124 if (sequence != null && sequence.getLength() > 0) {
125 unmarshallState(sequence);
126 }
127 if (useNio) {
128 appender = new NIODataFileAppender(this);
129 } else {
130 appender = new DataFileAppender(this);
131 }
132
133 File[] files = directory.listFiles(new FilenameFilter() {
134 public boolean accept(File dir, String n) {
135 return dir.equals(directory) && n.startsWith(filePrefix);
136 }
137 });
138
139 if (files != null) {
140 for (int i = 0; i < files.length; i++) {
141 try {
142 File file = files[i];
143 String n = file.getName();
144 String numStr = n.substring(filePrefix.length(), n.length());
145 int num = Integer.parseInt(numStr);
146 DataFile dataFile = new DataFile(file, num, preferedFileLength);
147 fileMap.put(dataFile.getDataFileId(), dataFile);
148 storeSize.addAndGet(dataFile.getLength());
149 } catch (NumberFormatException e) {
150 // Ignore file that do not match the pattern.
151 }
152 }
153
154 // Sort the list so that we can link the DataFiles together in the
155 // right order.
156 List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
157 Collections.sort(l);
158 currentWriteFile = null;
159 for (DataFile df : l) {
160 if (currentWriteFile != null) {
161 currentWriteFile.linkAfter(df);
162 }
163 currentWriteFile = df;
164 fileByFileMap.put(df.getFile(), df);
165 }
166 }
167
168 // Need to check the current Write File to see if there was a partial
169 // write to it.
170 if (currentWriteFile != null) {
171
172 // See if the lastSyncedLocation is valid..
173 Location l = lastAppendLocation.get();
174 if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) {
175 l = null;
176 }
177
178 // If we know the last location that was ok.. then we can skip lots
179 // of checking
180 try{
181 l = recoveryCheck(currentWriteFile, l);
182 lastAppendLocation.set(l);
183 }catch(IOException e){
184 LOG.warn("recovery check failed", e);
185 }
186 }
187
188 storeState(false);
189
190 cleanupTask = new Runnable() {
191 public void run() {
192 cleanup();
193 }
194 };
195 this.scheduler = new Scheduler("AsyncDataManager Scheduler");
196 try {
197 this.scheduler.start();
198 } catch (Exception e) {
199 IOException ioe = new IOException("scheduler start: " + e);
200 ioe.initCause(e);
201 throw ioe;
202 }
203 this.scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
204 }
205
206 public void lock() throws IOException {
207 synchronized (this) {
208 if (controlFile == null || controlFile.isDisposed()) {
209 IOHelper.mkdirs(directory);
210 controlFile = new ControlFile(new File(directory, filePrefix + "control"), CONTROL_RECORD_MAX_LENGTH);
211 }
212 controlFile.lock();
213 }
214 }
215
216 protected Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
217 if (location == null) {
218 location = new Location();
219 location.setDataFileId(dataFile.getDataFileId());
220 location.setOffset(0);
221 }
222 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
223 try {
224 reader.readLocationDetails(location);
225 while (reader.readLocationDetailsAndValidate(location)) {
226 location.setOffset(location.getOffset() + location.getSize());
227 }
228 } finally {
229 accessorPool.closeDataFileAccessor(reader);
230 }
231 dataFile.setLength(location.getOffset());
232 return location;
233 }
234
235 protected void unmarshallState(ByteSequence sequence) throws IOException {
236 ByteArrayInputStream bais = new ByteArrayInputStream(sequence.getData(), sequence.getOffset(), sequence.getLength());
237 DataInputStream dis = new DataInputStream(bais);
238 if (dis.readBoolean()) {
239 mark = new Location();
240 mark.readExternal(dis);
241 } else {
242 mark = null;
243 }
244 if (dis.readBoolean()) {
245 Location l = new Location();
246 l.readExternal(dis);
247 lastAppendLocation.set(l);
248 } else {
249 lastAppendLocation.set(null);
250 }
251 }
252
253 private synchronized ByteSequence marshallState() throws IOException {
254 ByteArrayOutputStream baos = new ByteArrayOutputStream();
255 DataOutputStream dos = new DataOutputStream(baos);
256
257 if (mark != null) {
258 dos.writeBoolean(true);
259 mark.writeExternal(dos);
260 } else {
261 dos.writeBoolean(false);
262 }
263 Location l = lastAppendLocation.get();
264 if (l != null) {
265 dos.writeBoolean(true);
266 l.writeExternal(dos);
267 } else {
268 dos.writeBoolean(false);
269 }
270
271 byte[] bs = baos.toByteArray();
272 return new ByteSequence(bs, 0, bs.length);
273 }
274
275 synchronized DataFile allocateLocation(Location location) throws IOException {
276 if (currentWriteFile == null || ((currentWriteFile.getLength() + location.getSize()) > maxFileLength)) {
277 int nextNum = currentWriteFile != null ? currentWriteFile.getDataFileId().intValue() + 1 : 1;
278
279 String fileName = filePrefix + nextNum;
280 File file = new File(directory, fileName);
281 DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
282 //actually allocate the disk space
283 nextWriteFile.closeRandomAccessFile(nextWriteFile.openRandomAccessFile(true));
284 fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
285 fileByFileMap.put(file, nextWriteFile);
286 if (currentWriteFile != null) {
287 currentWriteFile.linkAfter(nextWriteFile);
288 if (currentWriteFile.isUnused()) {
289 removeDataFile(currentWriteFile);
290 }
291 }
292 currentWriteFile = nextWriteFile;
293
294 }
295 location.setOffset(currentWriteFile.getLength());
296 location.setDataFileId(currentWriteFile.getDataFileId().intValue());
297 int size = location.getSize();
298 currentWriteFile.incrementLength(size);
299 currentWriteFile.increment();
300 storeSize.addAndGet(size);
301 return currentWriteFile;
302 }
303
304 public synchronized void removeLocation(Location location) throws IOException{
305
306 DataFile dataFile = getDataFile(location);
307 dataFile.decrement();
308 }
309
310 synchronized DataFile getDataFile(Location item) throws IOException {
311 Integer key = Integer.valueOf(item.getDataFileId());
312 DataFile dataFile = fileMap.get(key);
313 if (dataFile == null) {
314 LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
315 throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId());
316 }
317 return dataFile;
318 }
319
320 synchronized File getFile(Location item) throws IOException {
321 Integer key = Integer.valueOf(item.getDataFileId());
322 DataFile dataFile = fileMap.get(key);
323 if (dataFile == null) {
324 LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
325 throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId());
326 }
327 return dataFile.getFile();
328 }
329
330 private DataFile getNextDataFile(DataFile dataFile) {
331 return (DataFile)dataFile.getNext();
332 }
333
334 public synchronized void close() throws IOException {
335 if (!started) {
336 return;
337 }
338 this.scheduler.cancel(cleanupTask);
339 try {
340 this.scheduler.stop();
341 } catch (Exception e) {
342 IOException ioe = new IOException("scheduler stop: " + e);
343 ioe.initCause(e);
344 throw ioe;
345 }
346 accessorPool.close();
347 storeState(false);
348 appender.close();
349 fileMap.clear();
350 fileByFileMap.clear();
351 controlFile.unlock();
352 controlFile.dispose();
353 started = false;
354 }
355
356 synchronized void cleanup() {
357 if (accessorPool != null) {
358 accessorPool.disposeUnused();
359 }
360 }
361
362 public synchronized boolean delete() throws IOException {
363
364 // Close all open file handles...
365 appender.close();
366 accessorPool.close();
367
368 boolean result = true;
369 for (Iterator i = fileMap.values().iterator(); i.hasNext();) {
370 DataFile dataFile = (DataFile)i.next();
371 storeSize.addAndGet(-dataFile.getLength());
372 result &= dataFile.delete();
373 }
374 fileMap.clear();
375 fileByFileMap.clear();
376 lastAppendLocation.set(null);
377 mark = null;
378 currentWriteFile = null;
379
380 // reopen open file handles...
381 accessorPool = new DataFileAccessorPool(this);
382 if (useNio) {
383 appender = new NIODataFileAppender(this);
384 } else {
385 appender = new DataFileAppender(this);
386 }
387 return result;
388 }
389
390 public synchronized void addInterestInFile(int file) throws IOException {
391 if (file >= 0) {
392 Integer key = Integer.valueOf(file);
393 DataFile dataFile = fileMap.get(key);
394 if (dataFile == null) {
395 throw new IOException("That data file does not exist");
396 }
397 addInterestInFile(dataFile);
398 }
399 }
400
401 synchronized void addInterestInFile(DataFile dataFile) {
402 if (dataFile != null) {
403 dataFile.increment();
404 }
405 }
406
407 public synchronized void removeInterestInFile(int file) throws IOException {
408 if (file >= 0) {
409 Integer key = Integer.valueOf(file);
410 DataFile dataFile = fileMap.get(key);
411 removeInterestInFile(dataFile);
412 }
413
414 }
415
416 synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
417 if (dataFile != null) {
418 if (dataFile.decrement() <= 0) {
419 removeDataFile(dataFile);
420 }
421 }
422 }
423
424 public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Set<Integer>inProgress) throws IOException {
425 Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
426 unUsed.removeAll(inUse);
427 unUsed.removeAll(inProgress);
428
429 List<DataFile> purgeList = new ArrayList<DataFile>();
430 for (Integer key : unUsed) {
431 DataFile dataFile = fileMap.get(key);
432 purgeList.add(dataFile);
433 }
434 for (DataFile dataFile : purgeList) {
435 if (dataFile.getDataFileId() != currentWriteFile.getDataFileId()) {
436 forceRemoveDataFile(dataFile);
437 }
438 }
439 }
440
441 public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Integer lastFile) throws IOException {
442 Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
443 unUsed.removeAll(inUse);
444
445 List<DataFile> purgeList = new ArrayList<DataFile>();
446 for (Integer key : unUsed) {
447 // Only add files less than the lastFile..
448 if( key.intValue() < lastFile.intValue() ) {
449 DataFile dataFile = fileMap.get(key);
450 purgeList.add(dataFile);
451 }
452 }
453 if (LOG.isDebugEnabled()) {
454 LOG.debug("lastFileId=" + lastFile + ", purgeList: (" + purgeList.size() + ") " + purgeList);
455 }
456 for (DataFile dataFile : purgeList) {
457 forceRemoveDataFile(dataFile);
458 }
459 }
460
461 public synchronized void consolidateDataFiles() throws IOException {
462 List<DataFile> purgeList = new ArrayList<DataFile>();
463 for (DataFile dataFile : fileMap.values()) {
464 if (dataFile.isUnused()) {
465 purgeList.add(dataFile);
466 }
467 }
468 for (DataFile dataFile : purgeList) {
469 removeDataFile(dataFile);
470 }
471 }
472
473 private synchronized void removeDataFile(DataFile dataFile) throws IOException {
474
475 // Make sure we don't delete too much data.
476 if (dataFile == currentWriteFile || mark == null || dataFile.getDataFileId() >= mark.getDataFileId()) {
477 LOG.debug("Won't remove DataFile" + dataFile);
478 return;
479 }
480 forceRemoveDataFile(dataFile);
481 }
482
483 private synchronized void forceRemoveDataFile(DataFile dataFile)
484 throws IOException {
485 accessorPool.disposeDataFileAccessors(dataFile);
486 fileByFileMap.remove(dataFile.getFile());
487 fileMap.remove(dataFile.getDataFileId());
488 storeSize.addAndGet(-dataFile.getLength());
489 dataFile.unlink();
490 if (archiveDataLogs) {
491 dataFile.move(getDirectoryArchive());
492 LOG.debug("moved data file " + dataFile + " to "
493 + getDirectoryArchive());
494 } else {
495 boolean result = dataFile.delete();
496 if (!result) {
497 LOG.info("Failed to discard data file " + dataFile);
498 }
499 }
500 }
501
502 /**
503 * @return the maxFileLength
504 */
505 public int getMaxFileLength() {
506 return maxFileLength;
507 }
508
509 /**
510 * @param maxFileLength the maxFileLength to set
511 */
512 public void setMaxFileLength(int maxFileLength) {
513 this.maxFileLength = maxFileLength;
514 }
515
516 @Override
517 public String toString() {
518 return "DataManager:(" + filePrefix + ")";
519 }
520
521 public synchronized Location getMark() throws IllegalStateException {
522 return mark;
523 }
524
525 public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
526
527 Location cur = null;
528 while (true) {
529 if (cur == null) {
530 if (location == null) {
531 DataFile head = (DataFile)currentWriteFile.getHeadNode();
532 cur = new Location();
533 cur.setDataFileId(head.getDataFileId());
534 cur.setOffset(0);
535 } else {
536 // Set to the next offset..
537 if( location.getSize() == -1 ) {
538 cur = new Location(location);
539 } else {
540 cur = new Location(location);
541 cur.setOffset(location.getOffset()+location.getSize());
542 }
543 }
544 } else {
545 cur.setOffset(cur.getOffset() + cur.getSize());
546 }
547
548 DataFile dataFile = getDataFile(cur);
549
550 // Did it go into the next file??
551 if (dataFile.getLength() <= cur.getOffset()) {
552 dataFile = getNextDataFile(dataFile);
553 if (dataFile == null) {
554 return null;
555 } else {
556 cur.setDataFileId(dataFile.getDataFileId().intValue());
557 cur.setOffset(0);
558 }
559 }
560
561 // Load in location size and type.
562 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
563 try {
564 reader.readLocationDetails(cur);
565 } finally {
566 accessorPool.closeDataFileAccessor(reader);
567 }
568
569 if (cur.getType() == 0) {
570 return null;
571 } else if (cur.getType() > 0) {
572 // Only return user records.
573 return cur;
574 }
575 }
576 }
577
578 public synchronized Location getNextLocation(File file, Location lastLocation,boolean thisFileOnly) throws IllegalStateException, IOException{
579 DataFile df = fileByFileMap.get(file);
580 return getNextLocation(df, lastLocation,thisFileOnly);
581 }
582
583 public synchronized Location getNextLocation(DataFile dataFile,
584 Location lastLocation,boolean thisFileOnly) throws IOException, IllegalStateException {
585
586 Location cur = null;
587 while (true) {
588 if (cur == null) {
589 if (lastLocation == null) {
590 DataFile head = (DataFile)dataFile.getHeadNode();
591 cur = new Location();
592 cur.setDataFileId(head.getDataFileId());
593 cur.setOffset(0);
594 } else {
595 // Set to the next offset..
596 cur = new Location(lastLocation);
597 cur.setOffset(cur.getOffset() + cur.getSize());
598 }
599 } else {
600 cur.setOffset(cur.getOffset() + cur.getSize());
601 }
602
603
604 // Did it go into the next file??
605 if (dataFile.getLength() <= cur.getOffset()) {
606 if (thisFileOnly) {
607 return null;
608 }else {
609 dataFile = getNextDataFile(dataFile);
610 if (dataFile == null) {
611 return null;
612 } else {
613 cur.setDataFileId(dataFile.getDataFileId().intValue());
614 cur.setOffset(0);
615 }
616 }
617 }
618
619 // Load in location size and type.
620 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
621 try {
622 reader.readLocationDetails(cur);
623 } finally {
624 accessorPool.closeDataFileAccessor(reader);
625 }
626
627 if (cur.getType() == 0) {
628 return null;
629 } else if (cur.getType() > 0) {
630 // Only return user records.
631 return cur;
632 }
633 }
634 }
635
636 public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
637 DataFile dataFile = getDataFile(location);
638 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
639 ByteSequence rc = null;
640 try {
641 rc = reader.readRecord(location);
642 } finally {
643 accessorPool.closeDataFileAccessor(reader);
644 }
645 return rc;
646 }
647
648 public void setMark(Location location, boolean sync) throws IOException, IllegalStateException {
649 synchronized (this) {
650 mark = location;
651 }
652 storeState(sync);
653 }
654
655 protected synchronized void storeState(boolean sync) throws IOException {
656 ByteSequence state = marshallState();
657 appender.storeItem(state, Location.MARK_TYPE, sync);
658 controlFile.store(state, sync);
659 }
660
661 public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
662 Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
663 return loc;
664 }
665
666 public synchronized Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
667 Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
668 return loc;
669 }
670
671 public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {
672 return appender.storeItem(data, type, sync);
673 }
674
675 public void update(Location location, ByteSequence data, boolean sync) throws IOException {
676 DataFile dataFile = getDataFile(location);
677 DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
678 try {
679 updater.updateRecord(location, data, sync);
680 } finally {
681 accessorPool.closeDataFileAccessor(updater);
682 }
683 }
684
685 public File getDirectory() {
686 return directory;
687 }
688
689 public void setDirectory(File directory) {
690 this.directory = directory;
691 }
692
693 public String getFilePrefix() {
694 return filePrefix;
695 }
696
697 public void setFilePrefix(String filePrefix) {
698 this.filePrefix = IOHelper.toFileSystemSafeName(filePrefix);
699 }
700
701 public Map<WriteKey, WriteCommand> getInflightWrites() {
702 return inflightWrites;
703 }
704
705 public Location getLastAppendLocation() {
706 return lastAppendLocation.get();
707 }
708
709 public void setLastAppendLocation(Location lastSyncedLocation) {
710 this.lastAppendLocation.set(lastSyncedLocation);
711 }
712
713 public boolean isUseNio() {
714 return useNio;
715 }
716
717 public void setUseNio(boolean useNio) {
718 this.useNio = useNio;
719 }
720
721 public File getDirectoryArchive() {
722 return directoryArchive;
723 }
724
725 public void setDirectoryArchive(File directoryArchive) {
726 this.directoryArchive = directoryArchive;
727 }
728
729 public boolean isArchiveDataLogs() {
730 return archiveDataLogs;
731 }
732
733 public void setArchiveDataLogs(boolean archiveDataLogs) {
734 this.archiveDataLogs = archiveDataLogs;
735 }
736
737 synchronized public Integer getCurrentDataFileId() {
738 if( currentWriteFile==null )
739 return null;
740 return currentWriteFile.getDataFileId();
741 }
742
743 /**
744 * Get a set of files - only valid after start()
745 * @return files currently being used
746 */
747 public Set<File> getFiles(){
748 return fileByFileMap.keySet();
749 }
750
751 synchronized public long getDiskSize() {
752 long rc=0;
753 DataFile cur = (DataFile)currentWriteFile.getHeadNode();
754 while( cur !=null ) {
755 rc += cur.getLength();
756 cur = (DataFile) cur.getNext();
757 }
758 return rc;
759 }
760
761 synchronized public long getDiskSizeUntil(Location startPosition) {
762 long rc=0;
763 DataFile cur = (DataFile)currentWriteFile.getHeadNode();
764 while( cur !=null ) {
765 if( cur.getDataFileId().intValue() >= startPosition.getDataFileId() ) {
766 return rc + startPosition.getOffset();
767 }
768 rc += cur.getLength();
769 cur = (DataFile) cur.getNext();
770 }
771 return rc;
772 }
773
774 }