001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.kahadb.page;
018
019 import java.io.*;
020 import java.util.ArrayList;
021 import java.util.Arrays;
022 import java.util.Collection;
023 import java.util.Collections;
024 import java.util.Iterator;
025 import java.util.LinkedHashMap;
026 import java.util.Map;
027 import java.util.Map.Entry;
028 import java.util.Properties;
029 import java.util.TreeMap;
030 import java.util.concurrent.CountDownLatch;
031 import java.util.concurrent.atomic.AtomicBoolean;
032 import java.util.concurrent.atomic.AtomicLong;
033 import java.util.zip.Adler32;
034 import java.util.zip.Checksum;
035
036 import org.apache.kahadb.util.DataByteArrayOutputStream;
037 import org.apache.kahadb.util.IOExceptionSupport;
038 import org.apache.kahadb.util.IOHelper;
039 import org.apache.kahadb.util.IntrospectionSupport;
040 import org.apache.kahadb.util.LFUCache;
041 import org.apache.kahadb.util.LRUCache;
042 import org.apache.kahadb.util.Sequence;
043 import org.apache.kahadb.util.SequenceSet;
044 import org.slf4j.Logger;
045 import org.slf4j.LoggerFactory;
046
047 /**
048 * A PageFile provides you random access to fixed sized disk pages. This object is not thread safe and therefore access to it should
049 * be externally synchronized.
050 * <p/>
051 * The file has 3 parts:
052 * Metadata Space: 4k : Reserved metadata area. Used to store persistent config about the file.
053 * Recovery Buffer Space: Page Size * 1000 : This is a redo log used to prevent partial page writes from making the file inconsistent
054 * Page Space: The pages in the page file.
055 */
056 public class PageFile {
057
// Suffixes appended to the file-system-safe name to build the three file names
// (see getMainPageFile(), getRecoveryFile() and getFreeFile()).
private static final String PAGEFILE_SUFFIX = ".data";
private static final String RECOVERY_FILE_SUFFIX = ".redo";
private static final String FREE_FILE_SUFFIX = ".free";

// 4k Default page size, overridable with the "defaultPageSize" system property.
public static final int DEFAULT_PAGE_SIZE = Integer.getInteger("defaultPageSize", 1024*4);
// Default write batch size, overridable with the "defaultWriteBatchSize" system property.
public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.getInteger("defaultWriteBatchSize", 1000);
// Default page cache size, overridable with the "defaultPageCacheSize" system property.
public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.getInteger("defaultPageCacheSize", 100);;

// Bytes at the start of the recovery file that precede the logged pages.
private static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4;
// Bytes at the start of the page file that precede page 0; holds two copies of the meta data.
private static final int PAGE_FILE_HEADER_SIZE = 1024 * 4;

// Recovery header as written by writeBatch(): next tx id (long), batch checksum (long),
// number of pages in the batch (int).
private static final Logger LOG = LoggerFactory.getLogger(PageFile.class);

// A PageFile will use a couple of files in this directory
private File directory;
// And the file names in that directory will be based on this name.
private final String name;

// File handle used for reading pages..
private RandomAccessFile readFile;
// File handle used for writing pages..
private RandomAccessFile writeFile;
// File handle used for the recovery (redo) file; only opened when enableRecoveryFile is true.
private RandomAccessFile recoveryFile;

// The size of pages
private int pageSize = DEFAULT_PAGE_SIZE;

// The minimum amount of space allocated to the recovery file, in number of pages.
private int recoveryFileMinPageCount = 1000;
// The max size that we let the recovery file grow to. It may exceed the max, but the file
// will get resized back down as soon as possible.
private int recoveryFileMaxPageCount = 10000;
// The number of pages in the current recovery buffer
private int recoveryPageCount;

// True between a successful load() and the matching unload().
private AtomicBoolean loaded = new AtomicBoolean();
// The number of pages we are aiming to write every time we
// write to disk.
int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;

// The cache of recently used pages; created in load() when page caching is enabled.
private Map<Long, Page> pageCache;
// Do we keep a cache of recently used pages?
private boolean enablePageCaching = true;
// How many pages will we keep in the cache?
private int pageCacheSize = DEFAULT_PAGE_CACHE_SIZE;

// Should first log the page write to the recovery buffer? Avoids partial
// page write failures..
private boolean enableRecoveryFile = true;
// Will we sync writes to disk. Ensures that data will not be lost after a checkpoint()
private boolean enableDiskSyncs = true;
// Will writes be done in an async thread?
private boolean enabledWriteThread = false;

// These are used if enabledWriteThread == true
private AtomicBoolean stopWriter = new AtomicBoolean();
private Thread writerThread;
// Latch shared by flush() callers waiting for buffered writes; guarded by the writes monitor.
private CountDownLatch checkpointLatch;

// Keeps track of writes that are being written to disk. Also used as the monitor
// for the write buffer and the checkpoint latch.
private TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();

// Keeps track of free pages.
private final AtomicLong nextFreePageId = new AtomicLong();
private SequenceSet freeList = new SequenceSet();

// Source of transaction ids; its value is persisted in the meta data and the recovery header.
private AtomicLong nextTxid = new AtomicLong();

// Persistent settings stored in the page file.
private MetaData metaData;

// Temporary files registered through removeTmpFile(); writeBatch() deletes them once a
// completed write that references them is removed from the write buffer.
private ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();

// When true load() builds an LFUCache for the page cache, otherwise an LRUCache.
private boolean useLFRUEviction = false;
// Second constructor argument handed to LFUCache in load().
private float LFUEvictionFactor = 0.2f;
137
138 /**
139 * Use to keep track of updated pages which have not yet been committed.
140 */
141 static class PageWrite {
142 Page page;
143 byte[] current;
144 byte[] diskBound;
145 long currentLocation = -1;
146 long diskBoundLocation = -1;
147 File tmpFile;
148 int length;
149
150 public PageWrite(Page page, byte[] data) {
151 this.page = page;
152 current = data;
153 }
154
155 public PageWrite(Page page, long currentLocation, int length, File tmpFile) {
156 this.page = page;
157 this.currentLocation = currentLocation;
158 this.tmpFile = tmpFile;
159 this.length = length;
160 }
161
162 public void setCurrent(Page page, byte[] data) {
163 this.page = page;
164 current = data;
165 currentLocation = -1;
166 diskBoundLocation = -1;
167 }
168
169 public void setCurrentLocation(Page page, long location, int length) {
170 this.page = page;
171 this.currentLocation = location;
172 this.length = length;
173 this.current = null;
174 }
175
176 @Override
177 public String toString() {
178 return "[PageWrite:" + page.getPageId() + "-" + page.getType() + "]";
179 }
180
181 @SuppressWarnings("unchecked")
182 public Page getPage() {
183 return page;
184 }
185
186 public byte[] getDiskBound() throws IOException {
187 if (diskBound == null && diskBoundLocation != -1) {
188 diskBound = new byte[length];
189 RandomAccessFile file = new RandomAccessFile(tmpFile, "r");
190 file.seek(diskBoundLocation);
191 file.read(diskBound);
192 file.close();
193 diskBoundLocation = -1;
194 }
195 return diskBound;
196 }
197
198 void begin() {
199 if (currentLocation != -1) {
200 diskBoundLocation = currentLocation;
201 } else {
202 diskBound = current;
203 }
204 current = null;
205 currentLocation = -1;
206 }
207
208 /**
209 * @return true if there is no pending writes to do.
210 */
211 boolean done() {
212 diskBoundLocation = -1;
213 diskBound = null;
214 return current == null || currentLocation == -1;
215 }
216
217 boolean isDone() {
218 return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
219 }
220 }
221
222 /**
223 * The MetaData object hold the persistent data associated with a PageFile object.
224 */
225 public static class MetaData {
226
227 String fileType;
228 String fileTypeVersion;
229
230 long metaDataTxId = -1;
231 int pageSize;
232 boolean cleanShutdown;
233 long lastTxId;
234 long freePages;
235
236 public String getFileType() {
237 return fileType;
238 }
239
240 public void setFileType(String fileType) {
241 this.fileType = fileType;
242 }
243
244 public String getFileTypeVersion() {
245 return fileTypeVersion;
246 }
247
248 public void setFileTypeVersion(String version) {
249 this.fileTypeVersion = version;
250 }
251
252 public long getMetaDataTxId() {
253 return metaDataTxId;
254 }
255
256 public void setMetaDataTxId(long metaDataTxId) {
257 this.metaDataTxId = metaDataTxId;
258 }
259
260 public int getPageSize() {
261 return pageSize;
262 }
263
264 public void setPageSize(int pageSize) {
265 this.pageSize = pageSize;
266 }
267
268 public boolean isCleanShutdown() {
269 return cleanShutdown;
270 }
271
272 public void setCleanShutdown(boolean cleanShutdown) {
273 this.cleanShutdown = cleanShutdown;
274 }
275
276 public long getLastTxId() {
277 return lastTxId;
278 }
279
280 public void setLastTxId(long lastTxId) {
281 this.lastTxId = lastTxId;
282 }
283
284 public long getFreePages() {
285 return freePages;
286 }
287
288 public void setFreePages(long value) {
289 this.freePages = value;
290 }
291 }
292
293 public Transaction tx() {
294 assertLoaded();
295 return new Transaction(this);
296 }
297
/**
 * Creates a PageFile whose data files live in the given directory and are named after
 * the given name. No file is touched until the page file is loaded.
 *
 * @param directory the directory holding the page file's files
 * @param name      the base name of those files
 */
public PageFile(File directory, String name) {
    this.name = name;
    this.directory = directory;
}
305
306 /**
307 * Deletes the files used by the PageFile object. This method can only be used when this object is not loaded.
308 *
309 * @throws IOException if the files cannot be deleted.
310 * @throws IllegalStateException if this PageFile is loaded
311 */
312 public void delete() throws IOException {
313 if (loaded.get()) {
314 throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
315 }
316 delete(getMainPageFile());
317 delete(getFreeFile());
318 delete(getRecoveryFile());
319 }
320
321 public void archive() throws IOException {
322 if (loaded.get()) {
323 throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
324 }
325 long timestamp = System.currentTimeMillis();
326 archive(getMainPageFile(), String.valueOf(timestamp));
327 archive(getFreeFile(), String.valueOf(timestamp));
328 archive(getRecoveryFile(), String.valueOf(timestamp));
329 }
330
331 /**
332 * @param file
333 * @throws IOException
334 */
335 private void delete(File file) throws IOException {
336 if (file.exists() && !file.delete()) {
337 throw new IOException("Could not delete: " + file.getPath());
338 }
339 }
340
341 private void archive(File file, String suffix) throws IOException {
342 if (file.exists()) {
343 File archive = new File(file.getPath() + "-" + suffix);
344 if (!file.renameTo(archive)) {
345 throw new IOException("Could not archive: " + file.getPath() + " to " + file.getPath());
346 }
347 }
348 }
349
/**
 * Loads the page file so that it can be accessed for read/write purposes. This allocates OS resources. If this is the
 * first time the page file is loaded, then this creates the page file in the file system.
 * <p/>
 * The steps are: create the page cache (when enabled), open the read and write handles on the main
 * file, load or create the meta data, open the recovery file (when enabled), restore the free list and
 * next transaction id (from the stored state after a clean shutdown, otherwise by replaying the
 * recovery file and scanning every page), mark the file as not cleanly shut down, and start the writer.
 * <p/>
 * NOTE(review): the loaded flag is set before any I/O is attempted and nothing here resets it, so
 * an exception thrown part way through appears to leave this object flagged as loaded - confirm.
 *
 * @throws IOException           If the page file cannot be loaded. This could be cause the existing page file is corrupt is a bad version or if
 *                               there was a disk error.
 * @throws IllegalStateException If the page file was already loaded.
 */
public void load() throws IOException, IllegalStateException {
    if (loaded.compareAndSet(false, true)) {

        if (enablePageCaching) {
            if (isUseLFRUEviction()) {
                pageCache = Collections.synchronizedMap(new LFUCache<Long, Page>(pageCacheSize, getLFUEvictionFactor()));
            } else {
                pageCache = Collections.synchronizedMap(new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true));
            }
        }

        File file = getMainPageFile();
        IOHelper.mkdirs(file.getParentFile());
        // Two independent handles on the same file: one for writes, one for reads.
        writeFile = new RandomAccessFile(file, "rw");
        readFile = new RandomAccessFile(file, "r");

        if (readFile.length() > 0) {
            // Load the page size setting cause that can't change once the file is created.
            loadMetaData();
            pageSize = metaData.getPageSize();
        } else {
            // Store the page size setting cause that can't change once the file is created.
            metaData = new MetaData();
            metaData.setFileType(PageFile.class.getName());
            metaData.setFileTypeVersion("1");
            metaData.setPageSize(getPageSize());
            metaData.setCleanShutdown(true);
            metaData.setFreePages(-1);
            metaData.setLastTxId(0);
            storeMetaData();
        }

        if (enableRecoveryFile) {
            recoveryFile = new RandomAccessFile(getRecoveryFile(), "rw");
        }

        if (metaData.isCleanShutdown()) {
            // Clean shutdown: the stored tx id and free list file can be trusted.
            nextTxid.set(metaData.getLastTxId() + 1);
            if (metaData.getFreePages() > 0) {
                loadFreeList();
            }
        } else {
            LOG.debug(toString() + ", Recovering page file...");
            nextTxid.set(redoRecoveryUpdates());

            // Scan all to find the free pages.
            freeList = new SequenceSet();
            for (Iterator<Page> i = tx().iterator(true); i.hasNext(); ) {
                Page page = i.next();
                if (page.getType() == Page.PAGE_FREE_TYPE) {
                    freeList.add(page.getPageId());
                }
            }
        }

        // From here on the file counts as dirty until unload() marks it clean again.
        metaData.setCleanShutdown(false);
        storeMetaData();
        // The free list now lives in memory; the result of the delete is not checked.
        getFreeFile().delete();

        if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
            writeFile.setLength(PAGE_FILE_HEADER_SIZE);
        }
        // The next page id is derived from the number of whole pages after the header.
        nextFreePageId.set((writeFile.length() - PAGE_FILE_HEADER_SIZE) / pageSize);
        startWriter();

    } else {
        throw new IllegalStateException("Cannot load the page file when it is already loaded.");
    }
}
427
428
/**
 * Unloads a previously loaded PageFile. This deallocates OS related resources like file handles.
 * once unloaded, you can no longer use the page file to read or write Pages.
 * <p/>
 * Buffered writes are flushed and the writer is stopped first; then the free list and the
 * meta data (last transaction id, clean shutdown flag) are stored, and finally the file handles
 * are closed and the in-memory state is cleared.
 *
 * @throws IOException           if there was a disk error occurred while closing the down the page file.
 *                               An InterruptedIOException is thrown if stopping the writer is interrupted.
 * @throws IllegalStateException if the PageFile is not loaded
 */
public void unload() throws IOException {
    if (loaded.compareAndSet(true, false)) {
        flush();
        try {
            stopWriter();
        } catch (InterruptedException e) {
            throw new InterruptedIOException();
        }

        // Only write the free list file when there is something to record.
        if (freeList.isEmpty()) {
            metaData.setFreePages(0);
        } else {
            storeFreeList();
            metaData.setFreePages(freeList.size());
        }

        // nextTxid holds the id to hand out next, so the last one used is one less.
        metaData.setLastTxId(nextTxid.get() - 1);
        metaData.setCleanShutdown(true);
        storeMetaData();

        if (readFile != null) {
            readFile.close();
            readFile = null;
            writeFile.close();
            writeFile = null;
            if (enableRecoveryFile) {
                recoveryFile.close();
                recoveryFile = null;
            }
            freeList.clear();
            if (pageCache != null) {
                pageCache = null;
            }
            synchronized (writes) {
                writes.clear();
            }
        }
    } else {
        throw new IllegalStateException("Cannot unload the page file when it is not loaded");
    }
}
477
478 public boolean isLoaded() {
479 return loaded.get();
480 }
481
/**
 * Flush and sync all write buffers to disk.
 * <p/>
 * Returns immediately when nothing is buffered. Without the write thread the batch is written
 * by the calling thread while it holds the writes monitor. With the write thread, the caller
 * registers (or joins) the shared checkpoint latch, wakes the writer and then waits on the
 * latch outside the monitor.
 *
 * @throws IOException If an disk error occurred, if the write thread is enabled but has already
 *                     been stopped, or (as InterruptedIOException) if the wait is interrupted.
 */
public void flush() throws IOException {

    if (enabledWriteThread && stopWriter.get()) {
        throw new IOException("Page file already stopped: checkpointing is not allowed");
    }

    // Setup a latch that gets notified when all buffered writes hits the disk.
    CountDownLatch checkpointLatch;
    synchronized (writes) {
        if (writes.isEmpty()) {
            return;
        }
        if (enabledWriteThread) {
            // Concurrent flush() callers share one latch until a write batch claims it.
            if (this.checkpointLatch == null) {
                this.checkpointLatch = new CountDownLatch(1);
            }
            checkpointLatch = this.checkpointLatch;
            writes.notify();
        } else {
            writeBatch();
            return;
        }
    }
    try {
        // Wait outside the monitor so the writer can take it.
        checkpointLatch.await();
    } catch (InterruptedException e) {
        InterruptedIOException ioe = new InterruptedIOException();
        ioe.initCause(e);
        throw ioe;
    }
}
518
519
520 public String toString() {
521 return "Page File: " + getMainPageFile();
522 }
523
524 ///////////////////////////////////////////////////////////////////
525 // Private Implementation Methods
526 ///////////////////////////////////////////////////////////////////
527 private File getMainPageFile() {
528 return new File(directory, IOHelper.toFileSystemSafeName(name) + PAGEFILE_SUFFIX);
529 }
530
531 public File getFreeFile() {
532 return new File(directory, IOHelper.toFileSystemSafeName(name) + FREE_FILE_SUFFIX);
533 }
534
535 public File getRecoveryFile() {
536 return new File(directory, IOHelper.toFileSystemSafeName(name) + RECOVERY_FILE_SUFFIX);
537 }
538
539 public long toOffset(long pageId) {
540 return PAGE_FILE_HEADER_SIZE + (pageId * pageSize);
541 }
542
543 private void loadMetaData() throws IOException {
544
545 ByteArrayInputStream is;
546 MetaData v1 = new MetaData();
547 MetaData v2 = new MetaData();
548 try {
549 Properties p = new Properties();
550 byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
551 readFile.seek(0);
552 readFile.readFully(d);
553 is = new ByteArrayInputStream(d);
554 p.load(is);
555 IntrospectionSupport.setProperties(v1, p);
556 } catch (IOException e) {
557 v1 = null;
558 }
559
560 try {
561 Properties p = new Properties();
562 byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
563 readFile.seek(PAGE_FILE_HEADER_SIZE / 2);
564 readFile.readFully(d);
565 is = new ByteArrayInputStream(d);
566 p.load(is);
567 IntrospectionSupport.setProperties(v2, p);
568 } catch (IOException e) {
569 v2 = null;
570 }
571
572 if (v1 == null && v2 == null) {
573 throw new IOException("Could not load page file meta data");
574 }
575
576 if (v1 == null || v1.metaDataTxId < 0) {
577 metaData = v2;
578 } else if (v2 == null || v1.metaDataTxId < 0) {
579 metaData = v1;
580 } else if (v1.metaDataTxId == v2.metaDataTxId) {
581 metaData = v1; // use the first since the 2nd could be a partial..
582 } else {
583 metaData = v2; // use the second cause the first is probably a partial.
584 }
585 }
586
587 private void storeMetaData() throws IOException {
588 // Convert the metadata into a property format
589 metaData.metaDataTxId++;
590 Properties p = new Properties();
591 IntrospectionSupport.getProperties(metaData, p, null);
592
593 ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
594 p.store(os, "");
595 if (os.size() > PAGE_FILE_HEADER_SIZE / 2) {
596 throw new IOException("Configuation is larger than: " + PAGE_FILE_HEADER_SIZE / 2);
597 }
598 // Fill the rest with space...
599 byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE / 2) - os.size()];
600 Arrays.fill(filler, (byte) ' ');
601 os.write(filler);
602 os.flush();
603
604 byte[] d = os.toByteArray();
605
606 // So we don't loose it.. write it 2 times...
607 writeFile.seek(0);
608 writeFile.write(d);
609 writeFile.getFD().sync();
610 writeFile.seek(PAGE_FILE_HEADER_SIZE / 2);
611 writeFile.write(d);
612 writeFile.getFD().sync();
613 }
614
615 private void storeFreeList() throws IOException {
616 FileOutputStream os = new FileOutputStream(getFreeFile());
617 DataOutputStream dos = new DataOutputStream(os);
618 SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos);
619 dos.close();
620 }
621
622 private void loadFreeList() throws IOException {
623 freeList.clear();
624 FileInputStream is = new FileInputStream(getFreeFile());
625 DataInputStream dis = new DataInputStream(is);
626 freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
627 dis.close();
628 }
629
630 ///////////////////////////////////////////////////////////////////
631 // Property Accessors
632 ///////////////////////////////////////////////////////////////////
633
634 /**
635 * Is the recovery buffer used to double buffer page writes. Enabled by default.
636 *
637 * @return is the recovery buffer enabled.
638 */
639 public boolean isEnableRecoveryFile() {
640 return enableRecoveryFile;
641 }
642
643 /**
644 * Sets if the recovery buffer uses to double buffer page writes. Enabled by default. Disabling this
645 * may potentially cause partial page writes which can lead to page file corruption.
646 */
647 public void setEnableRecoveryFile(boolean doubleBuffer) {
648 assertNotLoaded();
649 this.enableRecoveryFile = doubleBuffer;
650 }
651
652 /**
653 * @return Are page writes synced to disk?
654 */
655 public boolean isEnableDiskSyncs() {
656 return enableDiskSyncs;
657 }
658
659 /**
660 * Allows you enable syncing writes to disk.
661 */
662 public void setEnableDiskSyncs(boolean syncWrites) {
663 assertNotLoaded();
664 this.enableDiskSyncs = syncWrites;
665 }
666
667 /**
668 * @return the page size
669 */
670 public int getPageSize() {
671 return this.pageSize;
672 }
673
674 /**
675 * @return the amount of content data that a page can hold.
676 */
677 public int getPageContentSize() {
678 return this.pageSize - Page.PAGE_HEADER_SIZE;
679 }
680
681 /**
682 * Configures the page size used by the page file. By default it is 4k. Once a page file is created on disk,
683 * subsequent loads of that file will use the original pageSize. Once the PageFile is loaded, this setting
684 * can no longer be changed.
685 *
686 * @param pageSize the pageSize to set
687 * @throws IllegalStateException once the page file is loaded.
688 */
689 public void setPageSize(int pageSize) throws IllegalStateException {
690 assertNotLoaded();
691 this.pageSize = pageSize;
692 }
693
694 /**
695 * @return true if read page caching is enabled
696 */
697 public boolean isEnablePageCaching() {
698 return this.enablePageCaching;
699 }
700
701 /**
702 * @param enablePageCaching allows you to enable read page caching
703 */
704 public void setEnablePageCaching(boolean enablePageCaching) {
705 assertNotLoaded();
706 this.enablePageCaching = enablePageCaching;
707 }
708
709 /**
710 * @return the maximum number of pages that will get stored in the read page cache.
711 */
712 public int getPageCacheSize() {
713 return this.pageCacheSize;
714 }
715
716 /**
717 * @param pageCacheSize Sets the maximum number of pages that will get stored in the read page cache.
718 */
719 public void setPageCacheSize(int pageCacheSize) {
720 assertNotLoaded();
721 this.pageCacheSize = pageCacheSize;
722 }
723
724 public boolean isEnabledWriteThread() {
725 return enabledWriteThread;
726 }
727
728 public void setEnableWriteThread(boolean enableAsyncWrites) {
729 assertNotLoaded();
730 this.enabledWriteThread = enableAsyncWrites;
731 }
732
733 public long getDiskSize() throws IOException {
734 return toOffset(nextFreePageId.get());
735 }
736
737 /**
738 * @return the number of pages allocated in the PageFile
739 */
740 public long getPageCount() {
741 return nextFreePageId.get();
742 }
743
744 public int getRecoveryFileMinPageCount() {
745 return recoveryFileMinPageCount;
746 }
747
748 public long getFreePageCount() {
749 assertLoaded();
750 return freeList.rangeSize();
751 }
752
753 public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {
754 assertNotLoaded();
755 this.recoveryFileMinPageCount = recoveryFileMinPageCount;
756 }
757
758 public int getRecoveryFileMaxPageCount() {
759 return recoveryFileMaxPageCount;
760 }
761
762 public void setRecoveryFileMaxPageCount(int recoveryFileMaxPageCount) {
763 assertNotLoaded();
764 this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
765 }
766
767 public int getWriteBatchSize() {
768 return writeBatchSize;
769 }
770
771 public void setWriteBatchSize(int writeBatchSize) {
772 this.writeBatchSize = writeBatchSize;
773 }
774
775 public float getLFUEvictionFactor() {
776 return LFUEvictionFactor;
777 }
778
779 public void setLFUEvictionFactor(float LFUEvictionFactor) {
780 this.LFUEvictionFactor = LFUEvictionFactor;
781 }
782
783 public boolean isUseLFRUEviction() {
784 return useLFRUEviction;
785 }
786
787 public void setUseLFRUEviction(boolean useLFRUEviction) {
788 this.useLFRUEviction = useLFRUEviction;
789 }
790
791 ///////////////////////////////////////////////////////////////////
792 // Package Protected Methods exposed to Transaction
793 ///////////////////////////////////////////////////////////////////
794
795 /**
796 * @throws IllegalStateException if the page file is not loaded.
797 */
798 void assertLoaded() throws IllegalStateException {
799 if (!loaded.get()) {
800 throw new IllegalStateException("PageFile is not loaded");
801 }
802 }
803
804 void assertNotLoaded() throws IllegalStateException {
805 if (loaded.get()) {
806 throw new IllegalStateException("PageFile is loaded");
807 }
808 }
809
810 /**
811 * Allocates a block of free pages that you can write data to.
812 *
813 * @param count the number of sequential pages to allocate
814 * @return the first page of the sequential set.
815 * @throws IOException If an disk error occurred.
816 * @throws IllegalStateException if the PageFile is not loaded
817 */
818 <T> Page<T> allocate(int count) throws IOException {
819 assertLoaded();
820 if (count <= 0) {
821 throw new IllegalArgumentException("The allocation count must be larger than zero");
822 }
823
824 Sequence seq = freeList.removeFirstSequence(count);
825
826 // We may need to create new free pages...
827 if (seq == null) {
828
829 Page<T> first = null;
830 int c = count;
831
832 // Perform the id's only once....
833 long pageId = nextFreePageId.getAndAdd(count);
834 long writeTxnId = nextTxid.getAndAdd(count);
835
836 while (c-- > 0) {
837 Page<T> page = new Page<T>(pageId++);
838 page.makeFree(writeTxnId++);
839
840 if (first == null) {
841 first = page;
842 }
843
844 addToCache(page);
845 DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
846 page.write(out);
847 write(page, out.getData());
848
849 // LOG.debug("allocate writing: "+page.getPageId());
850 }
851
852 return first;
853 }
854
855 Page<T> page = new Page<T>(seq.getFirst());
856 page.makeFree(0);
857 // LOG.debug("allocated: "+page.getPageId());
858 return page;
859 }
860
861 long getNextWriteTransactionId() {
862 return nextTxid.incrementAndGet();
863 }
864
865 void readPage(long pageId, byte[] data) throws IOException {
866 readFile.seek(toOffset(pageId));
867 readFile.readFully(data);
868 }
869
870 public void freePage(long pageId) {
871 freeList.add(pageId);
872 removeFromCache(pageId);
873 }
874
875 @SuppressWarnings("unchecked")
876 private <T> void write(Page<T> page, byte[] data) throws IOException {
877 final PageWrite write = new PageWrite(page, data);
878 Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() {
879 public Long getKey() {
880 return write.getPage().getPageId();
881 }
882
883 public PageWrite getValue() {
884 return write;
885 }
886
887 public PageWrite setValue(PageWrite value) {
888 return null;
889 }
890 };
891 Entry<Long, PageWrite>[] entries = new Map.Entry[]{entry};
892 write(Arrays.asList(entries));
893 }
894
/**
 * Merges a set of page updates into the write buffer and triggers a write batch when needed.
 * <p/>
 * With the write thread enabled the caller blocks while the buffer already holds
 * writeBatchSize entries (unless the writer has been stopped). Each update either becomes a
 * new buffer entry or replaces the current version of the existing entry for that page.
 *
 * @param updates the page updates, keyed by page id
 * @throws IOException If an disk error occurred, or (as InterruptedIOException) if the wait
 *                     for buffer space is interrupted.
 */
void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException {
    synchronized (writes) {
        if (enabledWriteThread) {
            // Back-pressure: wait for the writer to drain the buffer.
            while (writes.size() >= writeBatchSize && !stopWriter.get()) {
                try {
                    writes.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new InterruptedIOException();
                }
            }
        }

        // Set when an update references data stored in a temporary file.
        boolean longTx = false;

        for (Map.Entry<Long, PageWrite> entry : updates) {
            Long key = entry.getKey();
            PageWrite value = entry.getValue();
            PageWrite write = writes.get(key);
            if (write == null) {
                writes.put(key, value);
            } else {
                // The page already has a buffered write: only replace its current version.
                if (value.currentLocation != -1) {
                    write.setCurrentLocation(value.page, value.currentLocation, value.length);
                    write.tmpFile = value.tmpFile;
                    longTx = true;
                } else {
                    write.setCurrent(value.page, value.current);
                }
            }
        }

        // Once we start approaching capacity, notify the writer to start writing
        // sync immediately for long txs
        if (longTx || canStartWriteBatch()) {

            if (enabledWriteThread) {
                writes.notify();
            } else {
                writeBatch();
            }
        }
    }
}
939
940 private boolean canStartWriteBatch() {
941 int capacityUsed = ((writes.size() * 100) / writeBatchSize);
942 if (enabledWriteThread) {
943 // The constant 10 here controls how soon write batches start going to disk..
944 // would be nice to figure out how to auto tune that value. Make to small and
945 // we reduce through put because we are locking the write mutex too often doing writes
946 return capacityUsed >= 10 || checkpointLatch != null;
947 } else {
948 return capacityUsed >= 80 || checkpointLatch != null;
949 }
950 }
951
952 ///////////////////////////////////////////////////////////////////
953 // Cache Related operations
954 ///////////////////////////////////////////////////////////////////
955 @SuppressWarnings("unchecked")
956 <T> Page<T> getFromCache(long pageId) {
957 synchronized (writes) {
958 PageWrite pageWrite = writes.get(pageId);
959 if (pageWrite != null) {
960 return pageWrite.page;
961 }
962 }
963
964 Page<T> result = null;
965 if (enablePageCaching) {
966 result = pageCache.get(pageId);
967 }
968 return result;
969 }
970
971 void addToCache(Page page) {
972 if (enablePageCaching) {
973 pageCache.put(page.getPageId(), page);
974 }
975 }
976
977 void removeFromCache(long pageId) {
978 if (enablePageCaching) {
979 pageCache.remove(pageId);
980 }
981 }
982
983 ///////////////////////////////////////////////////////////////////
984 // Internal Double write implementation follows...
985 ///////////////////////////////////////////////////////////////////
986
/**
 * Loop that keeps writing batches until the writer is asked to stop. Each round wakes the
 * threads waiting on the writes monitor, waits (in 100 ms slices) until there is something
 * to write, a checkpoint is requested or the writer is stopped, and then writes a batch.
 * Any Throwable ends the loop and is only logged; pending checkpoint waiters are released
 * on the way out.
 */
private void pollWrites() {
    try {
        while (!stopWriter.get()) {
            // Wait for a notification...
            synchronized (writes) {
                // Wake the threads waiting on the monitor.
                writes.notifyAll();

                // If there is not enough to write, wait for a notification...
                while (writes.isEmpty() && checkpointLatch == null && !stopWriter.get()) {
                    writes.wait(100);
                }

                // Nothing buffered: a waiting checkpoint has nothing left to wait for.
                if (writes.isEmpty()) {
                    releaseCheckpointWaiter();
                }
            }
            writeBatch();
        }
    } catch (Throwable e) {
        LOG.info("An exception was raised while performing poll writes", e);
    } finally {
        releaseCheckpointWaiter();
    }
}
1011
1012 private void writeBatch() throws IOException {
1013
1014 CountDownLatch checkpointLatch;
1015 ArrayList<PageWrite> batch;
1016 synchronized (writes) {
1017 // If there is not enough to write, wait for a notification...
1018
1019 batch = new ArrayList<PageWrite>(writes.size());
1020 // build a write batch from the current write cache.
1021 for (PageWrite write : writes.values()) {
1022 batch.add(write);
1023 // Move the current write to the diskBound write, this lets folks update the
1024 // page again without blocking for this write.
1025 write.begin();
1026 if (write.diskBound == null && write.diskBoundLocation == -1) {
1027 batch.remove(write);
1028 }
1029 }
1030
1031 // Grab on to the existing checkpoint latch cause once we do this write we can
1032 // release the folks that were waiting for those writes to hit disk.
1033 checkpointLatch = this.checkpointLatch;
1034 this.checkpointLatch = null;
1035 }
1036
1037 Checksum checksum = new Adler32();
1038 if (enableRecoveryFile) {
1039 recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
1040 }
1041 for (PageWrite w : batch) {
1042 if (enableRecoveryFile) {
1043 try {
1044 checksum.update(w.getDiskBound(), 0, pageSize);
1045 } catch (Throwable t) {
1046 throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
1047 }
1048 recoveryFile.writeLong(w.page.getPageId());
1049 recoveryFile.write(w.getDiskBound(), 0, pageSize);
1050 }
1051
1052 writeFile.seek(toOffset(w.page.getPageId()));
1053 writeFile.write(w.getDiskBound(), 0, pageSize);
1054 w.done();
1055 }
1056
1057 try {
1058 if (enableRecoveryFile) {
1059 // Can we shrink the recovery buffer??
1060 if (recoveryPageCount > recoveryFileMaxPageCount) {
1061 int t = Math.max(recoveryFileMinPageCount, batch.size());
1062 recoveryFile.setLength(recoveryFileSizeForPages(t));
1063 }
1064
1065 // Record the page writes in the recovery buffer.
1066 recoveryFile.seek(0);
1067 // Store the next tx id...
1068 recoveryFile.writeLong(nextTxid.get());
1069 // Store the checksum for thw write batch so that on recovery we
1070 // know if we have a consistent
1071 // write batch on disk.
1072 recoveryFile.writeLong(checksum.getValue());
1073 // Write the # of pages that will follow
1074 recoveryFile.writeInt(batch.size());
1075 }
1076
1077 if (enableDiskSyncs) {
1078 // Sync to make sure recovery buffer writes land on disk..
1079 if (enableRecoveryFile) {
1080 recoveryFile.getFD().sync();
1081 }
1082 writeFile.getFD().sync();
1083 }
1084 } finally {
1085 synchronized (writes) {
1086 for (PageWrite w : batch) {
1087 // If there are no more pending writes, then remove it from
1088 // the write cache.
1089 if (w.isDone()) {
1090 writes.remove(w.page.getPageId());
1091 if (w.tmpFile != null && tmpFilesForRemoval.contains(w.tmpFile)) {
1092 if (!w.tmpFile.delete()) {
1093 throw new IOException("Can't delete temporary KahaDB transaction file:" + w.tmpFile);
1094 }
1095 tmpFilesForRemoval.remove(w.tmpFile);
1096 }
1097 }
1098 }
1099 }
1100
1101 if (checkpointLatch != null) {
1102 checkpointLatch.countDown();
1103 }
1104 }
1105 }
1106
1107 public void removeTmpFile(File file) {
1108 tmpFilesForRemoval.add(file);
1109 }
1110
1111 private long recoveryFileSizeForPages(int pageCount) {
1112 return RECOVERY_FILE_HEADER_SIZE + ((pageSize + 8) * pageCount);
1113 }
1114
1115 private void releaseCheckpointWaiter() {
1116 if (checkpointLatch != null) {
1117 checkpointLatch.countDown();
1118 checkpointLatch = null;
1119 }
1120 }
1121
1122 /**
1123 * Inspects the recovery buffer and re-applies any
1124 * partially applied page writes.
1125 *
1126 * @return the next transaction id that can be used.
1127 */
1128 private long redoRecoveryUpdates() throws IOException {
1129 if (!enableRecoveryFile) {
1130 return 0;
1131 }
1132 recoveryPageCount = 0;
1133
1134 // Are we initializing the recovery file?
1135 if (recoveryFile.length() == 0) {
1136 // Write an empty header..
1137 recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]);
1138 // Preallocate the minium size for better performance.
1139 recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount));
1140 return 0;
1141 }
1142
1143 // How many recovery pages do we have in the recovery buffer?
1144 recoveryFile.seek(0);
1145 long nextTxId = recoveryFile.readLong();
1146 long expectedChecksum = recoveryFile.readLong();
1147 int pageCounter = recoveryFile.readInt();
1148
1149 recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
1150 Checksum checksum = new Adler32();
1151 LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
1152 try {
1153 for (int i = 0; i < pageCounter; i++) {
1154 long offset = recoveryFile.readLong();
1155 byte[] data = new byte[pageSize];
1156 if (recoveryFile.read(data, 0, pageSize) != pageSize) {
1157 // Invalid recovery record, Could not fully read the data". Probably due to a partial write to the recovery buffer
1158 return nextTxId;
1159 }
1160 checksum.update(data, 0, pageSize);
1161 batch.put(offset, data);
1162 }
1163 } catch (Exception e) {
1164 // If an error occurred it was cause the redo buffer was not full written out correctly.. so don't redo it.
1165 // as the pages should still be consistent.
1166 LOG.debug("Redo buffer was not fully intact: ", e);
1167 return nextTxId;
1168 }
1169
1170 recoveryPageCount = pageCounter;
1171
1172 // If the checksum is not valid then the recovery buffer was partially written to disk.
1173 if (checksum.getValue() != expectedChecksum) {
1174 return nextTxId;
1175 }
1176
1177 // Re-apply all the writes in the recovery buffer.
1178 for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
1179 writeFile.seek(toOffset(e.getKey()));
1180 writeFile.write(e.getValue());
1181 }
1182
1183 // And sync it to disk
1184 writeFile.getFD().sync();
1185 return nextTxId;
1186 }
1187
1188 private void startWriter() {
1189 synchronized (writes) {
1190 if (enabledWriteThread) {
1191 stopWriter.set(false);
1192 writerThread = new Thread("KahaDB Page Writer") {
1193 @Override
1194 public void run() {
1195 pollWrites();
1196 }
1197 };
1198 writerThread.setPriority(Thread.MAX_PRIORITY);
1199 writerThread.setDaemon(true);
1200 writerThread.start();
1201 }
1202 }
1203 }
1204
1205 private void stopWriter() throws InterruptedException {
1206 if (enabledWriteThread) {
1207 stopWriter.set(true);
1208 writerThread.join();
1209 }
1210 }
1211
1212 public File getFile() {
1213 return getMainPageFile();
1214 }
1215
1216 public File getDirectory() {
1217 return directory;
1218 }
1219 }