001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.kahadb.page;
018
019 import org.apache.kahadb.page.PageFile.PageWrite;
020 import org.apache.kahadb.util.*;
021
022 import java.io.*;
023 import java.util.Iterator;
024 import java.util.NoSuchElementException;
025 import java.util.TreeMap;
026
027 /**
028 * The class used to read/update a PageFile object. Using a transaction allows you to
029 * do multiple update operations in a single unit of work.
030 */
031 public class Transaction implements Iterable<Page> {
032
033 private RandomAccessFile tmpFile;
034 private File txFile;
035 private long nextLocation = 0;
036
037 /**
038 * The PageOverflowIOException occurs when a page write is requested
039 * and it's data is larger than what would fit into a single page.
040 */
041 public class PageOverflowIOException extends IOException {
042 private static final long serialVersionUID = 1L;
043
044 public PageOverflowIOException(String message) {
045 super(message);
046 }
047 }
048
049 /**
050 * The InvalidPageIOException is thrown if try to load/store a a page
051 * with an invalid page id.
052 */
053 public class InvalidPageIOException extends IOException {
054 private static final long serialVersionUID = 1L;
055
056 private final long page;
057
058 public InvalidPageIOException(String message, long page) {
059 super(message);
060 this.page = page;
061 }
062
063 public long getPage() {
064 return page;
065 }
066 }
067
068 /**
069 * This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method.
070 *
071 * @param <T> The type of exceptions that operation will throw.
072 */
073 public interface Closure <T extends Throwable> {
074 public void execute(Transaction tx) throws T;
075 }
076
077 /**
078 * This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method.
079 *
080 * @param <R> The type of result that the closure produces.
081 * @param <T> The type of exceptions that operation will throw.
082 */
083 public interface CallableClosure<R, T extends Throwable> {
084 public R execute(Transaction tx) throws T;
085 }
086
087
088 // The page file that this Transaction operates against.
089 private final PageFile pageFile;
090 // If this transaction is updating stuff.. this is the tx of
091 private long writeTransactionId=-1;
092 // List of pages that this transaction has modified.
093 private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
094 // List of pages allocated in this transaction
095 private final SequenceSet allocateList = new SequenceSet();
096 // List of pages freed in this transaction
097 private final SequenceSet freeList = new SequenceSet();
098
099 private long maxTransactionSize = Long.getLong("maxKahaDBTxSize", 10485760L);
100
101 private long size = 0;
102
103 Transaction(PageFile pageFile) {
104 this.pageFile = pageFile;
105 }
106
107 /**
108 * @return the page file that created this Transaction
109 */
110 public PageFile getPageFile() {
111 return this.pageFile;
112 }
113
114 /**
115 * Allocates a free page that you can write data to.
116 *
117 * @return a newly allocated page.
118 * @throws IOException
119 * If an disk error occurred.
120 * @throws IllegalStateException
121 * if the PageFile is not loaded
122 */
123 public <T> Page<T> allocate() throws IOException {
124 return allocate(1);
125 }
126
127 /**
128 * Allocates a block of free pages that you can write data to.
129 *
130 * @param count the number of sequential pages to allocate
131 * @return the first page of the sequential set.
132 * @throws IOException
133 * If an disk error occurred.
134 * @throws IllegalStateException
135 * if the PageFile is not loaded
136 */
137 public <T> Page<T> allocate(int count) throws IOException {
138 Page<T> rc = pageFile.allocate(count);
139 allocateList.add(new Sequence(rc.getPageId(), rc.getPageId()+count-1));
140 return rc;
141 }
142
143 /**
144 * Frees up a previously allocated page so that it can be re-allocated again.
145 *
146 * @param pageId the page to free up
147 * @throws IOException
148 * If an disk error occurred.
149 * @throws IllegalStateException
150 * if the PageFile is not loaded
151 */
152 public void free(long pageId) throws IOException {
153 free(load(pageId, null));
154 }
155
156 /**
157 * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
158 *
159 * @param pageId the initial page of the sequence that will be getting freed
160 * @param count the number of pages in the sequence
161 *
162 * @throws IOException
163 * If an disk error occurred.
164 * @throws IllegalStateException
165 * if the PageFile is not loaded
166 */
167 public void free(long pageId, int count) throws IOException {
168 free(load(pageId, null), count);
169 }
170
171 /**
172 * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
173 *
174 * @param page the initial page of the sequence that will be getting freed
175 * @param count the number of pages in the sequence
176 *
177 * @throws IOException
178 * If an disk error occurred.
179 * @throws IllegalStateException
180 * if the PageFile is not loaded
181 */
182 public <T> void free(Page<T> page, int count) throws IOException {
183 pageFile.assertLoaded();
184 long offsetPage = page.getPageId();
185 while (count-- > 0) {
186 if (page == null) {
187 page = load(offsetPage, null);
188 }
189 free(page);
190 page = null;
191 // Increment the offsetPage value since using it depends on the current count.
192 offsetPage++;
193 }
194 }
195
196 /**
197 * Frees up a previously allocated page so that it can be re-allocated again.
198 *
199 * @param page the page to free up
200 * @throws IOException
201 * If an disk error occurred.
202 * @throws IllegalStateException
203 * if the PageFile is not loaded
204 */
205 public <T> void free(Page<T> page) throws IOException {
206 pageFile.assertLoaded();
207
208 // We may need loop to free up a page chain.
209 while (page != null) {
210
211 // Is it already free??
212 if (page.getType() == Page.PAGE_FREE_TYPE) {
213 return;
214 }
215
216 Page<T> next = null;
217 if (page.getType() == Page.PAGE_PART_TYPE) {
218 next = load(page.getNext(), null);
219 }
220
221 page.makeFree(getWriteTransactionId());
222 // ensure free page is visible while write is pending
223 pageFile.addToCache(page.copy());
224
225 DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize());
226 page.write(out);
227 write(page, out.getData());
228
229 freeList.add(page.getPageId());
230 page = next;
231 }
232 }
233
234 /**
235 *
236 * @param page
237 * the page to write. The Page object must be fully populated with a valid pageId, type, and data.
238 * @param marshaller
239 * the marshaler to use to load the data portion of the Page, may be null if you do not wish to write the data.
240 * @param overflow
241 * If true, then if the page data marshalls to a bigger size than can fit in one page, then additional
242 * overflow pages are automatically allocated and chained to this page to store all the data. If false,
243 * and the overflow condition would occur, then the PageOverflowIOException is thrown.
244 * @throws IOException
245 * If an disk error occurred.
246 * @throws PageOverflowIOException
247 * If the page data marshalls to size larger than maximum page size and overflow was false.
248 * @throws IllegalStateException
249 * if the PageFile is not loaded
250 */
251 public <T> void store(Page<T> page, Marshaller<T> marshaller, final boolean overflow) throws IOException {
252 DataByteArrayOutputStream out = (DataByteArrayOutputStream)openOutputStream(page, overflow);
253 if (marshaller != null) {
254 marshaller.writePayload(page.get(), out);
255 }
256 out.close();
257 }
258
259 /**
260 * @throws IOException
261 */
262 public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException {
263 pageFile.assertLoaded();
264
265 // Copy to protect against the end user changing
266 // the page instance while we are doing a write.
267 final Page copy = page.copy();
268 pageFile.addToCache(copy);
269
270 //
271 // To support writing VERY large data, we override the output stream so
272 // that we
273 // we do the page writes incrementally while the data is being
274 // marshalled.
275 DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize() * 2) {
276 Page current = copy;
277
278 @SuppressWarnings("unchecked")
279 @Override
280 protected void onWrite() throws IOException {
281
282 // Are we at an overflow condition?
283 final int pageSize = pageFile.getPageSize();
284 if (pos >= pageSize) {
285 // If overflow is allowed
286 if (overflow) {
287
288 do {
289 Page next;
290 if (current.getType() == Page.PAGE_PART_TYPE) {
291 next = load(current.getNext(), null);
292 } else {
293 next = allocate();
294 }
295
296 next.txId = current.txId;
297
298 // Write the page header
299 int oldPos = pos;
300 pos = 0;
301
302 current.makePagePart(next.getPageId(), getWriteTransactionId());
303 current.write(this);
304
305 // Do the page write..
306 byte[] data = new byte[pageSize];
307 System.arraycopy(buf, 0, data, 0, pageSize);
308 Transaction.this.write(current, data);
309
310 // Reset for the next page chunk
311 pos = 0;
312 // The page header marshalled after the data is written.
313 skip(Page.PAGE_HEADER_SIZE);
314 // Move the overflow data after the header.
315 System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize);
316 pos += oldPos - pageSize;
317 current = next;
318
319 } while (pos > pageSize);
320 } else {
321 throw new PageOverflowIOException("Page overflow.");
322 }
323 }
324
325 }
326
327 @Override
328 public void close() throws IOException {
329 super.close();
330
331 // We need to free up the rest of the page chain..
332 if (current.getType() == Page.PAGE_PART_TYPE) {
333 free(current.getNext());
334 }
335
336 current.makePageEnd(pos, getWriteTransactionId());
337
338 // Write the header..
339 pos = 0;
340 current.write(this);
341
342 Transaction.this.write(current, buf);
343 }
344 };
345
346 // The page header marshaled after the data is written.
347 out.skip(Page.PAGE_HEADER_SIZE);
348 return out;
349 }
350
351 /**
352 * Loads a page from disk.
353 *
354 * @param pageId
355 * the id of the page to load
356 * @param marshaller
357 * the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
358 * @return The page with the given id
359 * @throws IOException
360 * If an disk error occurred.
361 * @throws IllegalStateException
362 * if the PageFile is not loaded
363 */
364 public <T> Page<T> load(long pageId, Marshaller<T> marshaller) throws IOException {
365 pageFile.assertLoaded();
366 Page<T> page = new Page<T>(pageId);
367 load(page, marshaller);
368 return page;
369 }
370
371 /**
372 * Loads a page from disk.
373 *
374 * @param page - The pageId field must be properly set
375 * @param marshaller
376 * the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
377 * @throws IOException
378 * If an disk error occurred.
379 * @throws InvalidPageIOException
380 * If the page is is not valid.
381 * @throws IllegalStateException
382 * if the PageFile is not loaded
383 */
384 @SuppressWarnings("unchecked")
385 public <T> void load(Page<T> page, Marshaller<T> marshaller) throws IOException {
386 pageFile.assertLoaded();
387
388 // Can't load invalid offsets...
389 long pageId = page.getPageId();
390 if (pageId < 0) {
391 throw new InvalidPageIOException("Page id is not valid", pageId);
392 }
393
394 // It might be a page this transaction has modified...
395 PageWrite update = writes.get(pageId);
396 if (update != null) {
397 page.copy(update.getPage());
398 return;
399 }
400
401 // We may be able to get it from the cache...
402 Page<T> t = pageFile.getFromCache(pageId);
403 if (t != null) {
404 page.copy(t);
405 return;
406 }
407
408 if (marshaller != null) {
409 // Full page read..
410 InputStream is = openInputStream(page);
411 DataInputStream dataIn = new DataInputStream(is);
412 page.set(marshaller.readPayload(dataIn));
413 is.close();
414 } else {
415 // Page header read.
416 DataByteArrayInputStream in = new DataByteArrayInputStream(new byte[Page.PAGE_HEADER_SIZE]);
417 pageFile.readPage(pageId, in.getRawData());
418 page.read(in);
419 page.set(null);
420 }
421
422 // Cache it.
423 if (marshaller != null) {
424 pageFile.addToCache(page);
425 }
426 }
427
428 /**
429 * @see org.apache.kahadb.page.Transaction#load(org.apache.kahadb.page.Page,
430 * org.apache.kahadb.util.Marshaller)
431 */
432 public InputStream openInputStream(final Page p) throws IOException {
433
434 return new InputStream() {
435
436 private ByteSequence chunk = new ByteSequence(new byte[pageFile.getPageSize()]);
437 private Page page = readPage(p);
438 private int pageCount = 1;
439
440 private Page markPage;
441 private ByteSequence markChunk;
442
443 private Page readPage(Page page) throws IOException {
444 // Read the page data
445
446 pageFile.readPage(page.getPageId(), chunk.getData());
447
448 chunk.setOffset(0);
449 chunk.setLength(pageFile.getPageSize());
450
451 DataByteArrayInputStream in = new DataByteArrayInputStream(chunk);
452 page.read(in);
453
454 chunk.setOffset(Page.PAGE_HEADER_SIZE);
455 if (page.getType() == Page.PAGE_END_TYPE) {
456 chunk.setLength((int)(page.getNext()));
457 }
458
459 if (page.getType() == Page.PAGE_FREE_TYPE) {
460 throw new EOFException("Chunk stream does not exist, page: " + page.getPageId() + " is marked free");
461 }
462
463 return page;
464 }
465
466 public int read() throws IOException {
467 if (!atEOF()) {
468 return chunk.data[chunk.offset++] & 0xff;
469 } else {
470 return -1;
471 }
472 }
473
474 private boolean atEOF() throws IOException {
475 if (chunk.offset < chunk.length) {
476 return false;
477 }
478 if (page.getType() == Page.PAGE_END_TYPE) {
479 return true;
480 }
481 fill();
482 return chunk.offset >= chunk.length;
483 }
484
485 private void fill() throws IOException {
486 page = readPage(new Page(page.getNext()));
487 pageCount++;
488 }
489
490 public int read(byte[] b) throws IOException {
491 return read(b, 0, b.length);
492 }
493
494 public int read(byte b[], int off, int len) throws IOException {
495 if (!atEOF()) {
496 int rc = 0;
497 while (!atEOF() && rc < len) {
498 len = Math.min(len, chunk.length - chunk.offset);
499 if (len > 0) {
500 System.arraycopy(chunk.data, chunk.offset, b, off, len);
501 chunk.offset += len;
502 }
503 rc += len;
504 }
505 return rc;
506 } else {
507 return -1;
508 }
509 }
510
511 public long skip(long len) throws IOException {
512 if (atEOF()) {
513 int rc = 0;
514 while (!atEOF() && rc < len) {
515 len = Math.min(len, chunk.length - chunk.offset);
516 if (len > 0) {
517 chunk.offset += len;
518 }
519 rc += len;
520 }
521 return rc;
522 } else {
523 return -1;
524 }
525 }
526
527 public int available() {
528 return chunk.length - chunk.offset;
529 }
530
531 public boolean markSupported() {
532 return true;
533 }
534
535 public void mark(int markpos) {
536 markPage = page;
537 byte data[] = new byte[pageFile.getPageSize()];
538 System.arraycopy(chunk.getData(), 0, data, 0, pageFile.getPageSize());
539 markChunk = new ByteSequence(data, chunk.getOffset(), chunk.getLength());
540 }
541
542 public void reset() {
543 page = markPage;
544 chunk = markChunk;
545 }
546
547 };
548 }
549
550 /**
551 * Allows you to iterate through all active Pages in this object. Pages with type Page.FREE_TYPE are
552 * not included in this iteration.
553 *
554 * Pages removed with Iterator.remove() will not actually get removed until the transaction commits.
555 *
556 * @throws IllegalStateException
557 * if the PageFile is not loaded
558 */
559 public Iterator<Page> iterator() {
560 return (Iterator<Page>)iterator(false);
561 }
562
563 /**
564 * Allows you to iterate through all active Pages in this object. You can optionally include free pages in the pages
565 * iterated.
566 *
567 * @param includeFreePages - if true, free pages are included in the iteration
568 * @throws IllegalStateException
569 * if the PageFile is not loaded
570 */
571 public Iterator<Page> iterator(final boolean includeFreePages) {
572
573 pageFile.assertLoaded();
574
575 return new Iterator<Page>() {
576
577 long nextId;
578 Page nextPage;
579 Page lastPage;
580
581 private void findNextPage() {
582 if (!pageFile.isLoaded()) {
583 throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
584 }
585
586 if (nextPage != null) {
587 return;
588 }
589
590 try {
591 while (nextId < pageFile.getPageCount()) {
592
593 Page page = load(nextId, null);
594
595 if (includeFreePages || page.getType() != Page.PAGE_FREE_TYPE) {
596 nextPage = page;
597 return;
598 } else {
599 nextId++;
600 }
601 }
602 } catch (IOException e) {
603 }
604 }
605
606 public boolean hasNext() {
607 findNextPage();
608 return nextPage != null;
609 }
610
611 public Page next() {
612 findNextPage();
613 if (nextPage != null) {
614 lastPage = nextPage;
615 nextPage = null;
616 nextId++;
617 return lastPage;
618 } else {
619 throw new NoSuchElementException();
620 }
621 }
622
623 @SuppressWarnings("unchecked")
624 public void remove() {
625 if (lastPage == null) {
626 throw new IllegalStateException();
627 }
628 try {
629 free(lastPage);
630 lastPage = null;
631 } catch (IOException e) {
632 throw new RuntimeException(e);
633 }
634 }
635 };
636 }
637
638 ///////////////////////////////////////////////////////////////////
639 // Commit / Rollback related methods..
640 ///////////////////////////////////////////////////////////////////
641
642 /**
643 * Commits the transaction to the PageFile as a single 'Unit of Work'. Either all page updates associated
644 * with the transaction are written to disk or none will.
645 */
646 public void commit() throws IOException {
647 if( writeTransactionId!=-1 ) {
648 if (tmpFile != null) {
649 tmpFile.close();
650 pageFile.removeTmpFile(getTempFile());
651 tmpFile = null;
652 txFile = null;
653 }
654 // Actually do the page writes...
655 pageFile.write(writes.entrySet());
656 // Release the pages that were freed up in the transaction..
657 freePages(freeList);
658
659 freeList.clear();
660 allocateList.clear();
661 writes.clear();
662 writeTransactionId = -1;
663 }
664 size = 0;
665 }
666
667 /**
668 * Rolls back the transaction.
669 */
670 public void rollback() throws IOException {
671 if( writeTransactionId!=-1 ) {
672 if (tmpFile != null) {
673 tmpFile.close();
674 pageFile.removeTmpFile(getTempFile());
675 tmpFile = null;
676 txFile = null;
677 }
678 // Release the pages that were allocated in the transaction...
679 freePages(allocateList);
680
681 freeList.clear();
682 allocateList.clear();
683 writes.clear();
684 writeTransactionId = -1;
685 }
686 size = 0;
687 }
688
689 private long getWriteTransactionId() {
690 if( writeTransactionId==-1 ) {
691 writeTransactionId = pageFile.getNextWriteTransactionId();
692 }
693 return writeTransactionId;
694 }
695
696
697 protected File getTempFile() {
698 if (txFile == null) {
699 txFile = new File(getPageFile().getDirectory(), IOHelper.toFileSystemSafeName("tx-"+ Long.toString(getWriteTransactionId()) + "-" + Long.toString(System.currentTimeMillis()) + ".tmp"));
700 }
701 return txFile;
702 }
703
704 /**
705 * Queues up a page write that should get done when commit() gets called.
706 */
707 private void write(final Page page, byte[] data) throws IOException {
708 Long key = page.getPageId();
709
710 // how much pages we have for this transaction
711 size = writes.size() * pageFile.getPageSize();
712
713 PageWrite write;
714
715 if (size > maxTransactionSize) {
716 if (tmpFile == null) {
717 tmpFile = new RandomAccessFile(getTempFile(), "rw");
718 }
719 long location = nextLocation;
720 tmpFile.seek(nextLocation);
721 tmpFile.write(data);
722 nextLocation = location + data.length;
723 write = new PageWrite(page, location, data.length, getTempFile());
724 } else {
725 write = new PageWrite(page, data);
726 }
727 writes.put(key, write);
728 }
729
730 /**
731 * @param list
732 * @throws RuntimeException
733 */
734 private void freePages(SequenceSet list) throws RuntimeException {
735 Sequence seq = list.getHead();
736 while( seq!=null ) {
737 seq.each(new Sequence.Closure<RuntimeException>(){
738 public void execute(long value) {
739 pageFile.freePage(value);
740 }
741 });
742 seq = seq.getNext();
743 }
744 }
745
746 /**
747 * @return true if there are no uncommitted page file updates associated with this transaction.
748 */
749 public boolean isReadOnly() {
750 return writeTransactionId==-1;
751 }
752
753 ///////////////////////////////////////////////////////////////////
754 // Transaction closure helpers...
755 ///////////////////////////////////////////////////////////////////
756
757 /**
758 * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
759 * If the closure throws an Exception, then the transaction is rolled back.
760 *
761 * @param <T>
762 * @param closure - the work to get exectued.
763 * @throws T if the closure throws it
764 * @throws IOException If the commit fails.
765 */
766 public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException {
767 boolean success = false;
768 try {
769 closure.execute(this);
770 success = true;
771 } finally {
772 if (success) {
773 commit();
774 } else {
775 rollback();
776 }
777 }
778 }
779
780 /**
781 * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
782 * If the closure throws an Exception, then the transaction is rolled back.
783 *
784 * @param <T>
785 * @param closure - the work to get exectued.
786 * @throws T if the closure throws it
787 * @throws IOException If the commit fails.
788 */
789 public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException {
790 boolean success = false;
791 try {
792 R rc = closure.execute(this);
793 success = true;
794 return rc;
795 } finally {
796 if (success) {
797 commit();
798 } else {
799 rollback();
800 }
801 }
802 }
803 }