001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb;
018
019 import java.io.DataInput;
020 import java.io.DataOutput;
021 import java.io.File;
022 import java.io.IOException;
023 import java.io.InputStream;
024 import java.io.OutputStream;
025 import java.util.ArrayList;
026 import java.util.HashMap;
027 import java.util.HashSet;
028 import java.util.Iterator;
029 import java.util.LinkedHashMap;
030 import java.util.TreeMap;
031 import java.util.Map.Entry;
032 import java.util.concurrent.atomic.AtomicBoolean;
033
034 import org.apache.activemq.command.SubscriptionInfo;
035 import org.apache.activemq.command.TransactionId;
036 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
037 import org.apache.activemq.store.kahadb.data.KahaDestination;
038 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
039 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
040 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
041 import org.apache.activemq.util.ByteSequence;
042 import org.slf4j.Logger;
043 import org.slf4j.LoggerFactory;
044 import org.apache.kahadb.index.BTreeIndex;
045 import org.apache.kahadb.page.PageFile;
046 import org.apache.kahadb.page.Transaction;
047 import org.apache.kahadb.util.LongMarshaller;
048 import org.apache.kahadb.util.Marshaller;
049 import org.apache.kahadb.util.StringMarshaller;
050 import org.apache.kahadb.util.VariableMarshaller;
051
052 public class TempMessageDatabase {
053
/** Logger for this class. */
private static final Logger LOG = LoggerFactory.getLogger(TempMessageDatabase.class);

// State constants; no code visible in this file references them.
public static final int CLOSED_STATE = 1;
public static final int OPEN_STATE = 2;

/** Root index from destination key to its per-destination indexes; rebuilt by loadPageFile(). */
protected BTreeIndex<String, StoredDestination> destinations;
/** Backing page file; created on first use by getPageFile(). */
protected PageFile pageFile;

/** Directory passed to the PageFile constructor in createPageFile(). */
protected File directory;

// Passed to PageFile.setEnableWriteThread() in createPageFile().
boolean enableIndexWriteAsync = true;
// Passed to PageFile.setWriteBatchSize() in createPageFile().
int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;

/** Toggled by start()/stop() so load()/unload() run only on an actual state change. */
protected AtomicBoolean started = new AtomicBoolean();
/** Toggled by open()/close() so the page file is loaded/unloaded only on an actual state change. */
protected AtomicBoolean opened = new AtomicBoolean();

/** Creates an instance; the constructor itself performs no work. */
public TempMessageDatabase() {
}
072
073 public void start() throws Exception {
074 if (started.compareAndSet(false, true)) {
075 load();
076 }
077 }
078
079 public void stop() throws Exception {
080 if (started.compareAndSet(true, false)) {
081 unload();
082 }
083 }
084
085 private void loadPageFile() throws IOException {
086 synchronized (indexMutex) {
087 final PageFile pageFile = getPageFile();
088 pageFile.load();
089 pageFile.tx().execute(new Transaction.Closure<IOException>() {
090 public void execute(Transaction tx) throws IOException {
091 destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
092 destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
093 destinations.setValueMarshaller(new StoredDestinationMarshaller());
094 destinations.load(tx);
095 }
096 });
097 pageFile.flush();
098 storedDestinations.clear();
099 }
100 }
101
102 /**
103 * @throws IOException
104 */
105 public void open() throws IOException {
106 if( opened.compareAndSet(false, true) ) {
107 loadPageFile();
108 }
109 }
110
/**
 * Opens the store if it is not open yet, then unloads and deletes the
 * page file and loads it again via loadPageFile(). The whole sequence
 * runs while holding {@code indexMutex}.
 *
 * @throws IOException if any page file operation fails
 */
public void load() throws IOException {
    synchronized (indexMutex) {
        // open() guarantees pageFile has been created before it is used below.
        open();
        pageFile.unload();
        pageFile.delete();
        loadPageFile();
    }
}
119
120
121 public void close() throws IOException, InterruptedException {
122 if( opened.compareAndSet(true, false)) {
123 synchronized (indexMutex) {
124 pageFile.unload();
125 }
126 }
127 }
128
129 public void unload() throws IOException, InterruptedException {
130 synchronized (indexMutex) {
131 if( pageFile.isLoaded() ) {
132 close();
133 }
134 }
135 }
136
137 public void processAdd(final KahaAddMessageCommand command, TransactionId txid, final ByteSequence data) throws IOException {
138 if (txid!=null) {
139 synchronized (indexMutex) {
140 ArrayList<Operation> inflightTx = getInflightTx(txid);
141 inflightTx.add(new AddOpperation(command, data));
142 }
143 } else {
144 synchronized (indexMutex) {
145 pageFile.tx().execute(new Transaction.Closure<IOException>() {
146 public void execute(Transaction tx) throws IOException {
147 upadateIndex(tx, command, data);
148 }
149 });
150 }
151 }
152 }
153
154 public void processRemove(final KahaRemoveMessageCommand command, TransactionId txid) throws IOException {
155 if (txid!=null) {
156 synchronized (indexMutex) {
157 ArrayList<Operation> inflightTx = getInflightTx(txid);
158 inflightTx.add(new RemoveOpperation(command));
159 }
160 } else {
161 synchronized (indexMutex) {
162 pageFile.tx().execute(new Transaction.Closure<IOException>() {
163 public void execute(Transaction tx) throws IOException {
164 updateIndex(tx, command);
165 }
166 });
167 }
168 }
169
170 }
171
172 public void process(final KahaRemoveDestinationCommand command) throws IOException {
173 synchronized (indexMutex) {
174 pageFile.tx().execute(new Transaction.Closure<IOException>() {
175 public void execute(Transaction tx) throws IOException {
176 updateIndex(tx, command);
177 }
178 });
179 }
180 }
181
182 public void process(final KahaSubscriptionCommand command) throws IOException {
183 synchronized (indexMutex) {
184 pageFile.tx().execute(new Transaction.Closure<IOException>() {
185 public void execute(Transaction tx) throws IOException {
186 updateIndex(tx, command);
187 }
188 });
189 }
190 }
191
192 public void processCommit(TransactionId key) throws IOException {
193 synchronized (indexMutex) {
194 ArrayList<Operation> inflightTx = inflightTransactions.remove(key);
195 if (inflightTx == null) {
196 inflightTx = preparedTransactions.remove(key);
197 }
198 if (inflightTx == null) {
199 return;
200 }
201
202 final ArrayList<Operation> messagingTx = inflightTx;
203 pageFile.tx().execute(new Transaction.Closure<IOException>() {
204 public void execute(Transaction tx) throws IOException {
205 for (Operation op : messagingTx) {
206 op.execute(tx);
207 }
208 }
209 });
210 }
211 }
212
213 public void processPrepare(TransactionId key) {
214 synchronized (indexMutex) {
215 ArrayList<Operation> tx = inflightTransactions.remove(key);
216 if (tx != null) {
217 preparedTransactions.put(key, tx);
218 }
219 }
220 }
221
222 public void processRollback(TransactionId key) {
223 synchronized (indexMutex) {
224 ArrayList<Operation> tx = inflightTransactions.remove(key);
225 if (tx == null) {
226 preparedTransactions.remove(key);
227 }
228 }
229 }
230
231 // /////////////////////////////////////////////////////////////////
232 // These methods do the actual index updates.
233 // /////////////////////////////////////////////////////////////////
234
// Lock object; the load/unload and process* methods in this class synchronize on it
// before touching the page file, the destination cache or the transaction maps.
protected final Object indexMutex = new Object();
// Only handed out by getJournalFilesBeingReplicated(); nothing in this file adds to it.
private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
237
238 private void upadateIndex(Transaction tx, KahaAddMessageCommand command, ByteSequence data) throws IOException {
239 StoredDestination sd = getStoredDestination(command.getDestination(), tx);
240
241 // Skip adding the message to the index if this is a topic and there are
242 // no subscriptions.
243 if (sd.subscriptions != null && sd.ackPositions.isEmpty()) {
244 return;
245 }
246
247 // Add the message.
248 long id = sd.nextMessageId++;
249 Long previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
250 if( previous == null ) {
251 sd.orderIndex.put(tx, id, new MessageRecord(command.getMessageId(), data));
252 } else {
253 // restore the previous value.. Looks like this was a redo of a previously
254 // added message. We don't want to assing it a new id as the other indexes would
255 // be wrong..
256 sd.messageIdIndex.put(tx, command.getMessageId(), previous);
257 }
258 }
259
260 private void updateIndex(Transaction tx, KahaRemoveMessageCommand command) throws IOException {
261 StoredDestination sd = getStoredDestination(command.getDestination(), tx);
262 if (!command.hasSubscriptionKey()) {
263
264 // In the queue case we just remove the message from the index..
265 Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
266 if (sequenceId != null) {
267 sd.orderIndex.remove(tx, sequenceId);
268 }
269 } else {
270 // In the topic case we need remove the message once it's been acked
271 // by all the subs
272 Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
273
274 // Make sure it's a valid message id...
275 if (sequence != null) {
276 String subscriptionKey = command.getSubscriptionKey();
277 Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence);
278
279 // The following method handles deleting un-referenced messages.
280 removeAckByteSequence(tx, sd, subscriptionKey, prev);
281
282 // Add it to the new location set.
283 addAckByteSequence(sd, sequence, subscriptionKey);
284 }
285
286 }
287 }
288
289 private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command) throws IOException {
290 StoredDestination sd = getStoredDestination(command.getDestination(), tx);
291 sd.orderIndex.clear(tx);
292 sd.orderIndex.unload(tx);
293 tx.free(sd.orderIndex.getPageId());
294
295 sd.messageIdIndex.clear(tx);
296 sd.messageIdIndex.unload(tx);
297 tx.free(sd.messageIdIndex.getPageId());
298
299 if (sd.subscriptions != null) {
300 sd.subscriptions.clear(tx);
301 sd.subscriptions.unload(tx);
302 tx.free(sd.subscriptions.getPageId());
303
304 sd.subscriptionAcks.clear(tx);
305 sd.subscriptionAcks.unload(tx);
306 tx.free(sd.subscriptionAcks.getPageId());
307 }
308
309 String key = key(command.getDestination());
310 storedDestinations.remove(key);
311 destinations.remove(tx, key);
312 }
313
314 private void updateIndex(Transaction tx, KahaSubscriptionCommand command) throws IOException {
315 StoredDestination sd = getStoredDestination(command.getDestination(), tx);
316
317 // If set then we are creating it.. otherwise we are destroying the sub
318 if (command.hasSubscriptionInfo()) {
319 String subscriptionKey = command.getSubscriptionKey();
320 sd.subscriptions.put(tx, subscriptionKey, command);
321 long ackByteSequence=-1;
322 if (!command.getRetroactive()) {
323 ackByteSequence = sd.nextMessageId-1;
324 }
325
326 sd.subscriptionAcks.put(tx, subscriptionKey, ackByteSequence);
327 addAckByteSequence(sd, ackByteSequence, subscriptionKey);
328 } else {
329 // delete the sub...
330 String subscriptionKey = command.getSubscriptionKey();
331 sd.subscriptions.remove(tx, subscriptionKey);
332 Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
333 if( prev!=null ) {
334 removeAckByteSequence(tx, sd, subscriptionKey, prev);
335 }
336 }
337
338 }
339
/**
 * Returns the internal set itself, not a copy, so changes made by the
 * caller are visible here.
 *
 * @return the live set of journal file ids
 */
public HashSet<Integer> getJournalFilesBeingReplicated() {
    return journalFilesBeingReplicated;
}
343
344 // /////////////////////////////////////////////////////////////////
345 // StoredDestination related implementation methods.
346 // /////////////////////////////////////////////////////////////////
347
348
// In-memory cache of loaded destinations keyed by key(destination); filled by
// getStoredDestination() and emptied by loadPageFile().
private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
350
/**
 * Plain holder for per-subscription state. Nothing in the visible part of
 * this file creates or reads instances of it.
 */
class StoredSubscription {
    SubscriptionInfo subscriptionInfo;
    String lastAckId;
    ByteSequence lastAckByteSequence;
    ByteSequence cursor;
}
357
358 static class MessageRecord {
359 final String messageId;
360 final ByteSequence data;
361
362 public MessageRecord(String messageId, ByteSequence location) {
363 this.messageId=messageId;
364 this.data=location;
365 }
366
367 @Override
368 public String toString() {
369 return "["+messageId+","+data+"]";
370 }
371 }
372
373 static protected class MessageKeysMarshaller extends VariableMarshaller<MessageRecord> {
374 static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
375
376 public MessageRecord readPayload(DataInput dataIn) throws IOException {
377 return new MessageRecord(dataIn.readUTF(), ByteSequenceMarshaller.INSTANCE.readPayload(dataIn));
378 }
379
380 public void writePayload(MessageRecord object, DataOutput dataOut) throws IOException {
381 dataOut.writeUTF(object.messageId);
382 ByteSequenceMarshaller.INSTANCE.writePayload(object.data, dataOut);
383 }
384 }
385
/**
 * Per-destination state: the persistent indexes plus in-memory
 * bookkeeping that loadStoredDestination() rebuilds when the destination
 * is loaded.
 */
static class StoredDestination {
    // Sequence handed to the next added message; derived from the last
    // order index entry on load.
    long nextMessageId;
    // sequence -> stored message
    BTreeIndex<Long, MessageRecord> orderIndex;
    // message id -> sequence
    BTreeIndex<String, Long> messageIdIndex;

    // These bits are only set for Topics
    // subscription key -> the command that created the subscription
    BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
    // subscription key -> sequence last acked by that subscription
    BTreeIndex<String, Long> subscriptionAcks;
    // Created on load for topics; not otherwise read or written in this file.
    HashMap<String, Long> subscriptionCursors;
    // acked sequence -> subscription keys currently sitting at that sequence
    TreeMap<Long, HashSet<String>> ackPositions;
}
397
398 protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
399 public Class<StoredDestination> getType() {
400 return StoredDestination.class;
401 }
402
403 public StoredDestination readPayload(DataInput dataIn) throws IOException {
404 StoredDestination value = new StoredDestination();
405 value.orderIndex = new BTreeIndex<Long, MessageRecord>(pageFile, dataIn.readLong());
406 value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
407
408 if (dataIn.readBoolean()) {
409 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
410 value.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
411 }
412 return value;
413 }
414
415 public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
416 dataOut.writeLong(value.orderIndex.getPageId());
417 dataOut.writeLong(value.messageIdIndex.getPageId());
418 if (value.subscriptions != null) {
419 dataOut.writeBoolean(true);
420 dataOut.writeLong(value.subscriptions.getPageId());
421 dataOut.writeLong(value.subscriptionAcks.getPageId());
422 } else {
423 dataOut.writeBoolean(false);
424 }
425 }
426 }
427
428 static class ByteSequenceMarshaller extends VariableMarshaller<ByteSequence> {
429 final static ByteSequenceMarshaller INSTANCE = new ByteSequenceMarshaller();
430
431 public ByteSequence readPayload(DataInput dataIn) throws IOException {
432 byte data[] = new byte[dataIn.readInt()];
433 dataIn.readFully(data);
434 return new ByteSequence(data);
435 }
436
437 public void writePayload(ByteSequence object, DataOutput dataOut) throws IOException {
438 dataOut.writeInt(object.getLength());
439 dataOut.write(object.getData(), object.getOffset(), object.getLength());
440 }
441 }
442
443 static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
444 final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
445
446 public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
447 KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
448 rc.mergeFramed((InputStream)dataIn);
449 return rc;
450 }
451
452 public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
453 object.writeFramed((OutputStream)dataOut);
454 }
455 }
456
457 protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
458 String key = key(destination);
459 StoredDestination rc = storedDestinations.get(key);
460 if (rc == null) {
461 boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
462 rc = loadStoredDestination(tx, key, topic);
463 // Cache it. We may want to remove/unload destinations from the
464 // cache that are not used for a while
465 // to reduce memory usage.
466 storedDestinations.put(key, rc);
467 }
468 return rc;
469 }
470
471 /**
472 * @param tx
473 * @param key
474 * @param topic
475 * @return
476 * @throws IOException
477 */
478 private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
479 // Try to load the existing indexes..
480 StoredDestination rc = destinations.get(tx, key);
481 if (rc == null) {
482 // Brand new destination.. allocate indexes for it.
483 rc = new StoredDestination();
484 rc.orderIndex = new BTreeIndex<Long, MessageRecord>(pageFile, tx.allocate());
485 rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
486
487 if (topic) {
488 rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
489 rc.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, tx.allocate());
490 }
491 destinations.put(tx, key, rc);
492 }
493
494 // Configure the marshalers and load.
495 rc.orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
496 rc.orderIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
497 rc.orderIndex.load(tx);
498
499 // Figure out the next key using the last entry in the destination.
500 Entry<Long, MessageRecord> lastEntry = rc.orderIndex.getLast(tx);
501 if( lastEntry!=null ) {
502 rc.nextMessageId = lastEntry.getKey()+1;
503 }
504
505 rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
506 rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
507 rc.messageIdIndex.load(tx);
508
509 // If it was a topic...
510 if (topic) {
511
512 rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
513 rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
514 rc.subscriptions.load(tx);
515
516 rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
517 rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE);
518 rc.subscriptionAcks.load(tx);
519
520 rc.ackPositions = new TreeMap<Long, HashSet<String>>();
521 rc.subscriptionCursors = new HashMap<String, Long>();
522
523 for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
524 Entry<String, Long> entry = iterator.next();
525 addAckByteSequence(rc, entry.getValue(), entry.getKey());
526 }
527
528 }
529 return rc;
530 }
531
532 /**
533 * @param sd
534 * @param messageSequence
535 * @param subscriptionKey
536 */
537 private void addAckByteSequence(StoredDestination sd, Long messageSequence, String subscriptionKey) {
538 HashSet<String> hs = sd.ackPositions.get(messageSequence);
539 if (hs == null) {
540 hs = new HashSet<String>();
541 sd.ackPositions.put(messageSequence, hs);
542 }
543 hs.add(subscriptionKey);
544 }
545
546 /**
547 * @param tx
548 * @param sd
549 * @param subscriptionKey
550 * @param sequenceId
551 * @throws IOException
552 */
553 private void removeAckByteSequence(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException {
554 // Remove the sub from the previous location set..
555 if (sequenceId != null) {
556 HashSet<String> hs = sd.ackPositions.get(sequenceId);
557 if (hs != null) {
558 hs.remove(subscriptionKey);
559 if (hs.isEmpty()) {
560 HashSet<String> firstSet = sd.ackPositions.values().iterator().next();
561 sd.ackPositions.remove(sequenceId);
562
563 // Did we just empty out the first set in the
564 // ordered list of ack locations? Then it's time to
565 // delete some messages.
566 if (hs == firstSet) {
567
568 // Find all the entries that need to get deleted.
569 ArrayList<Entry<Long, MessageRecord>> deletes = new ArrayList<Entry<Long, MessageRecord>>();
570 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
571 Entry<Long, MessageRecord> entry = iterator.next();
572 if (entry.getKey().compareTo(sequenceId) <= 0) {
573 // We don't do the actually delete while we are
574 // iterating the BTree since
575 // iterating would fail.
576 deletes.add(entry);
577 }
578 }
579
580 // Do the actual deletes.
581 for (Entry<Long, MessageRecord> entry : deletes) {
582 sd.messageIdIndex.remove(tx,entry.getValue().messageId);
583 sd.orderIndex.remove(tx,entry.getKey());
584 }
585 }
586 }
587 }
588 }
589 }
590
591 private String key(KahaDestination destination) {
592 return destination.getType().getNumber() + ":" + destination.getName();
593 }
594
595 // /////////////////////////////////////////////////////////////////
596 // Transaction related implementation methods.
597 // /////////////////////////////////////////////////////////////////
// Operations queued per transaction that has not been prepared or committed yet.
protected final LinkedHashMap<TransactionId, ArrayList<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
// Operations of transactions moved here by processPrepare(), awaiting commit or rollback.
protected final LinkedHashMap<TransactionId, ArrayList<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
600
601 private ArrayList<Operation> getInflightTx(TransactionId key) {
602 ArrayList<Operation> tx = inflightTransactions.get(key);
603 if (tx == null) {
604 tx = new ArrayList<Operation>();
605 inflightTransactions.put(key, tx);
606 }
607 return tx;
608 }
609
610 abstract class Operation {
611 abstract public void execute(Transaction tx) throws IOException;
612 }
613
614 class AddOpperation extends Operation {
615 final KahaAddMessageCommand command;
616 private final ByteSequence data;
617
618 public AddOpperation(KahaAddMessageCommand command, ByteSequence location) {
619 this.command = command;
620 this.data = location;
621 }
622
623 public void execute(Transaction tx) throws IOException {
624 upadateIndex(tx, command, data);
625 }
626
627 public KahaAddMessageCommand getCommand() {
628 return command;
629 }
630 }
631
632 class RemoveOpperation extends Operation {
633 final KahaRemoveMessageCommand command;
634
635 public RemoveOpperation(KahaRemoveMessageCommand command) {
636 this.command = command;
637 }
638
639 public void execute(Transaction tx) throws IOException {
640 updateIndex(tx, command);
641 }
642
643 public KahaRemoveMessageCommand getCommand() {
644 return command;
645 }
646 }
647
648 // /////////////////////////////////////////////////////////////////
649 // Initialization related implementation methods.
650 // /////////////////////////////////////////////////////////////////
651
652 private PageFile createPageFile() {
653 PageFile index = new PageFile(directory, "temp-db");
654 index.setEnableWriteThread(isEnableIndexWriteAsync());
655 index.setWriteBatchSize(getIndexWriteBatchSize());
656 index.setEnableDiskSyncs(false);
657 index.setEnableRecoveryFile(false);
658 return index;
659 }
660
/**
 * @return the directory that createPageFile() passes to the PageFile constructor
 */
public File getDirectory() {
    return directory;
}
664
/**
 * Sets the directory used when the page file is created. getPageFile()
 * creates the page file only once, so a later change does not affect an
 * already created page file.
 *
 * @param directory the directory to create the page file in
 */
public void setDirectory(File directory) {
    this.directory = directory;
}
668
/**
 * Sets the write batch size that createPageFile() applies to the page file.
 *
 * @param setIndexWriteBatchSize the batch size to use
 */
public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
    this.setIndexWriteBatchSize = setIndexWriteBatchSize;
}
672
/**
 * @return the configured write batch size (stored in the field named setIndexWriteBatchSize)
 */
public int getIndexWriteBatchSize() {
    return setIndexWriteBatchSize;
}
676
/**
 * Sets the flag that createPageFile() passes to PageFile.setEnableWriteThread().
 *
 * @param enableIndexWriteAsync the new flag value
 */
public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
    this.enableIndexWriteAsync = enableIndexWriteAsync;
}
680
/**
 * Package-private, unlike its public setter.
 *
 * @return the flag that createPageFile() passes to PageFile.setEnableWriteThread()
 */
boolean isEnableIndexWriteAsync() {
    return enableIndexWriteAsync;
}
684
685 public PageFile getPageFile() {
686 if (pageFile == null) {
687 pageFile = createPageFile();
688 }
689 return pageFile;
690 }
691
692 }