001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.kahadb.journal;
018
019 import java.io.File;
020 import java.io.FilenameFilter;
021 import java.io.IOException;
022 import java.io.UnsupportedEncodingException;
023 import java.util.ArrayList;
024 import java.util.Collections;
025 import java.util.HashMap;
026 import java.util.Iterator;
027 import java.util.LinkedHashMap;
028 import java.util.List;
029 import java.util.Map;
030 import java.util.Set;
031 import java.util.Timer;
032 import java.util.TimerTask;
033 import java.util.TreeMap;
034 import java.util.concurrent.ConcurrentHashMap;
035 import java.util.concurrent.atomic.AtomicLong;
036 import java.util.concurrent.atomic.AtomicReference;
037 import java.util.zip.Adler32;
038 import java.util.zip.Checksum;
039 import org.apache.kahadb.util.LinkedNode;
040 import org.slf4j.Logger;
041 import org.slf4j.LoggerFactory;
042 import org.apache.kahadb.util.ByteSequence;
043 import org.apache.kahadb.util.DataByteArrayInputStream;
044 import org.apache.kahadb.util.DataByteArrayOutputStream;
045 import org.apache.kahadb.util.LinkedNodeList;
046 import org.apache.kahadb.util.SchedulerTimerTask;
047 import org.apache.kahadb.util.Sequence;
048
049 /**
050 * Manages DataFiles
051 *
052 *
053 */
054 public class Journal {
055 public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
056 public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));
057
058 private static final int MAX_BATCH_SIZE = 32*1024*1024;
059
060 // ITEM_HEAD_SPACE = length + type+ reserved space + SOR
061 public static final int RECORD_HEAD_SPACE = 4 + 1;
062
063 public static final byte USER_RECORD_TYPE = 1;
064 public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
065 // Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch.
066 public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
067 public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
068 public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
069
070 private static byte[] createBatchControlRecordHeader() {
071 try {
072 DataByteArrayOutputStream os = new DataByteArrayOutputStream();
073 os.writeInt(BATCH_CONTROL_RECORD_SIZE);
074 os.writeByte(BATCH_CONTROL_RECORD_TYPE);
075 os.write(BATCH_CONTROL_RECORD_MAGIC);
076 ByteSequence sequence = os.toByteSequence();
077 sequence.compact();
078 return sequence.getData();
079 } catch (IOException e) {
080 throw new RuntimeException("Could not create batch control record header.", e);
081 }
082 }
083
084 public static final String DEFAULT_DIRECTORY = ".";
085 public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
086 public static final String DEFAULT_FILE_PREFIX = "db-";
087 public static final String DEFAULT_FILE_SUFFIX = ".log";
088 public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
089 public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
090 public static final int PREFERED_DIFF = 1024 * 512;
091 public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;
092
093 private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
094
095 protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
096
097 protected File directory = new File(DEFAULT_DIRECTORY);
098 protected File directoryArchive = new File(DEFAULT_ARCHIVE_DIRECTORY);
099 protected String filePrefix = DEFAULT_FILE_PREFIX;
100 protected String fileSuffix = DEFAULT_FILE_SUFFIX;
101 protected boolean started;
102
103 protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
104 protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
105 protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;
106
107 protected FileAppender appender;
108 protected DataFileAccessorPool accessorPool;
109
110 protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
111 protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
112 protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();
113
114 protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
115 protected Runnable cleanupTask;
116 protected AtomicLong totalLength = new AtomicLong();
117 protected boolean archiveDataLogs;
118 private ReplicationTarget replicationTarget;
119 protected boolean checksum;
120 protected boolean checkForCorruptionOnStartup;
121 protected boolean enableAsyncDiskSync = true;
122 private Timer timer;
123
124 public synchronized void start() throws IOException {
125 if (started) {
126 return;
127 }
128
129 long start = System.currentTimeMillis();
130 accessorPool = new DataFileAccessorPool(this);
131 started = true;
132 preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);
133
134 appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);
135
136 File[] files = directory.listFiles(new FilenameFilter() {
137 public boolean accept(File dir, String n) {
138 return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
139 }
140 });
141
142 if (files != null) {
143 for (File file : files) {
144 try {
145 String n = file.getName();
146 String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length());
147 int num = Integer.parseInt(numStr);
148 DataFile dataFile = new DataFile(file, num, preferedFileLength);
149 fileMap.put(dataFile.getDataFileId(), dataFile);
150 totalLength.addAndGet(dataFile.getLength());
151 } catch (NumberFormatException e) {
152 // Ignore file that do not match the pattern.
153 }
154 }
155
156 // Sort the list so that we can link the DataFiles together in the
157 // right order.
158 List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
159 Collections.sort(l);
160 for (DataFile df : l) {
161 if (df.getLength() == 0) {
162 // possibly the result of a previous failed write
163 LOG.info("ignoring zero length, partially initialised journal data file: " + df);
164 continue;
165 }
166 dataFiles.addLast(df);
167 fileByFileMap.put(df.getFile(), df);
168
169 if( isCheckForCorruptionOnStartup() ) {
170 lastAppendLocation.set(recoveryCheck(df));
171 }
172 }
173 }
174
175 getCurrentWriteFile();
176
177 if( lastAppendLocation.get()==null ) {
178 DataFile df = dataFiles.getTail();
179 lastAppendLocation.set(recoveryCheck(df));
180 }
181
182 cleanupTask = new Runnable() {
183 public void run() {
184 cleanup();
185 }
186 };
187 this.timer = new Timer("KahaDB Scheduler", true);
188 TimerTask task = new SchedulerTimerTask(cleanupTask);
189 this.timer.scheduleAtFixedRate(task, DEFAULT_CLEANUP_INTERVAL,DEFAULT_CLEANUP_INTERVAL);
190 long end = System.currentTimeMillis();
191 LOG.trace("Startup took: "+(end-start)+" ms");
192 }
193
194 private static byte[] bytes(String string) {
195 try {
196 return string.getBytes("UTF-8");
197 } catch (UnsupportedEncodingException e) {
198 throw new RuntimeException(e);
199 }
200 }
201
202 protected Location recoveryCheck(DataFile dataFile) throws IOException {
203 Location location = new Location();
204 location.setDataFileId(dataFile.getDataFileId());
205 location.setOffset(0);
206
207 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
208 try {
209 while( true ) {
210 int size = checkBatchRecord(reader, location.getOffset());
211 if ( size>=0 ) {
212 location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
213 } else {
214
215 // Perhaps it's just some corruption... scan through the file to find the next valid batch record. We
216 // may have subsequent valid batch records.
217 int nextOffset = findNextBatchRecord(reader, location.getOffset()+1);
218 if( nextOffset >=0 ) {
219 Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
220 LOG.info("Corrupt journal records found in '"+dataFile.getFile()+"' between offsets: "+sequence);
221 dataFile.corruptedBlocks.add(sequence);
222 location.setOffset(nextOffset);
223 } else {
224 break;
225 }
226 }
227 }
228
229 } catch (IOException e) {
230 } finally {
231 accessorPool.closeDataFileAccessor(reader);
232 }
233
234 int existingLen = dataFile.getLength();
235 dataFile.setLength(location.getOffset());
236 if (existingLen > dataFile.getLength()) {
237 totalLength.addAndGet(dataFile.getLength() - existingLen);
238 }
239
240 if( !dataFile.corruptedBlocks.isEmpty() ) {
241 // Is the end of the data file corrupted?
242 if( dataFile.corruptedBlocks.getTail().getLast()+1 == location.getOffset() ) {
243 dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
244 }
245 }
246
247 return location;
248 }
249
250 private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
251 ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
252 byte data[] = new byte[1024*4];
253 ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));
254
255 int pos = 0;
256 while( true ) {
257 pos = bs.indexOf(header, pos);
258 if( pos >= 0 ) {
259 return offset+pos;
260 } else {
261 // need to load the next data chunck in..
262 if( bs.length != data.length ) {
263 // If we had a short read then we were at EOF
264 return -1;
265 }
266 offset += bs.length-BATCH_CONTROL_RECORD_HEADER.length;
267 bs = new ByteSequence(data, 0, reader.read(offset, data));
268 pos=0;
269 }
270 }
271 }
272
273
274 public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
275 byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
276 DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);
277
278 reader.readFully(offset, controlRecord);
279
280 // Assert that it's a batch record.
281 for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) {
282 if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) {
283 return -1;
284 }
285 }
286
287 int size = controlIs.readInt();
288 if( size > MAX_BATCH_SIZE ) {
289 return -1;
290 }
291
292 if( isChecksum() ) {
293
294 long expectedChecksum = controlIs.readLong();
295 if( expectedChecksum == 0 ) {
296 // Checksuming was not enabled when the record was stored.
297 // we can't validate the record :(
298 return size;
299 }
300
301 byte data[] = new byte[size];
302 reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data);
303
304 Checksum checksum = new Adler32();
305 checksum.update(data, 0, data.length);
306
307 if( expectedChecksum!=checksum.getValue() ) {
308 return -1;
309 }
310
311 }
312 return size;
313 }
314
315
316 void addToTotalLength(int size) {
317 totalLength.addAndGet(size);
318 }
319
320 public long length() {
321 return totalLength.get();
322 }
323
324 synchronized DataFile getCurrentWriteFile() throws IOException {
325 if (dataFiles.isEmpty()) {
326 rotateWriteFile();
327 }
328 return dataFiles.getTail();
329 }
330
331 synchronized DataFile rotateWriteFile() {
332 int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
333 File file = getFile(nextNum);
334 DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
335 // actually allocate the disk space
336 fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
337 fileByFileMap.put(file, nextWriteFile);
338 dataFiles.addLast(nextWriteFile);
339 return nextWriteFile;
340 }
341
342 public File getFile(int nextNum) {
343 String fileName = filePrefix + nextNum + fileSuffix;
344 File file = new File(directory, fileName);
345 return file;
346 }
347
348 synchronized DataFile getDataFile(Location item) throws IOException {
349 Integer key = Integer.valueOf(item.getDataFileId());
350 DataFile dataFile = fileMap.get(key);
351 if (dataFile == null) {
352 LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
353 throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
354 }
355 return dataFile;
356 }
357
358 synchronized File getFile(Location item) throws IOException {
359 Integer key = Integer.valueOf(item.getDataFileId());
360 DataFile dataFile = fileMap.get(key);
361 if (dataFile == null) {
362 LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
363 throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
364 }
365 return dataFile.getFile();
366 }
367
368 private DataFile getNextDataFile(DataFile dataFile) {
369 return dataFile.getNext();
370 }
371
372 public synchronized void close() throws IOException {
373 if (!started) {
374 return;
375 }
376 if (this.timer != null) {
377 this.timer.cancel();
378 }
379 accessorPool.close();
380 appender.close();
381 fileMap.clear();
382 fileByFileMap.clear();
383 dataFiles.clear();
384 lastAppendLocation.set(null);
385 started = false;
386 }
387
388 protected synchronized void cleanup() {
389 if (accessorPool != null) {
390 accessorPool.disposeUnused();
391 }
392 }
393
394 public synchronized boolean delete() throws IOException {
395
396 // Close all open file handles...
397 appender.close();
398 accessorPool.close();
399
400 boolean result = true;
401 for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
402 DataFile dataFile = i.next();
403 totalLength.addAndGet(-dataFile.getLength());
404 result &= dataFile.delete();
405 }
406 fileMap.clear();
407 fileByFileMap.clear();
408 lastAppendLocation.set(null);
409 dataFiles = new LinkedNodeList<DataFile>();
410
411 // reopen open file handles...
412 accessorPool = new DataFileAccessorPool(this);
413 appender = new DataFileAppender(this);
414 return result;
415 }
416
417 public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
418 for (Integer key : files) {
419 // Can't remove the data file (or subsequent files) that is currently being written to.
420 if( key >= lastAppendLocation.get().getDataFileId() ) {
421 continue;
422 }
423 DataFile dataFile = fileMap.get(key);
424 if( dataFile!=null ) {
425 forceRemoveDataFile(dataFile);
426 }
427 }
428 }
429
430 private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
431 accessorPool.disposeDataFileAccessors(dataFile);
432 fileByFileMap.remove(dataFile.getFile());
433 fileMap.remove(dataFile.getDataFileId());
434 totalLength.addAndGet(-dataFile.getLength());
435 dataFile.unlink();
436 if (archiveDataLogs) {
437 dataFile.move(getDirectoryArchive());
438 LOG.debug("moved data file " + dataFile + " to " + getDirectoryArchive());
439 } else {
440 if ( dataFile.delete() ) {
441 LOG.debug("Discarded data file " + dataFile);
442 } else {
443 LOG.warn("Failed to discard data file " + dataFile.getFile());
444 }
445 }
446 }
447
448 /**
449 * @return the maxFileLength
450 */
451 public int getMaxFileLength() {
452 return maxFileLength;
453 }
454
455 /**
456 * @param maxFileLength the maxFileLength to set
457 */
458 public void setMaxFileLength(int maxFileLength) {
459 this.maxFileLength = maxFileLength;
460 }
461
462 @Override
463 public String toString() {
464 return directory.toString();
465 }
466
467 public synchronized void appendedExternally(Location loc, int length) throws IOException {
468 DataFile dataFile = null;
469 if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() ) {
470 // It's an update to the current log file..
471 dataFile = dataFiles.getTail();
472 dataFile.incrementLength(length);
473 } else if( dataFiles.getTail().getDataFileId()+1 == loc.getDataFileId() ) {
474 // It's an update to the next log file.
475 int nextNum = loc.getDataFileId();
476 File file = getFile(nextNum);
477 dataFile = new DataFile(file, nextNum, preferedFileLength);
478 // actually allocate the disk space
479 fileMap.put(dataFile.getDataFileId(), dataFile);
480 fileByFileMap.put(file, dataFile);
481 dataFiles.addLast(dataFile);
482 } else {
483 throw new IOException("Invalid external append.");
484 }
485 }
486
487 public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
488
489 Location cur = null;
490 while (true) {
491 if (cur == null) {
492 if (location == null) {
493 DataFile head = dataFiles.getHead();
494 if( head == null ) {
495 return null;
496 }
497 cur = new Location();
498 cur.setDataFileId(head.getDataFileId());
499 cur.setOffset(0);
500 } else {
501 // Set to the next offset..
502 if (location.getSize() == -1) {
503 cur = new Location(location);
504 } else {
505 cur = new Location(location);
506 cur.setOffset(location.getOffset() + location.getSize());
507 }
508 }
509 } else {
510 cur.setOffset(cur.getOffset() + cur.getSize());
511 }
512
513 DataFile dataFile = getDataFile(cur);
514
515 // Did it go into the next file??
516 if (dataFile.getLength() <= cur.getOffset()) {
517 dataFile = getNextDataFile(dataFile);
518 if (dataFile == null) {
519 return null;
520 } else {
521 cur.setDataFileId(dataFile.getDataFileId().intValue());
522 cur.setOffset(0);
523 }
524 }
525
526 // Load in location size and type.
527 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
528 try {
529 reader.readLocationDetails(cur);
530 } finally {
531 accessorPool.closeDataFileAccessor(reader);
532 }
533
534 if (cur.getType() == 0) {
535 return null;
536 } else if (cur.getType() == USER_RECORD_TYPE) {
537 // Only return user records.
538 return cur;
539 }
540 }
541 }
542
543 public synchronized Location getNextLocation(File file, Location lastLocation, boolean thisFileOnly) throws IllegalStateException, IOException {
544 DataFile df = fileByFileMap.get(file);
545 return getNextLocation(df, lastLocation, thisFileOnly);
546 }
547
548 public synchronized Location getNextLocation(DataFile dataFile, Location lastLocation, boolean thisFileOnly) throws IOException, IllegalStateException {
549
550 Location cur = null;
551 while (true) {
552 if (cur == null) {
553 if (lastLocation == null) {
554 DataFile head = dataFile.getHeadNode();
555 cur = new Location();
556 cur.setDataFileId(head.getDataFileId());
557 cur.setOffset(0);
558 } else {
559 // Set to the next offset..
560 cur = new Location(lastLocation);
561 cur.setOffset(cur.getOffset() + cur.getSize());
562 }
563 } else {
564 cur.setOffset(cur.getOffset() + cur.getSize());
565 }
566
567 // Did it go into the next file??
568 if (dataFile.getLength() <= cur.getOffset()) {
569 if (thisFileOnly) {
570 return null;
571 } else {
572 dataFile = getNextDataFile(dataFile);
573 if (dataFile == null) {
574 return null;
575 } else {
576 cur.setDataFileId(dataFile.getDataFileId().intValue());
577 cur.setOffset(0);
578 }
579 }
580 }
581
582 // Load in location size and type.
583 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
584 try {
585 reader.readLocationDetails(cur);
586 } finally {
587 accessorPool.closeDataFileAccessor(reader);
588 }
589
590 if (cur.getType() == 0) {
591 return null;
592 } else if (cur.getType() > 0) {
593 // Only return user records.
594 return cur;
595 }
596 }
597 }
598
599 public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
600 DataFile dataFile = getDataFile(location);
601 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
602 ByteSequence rc = null;
603 try {
604 rc = reader.readRecord(location);
605 } finally {
606 accessorPool.closeDataFileAccessor(reader);
607 }
608 return rc;
609 }
610
611 public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
612 Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
613 return loc;
614 }
615
616 public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
617 Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
618 return loc;
619 }
620
621 public void update(Location location, ByteSequence data, boolean sync) throws IOException {
622 DataFile dataFile = getDataFile(location);
623 DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
624 try {
625 updater.updateRecord(location, data, sync);
626 } finally {
627 accessorPool.closeDataFileAccessor(updater);
628 }
629 }
630
631 public File getDirectory() {
632 return directory;
633 }
634
635 public void setDirectory(File directory) {
636 this.directory = directory;
637 }
638
639 public String getFilePrefix() {
640 return filePrefix;
641 }
642
643 public void setFilePrefix(String filePrefix) {
644 this.filePrefix = filePrefix;
645 }
646
647 public Map<WriteKey, WriteCommand> getInflightWrites() {
648 return inflightWrites;
649 }
650
651 public Location getLastAppendLocation() {
652 return lastAppendLocation.get();
653 }
654
655 public void setLastAppendLocation(Location lastSyncedLocation) {
656 this.lastAppendLocation.set(lastSyncedLocation);
657 }
658
659 public File getDirectoryArchive() {
660 return directoryArchive;
661 }
662
663 public void setDirectoryArchive(File directoryArchive) {
664 this.directoryArchive = directoryArchive;
665 }
666
667 public boolean isArchiveDataLogs() {
668 return archiveDataLogs;
669 }
670
671 public void setArchiveDataLogs(boolean archiveDataLogs) {
672 this.archiveDataLogs = archiveDataLogs;
673 }
674
675 synchronized public Integer getCurrentDataFileId() {
676 if (dataFiles.isEmpty())
677 return null;
678 return dataFiles.getTail().getDataFileId();
679 }
680
681 /**
682 * Get a set of files - only valid after start()
683 *
684 * @return files currently being used
685 */
686 public Set<File> getFiles() {
687 return fileByFileMap.keySet();
688 }
689
690 public synchronized Map<Integer, DataFile> getFileMap() {
691 return new TreeMap<Integer, DataFile>(fileMap);
692 }
693
694 public long getDiskSize() {
695 long tailLength=0;
696 synchronized( this ) {
697 if( !dataFiles.isEmpty() ) {
698 tailLength = dataFiles.getTail().getLength();
699 }
700 }
701
702 long rc = totalLength.get();
703
704 // The last file is actually at a minimum preferedFileLength big.
705 if( tailLength < preferedFileLength ) {
706 rc -= tailLength;
707 rc += preferedFileLength;
708 }
709 return rc;
710 }
711
712 public void setReplicationTarget(ReplicationTarget replicationTarget) {
713 this.replicationTarget = replicationTarget;
714 }
715 public ReplicationTarget getReplicationTarget() {
716 return replicationTarget;
717 }
718
719 public String getFileSuffix() {
720 return fileSuffix;
721 }
722
723 public void setFileSuffix(String fileSuffix) {
724 this.fileSuffix = fileSuffix;
725 }
726
727 public boolean isChecksum() {
728 return checksum;
729 }
730
731 public void setChecksum(boolean checksumWrites) {
732 this.checksum = checksumWrites;
733 }
734
735 public boolean isCheckForCorruptionOnStartup() {
736 return checkForCorruptionOnStartup;
737 }
738
739 public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
740 this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
741 }
742
743 public void setWriteBatchSize(int writeBatchSize) {
744 this.writeBatchSize = writeBatchSize;
745 }
746
747 public int getWriteBatchSize() {
748 return writeBatchSize;
749 }
750
751 public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
752 this.totalLength = storeSizeAccumulator;
753 }
754
755 public void setEnableAsyncDiskSync(boolean val) {
756 this.enableAsyncDiskSync = val;
757 }
758
759 public boolean isEnableAsyncDiskSync() {
760 return enableAsyncDiskSync;
761 }
762
763 public static class WriteCommand extends LinkedNode<WriteCommand> {
764 public final Location location;
765 public final ByteSequence data;
766 final boolean sync;
767 public final Runnable onComplete;
768
769 public WriteCommand(Location location, ByteSequence data, boolean sync) {
770 this.location = location;
771 this.data = data;
772 this.sync = sync;
773 this.onComplete = null;
774 }
775
776 public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
777 this.location = location;
778 this.data = data;
779 this.onComplete = onComplete;
780 this.sync = false;
781 }
782 }
783
784 public static class WriteKey {
785 private final int file;
786 private final long offset;
787 private final int hash;
788
789 public WriteKey(Location item) {
790 file = item.getDataFileId();
791 offset = item.getOffset();
792 // TODO: see if we can build a better hash
793 hash = (int)(file ^ offset);
794 }
795
796 public int hashCode() {
797 return hash;
798 }
799
800 public boolean equals(Object obj) {
801 if (obj instanceof WriteKey) {
802 WriteKey di = (WriteKey)obj;
803 return di.file == file && di.offset == offset;
804 }
805 return false;
806 }
807 }
808 }