001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb;
018
019 import java.io.ByteArrayInputStream;
020 import java.io.ByteArrayOutputStream;
021 import java.io.DataInput;
022 import java.io.DataOutput;
023 import java.io.EOFException;
024 import java.io.File;
025 import java.io.IOException;
026 import java.io.InputStream;
027 import java.io.InterruptedIOException;
028 import java.io.ObjectInputStream;
029 import java.io.ObjectOutputStream;
030 import java.io.OutputStream;
031 import java.util.ArrayList;
032 import java.util.Collection;
033 import java.util.Collections;
034 import java.util.Date;
035 import java.util.HashMap;
036 import java.util.HashSet;
037 import java.util.Iterator;
038 import java.util.LinkedHashMap;
039 import java.util.LinkedHashSet;
040 import java.util.List;
041 import java.util.Map;
042 import java.util.Map.Entry;
043 import java.util.Set;
044 import java.util.SortedSet;
045 import java.util.Stack;
046 import java.util.TreeMap;
047 import java.util.TreeSet;
048 import java.util.concurrent.atomic.AtomicBoolean;
049 import java.util.concurrent.atomic.AtomicLong;
050 import java.util.concurrent.locks.ReentrantReadWriteLock;
051
052 import org.apache.activemq.ActiveMQMessageAuditNoSync;
053 import org.apache.activemq.broker.BrokerService;
054 import org.apache.activemq.broker.BrokerServiceAware;
055 import org.apache.activemq.command.MessageAck;
056 import org.apache.activemq.command.SubscriptionInfo;
057 import org.apache.activemq.command.TransactionId;
058 import org.apache.activemq.protobuf.Buffer;
059 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
060 import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
061 import org.apache.activemq.store.kahadb.data.KahaDestination;
062 import org.apache.activemq.store.kahadb.data.KahaEntryType;
063 import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
064 import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
065 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
066 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
067 import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
068 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
069 import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
070 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
071 import org.apache.activemq.util.Callback;
072 import org.apache.activemq.util.IOHelper;
073 import org.apache.activemq.util.ServiceStopper;
074 import org.apache.activemq.util.ServiceSupport;
075 import org.apache.kahadb.index.BTreeIndex;
076 import org.apache.kahadb.index.BTreeVisitor;
077 import org.apache.kahadb.index.ListIndex;
078 import org.apache.kahadb.journal.DataFile;
079 import org.apache.kahadb.journal.Journal;
080 import org.apache.kahadb.journal.Location;
081 import org.apache.kahadb.page.Page;
082 import org.apache.kahadb.page.PageFile;
083 import org.apache.kahadb.page.Transaction;
084 import org.apache.kahadb.util.ByteSequence;
085 import org.apache.kahadb.util.DataByteArrayInputStream;
086 import org.apache.kahadb.util.DataByteArrayOutputStream;
087 import org.apache.kahadb.util.LocationMarshaller;
088 import org.apache.kahadb.util.LockFile;
089 import org.apache.kahadb.util.LongMarshaller;
090 import org.apache.kahadb.util.Marshaller;
091 import org.apache.kahadb.util.Sequence;
092 import org.apache.kahadb.util.SequenceSet;
093 import org.apache.kahadb.util.StringMarshaller;
094 import org.apache.kahadb.util.VariableMarshaller;
095 import org.slf4j.Logger;
096 import org.slf4j.LoggerFactory;
097
098 public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
099
100 protected BrokerService brokerService;
101
102 public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
103 public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
104 public static final File DEFAULT_DIRECTORY = new File("KahaDB");
105 protected static final Buffer UNMATCHED;
106 static {
107 UNMATCHED = new Buffer(new byte[]{});
108 }
109 private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
110 private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
111
112 static final int CLOSED_STATE = 1;
113 static final int OPEN_STATE = 2;
114 static final long NOT_ACKED = -1;
115
116 static final int VERSION = 4;
117
118 protected class Metadata {
119 protected Page<Metadata> page;
120 protected int state;
121 protected BTreeIndex<String, StoredDestination> destinations;
122 protected Location lastUpdate;
123 protected Location firstInProgressTransactionLocation;
124 protected Location producerSequenceIdTrackerLocation = null;
125 protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
126 protected int version = VERSION;
127 public void read(DataInput is) throws IOException {
128 state = is.readInt();
129 destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
130 if (is.readBoolean()) {
131 lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
132 } else {
133 lastUpdate = null;
134 }
135 if (is.readBoolean()) {
136 firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
137 } else {
138 firstInProgressTransactionLocation = null;
139 }
140 try {
141 if (is.readBoolean()) {
142 producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
143 } else {
144 producerSequenceIdTrackerLocation = null;
145 }
146 } catch (EOFException expectedOnUpgrade) {
147 }
148 try {
149 version = is.readInt();
150 } catch (EOFException expectedOnUpgrade) {
151 version=1;
152 }
153 LOG.info("KahaDB is version " + version);
154 }
155
156 public void write(DataOutput os) throws IOException {
157 os.writeInt(state);
158 os.writeLong(destinations.getPageId());
159
160 if (lastUpdate != null) {
161 os.writeBoolean(true);
162 LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
163 } else {
164 os.writeBoolean(false);
165 }
166
167 if (firstInProgressTransactionLocation != null) {
168 os.writeBoolean(true);
169 LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
170 } else {
171 os.writeBoolean(false);
172 }
173
174 if (producerSequenceIdTrackerLocation != null) {
175 os.writeBoolean(true);
176 LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
177 } else {
178 os.writeBoolean(false);
179 }
180 os.writeInt(VERSION);
181 }
182 }
183
184 class MetadataMarshaller extends VariableMarshaller<Metadata> {
185 public Metadata readPayload(DataInput dataIn) throws IOException {
186 Metadata rc = new Metadata();
187 rc.read(dataIn);
188 return rc;
189 }
190
191 public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
192 object.write(dataOut);
193 }
194 }
195
196 protected PageFile pageFile;
197 protected Journal journal;
198 protected Metadata metadata = new Metadata();
199
200 protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
201
202 protected boolean failIfDatabaseIsLocked;
203
204 protected boolean deleteAllMessages;
205 protected File directory = DEFAULT_DIRECTORY;
206 protected Thread checkpointThread;
207 protected boolean enableJournalDiskSyncs=true;
208 protected boolean archiveDataLogs;
209 protected File directoryArchive;
210 protected AtomicLong storeSize = new AtomicLong(0);
211 long checkpointInterval = 5*1000;
212 long cleanupInterval = 30*1000;
213 int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
214 int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
215 boolean enableIndexWriteAsync = false;
216 int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
217
218 protected AtomicBoolean opened = new AtomicBoolean();
219 private LockFile lockFile;
220 private boolean ignoreMissingJournalfiles = false;
221 private int indexCacheSize = 10000;
222 private boolean checkForCorruptJournalFiles = false;
223 private boolean checksumJournalFiles = false;
224 private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY;
225 protected boolean forceRecoverIndex = false;
226 private final Object checkpointThreadLock = new Object();
227 private boolean rewriteOnRedelivery = false;
228 private boolean archiveCorruptedIndex = false;
229 private boolean useIndexLFRUEviction = false;
230 private float indexLFUEvictionFactor = 0.2f;
231 private boolean enableIndexDiskSyncs = true;
232 private boolean enableIndexRecoveryFile = true;
233 private boolean enableIndexPageCaching = true;
234
235 public MessageDatabase() {
236 }
237
238 @Override
239 public void doStart() throws Exception {
240 load();
241 }
242
243 @Override
244 public void doStop(ServiceStopper stopper) throws Exception {
245 unload();
246 }
247
248 private void loadPageFile() throws IOException {
249 this.indexLock.writeLock().lock();
250 try {
251 final PageFile pageFile = getPageFile();
252 pageFile.load();
253 pageFile.tx().execute(new Transaction.Closure<IOException>() {
254 public void execute(Transaction tx) throws IOException {
255 if (pageFile.getPageCount() == 0) {
256 // First time this is created.. Initialize the metadata
257 Page<Metadata> page = tx.allocate();
258 assert page.getPageId() == 0;
259 page.set(metadata);
260 metadata.page = page;
261 metadata.state = CLOSED_STATE;
262 metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
263
264 tx.store(metadata.page, metadataMarshaller, true);
265 } else {
266 Page<Metadata> page = tx.load(0, metadataMarshaller);
267 metadata = page.get();
268 metadata.page = page;
269 }
270 metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
271 metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
272 metadata.destinations.load(tx);
273 }
274 });
275 // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
276 // Perhaps we should just keep an index of file
277 storedDestinations.clear();
278 pageFile.tx().execute(new Transaction.Closure<IOException>() {
279 public void execute(Transaction tx) throws IOException {
280 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
281 Entry<String, StoredDestination> entry = iterator.next();
282 StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
283 storedDestinations.put(entry.getKey(), sd);
284 }
285 }
286 });
287 pageFile.flush();
288 } finally {
289 this.indexLock.writeLock().unlock();
290 }
291 }
292
293 private void startCheckpoint() {
294 if (checkpointInterval == 0 && cleanupInterval == 0) {
295 LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
296 return;
297 }
298 synchronized (checkpointThreadLock) {
299 boolean start = false;
300 if (checkpointThread == null) {
301 start = true;
302 } else if (!checkpointThread.isAlive()) {
303 start = true;
304 LOG.info("KahaDB: Recovering checkpoint thread after death");
305 }
306 if (start) {
307 checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
308 @Override
309 public void run() {
310 try {
311 long lastCleanup = System.currentTimeMillis();
312 long lastCheckpoint = System.currentTimeMillis();
313 // Sleep for a short time so we can periodically check
314 // to see if we need to exit this thread.
315 long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
316 while (opened.get()) {
317 Thread.sleep(sleepTime);
318 long now = System.currentTimeMillis();
319 if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
320 checkpointCleanup(true);
321 lastCleanup = now;
322 lastCheckpoint = now;
323 } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
324 checkpointCleanup(false);
325 lastCheckpoint = now;
326 }
327 }
328 } catch (InterruptedException e) {
329 // Looks like someone really wants us to exit this thread...
330 } catch (IOException ioe) {
331 LOG.error("Checkpoint failed", ioe);
332 brokerService.handleIOException(ioe);
333 }
334 }
335 };
336
337 checkpointThread.setDaemon(true);
338 checkpointThread.start();
339 }
340 }
341 }
342
343 public void open() throws IOException {
344 if( opened.compareAndSet(false, true) ) {
345 getJournal().start();
346 try {
347 loadPageFile();
348 } catch (Throwable t) {
349 LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t);
350 if (LOG.isDebugEnabled()) {
351 LOG.debug("Index load failure", t);
352 }
353 // try to recover index
354 try {
355 pageFile.unload();
356 } catch (Exception ignore) {}
357 if (archiveCorruptedIndex) {
358 pageFile.archive();
359 } else {
360 pageFile.delete();
361 }
362 metadata = new Metadata();
363 pageFile = null;
364 loadPageFile();
365 }
366 startCheckpoint();
367 recover();
368 }
369 }
370
371 private void lock() throws IOException {
372
373 if (lockFile == null) {
374 File lockFileName = new File(directory, "lock");
375 lockFile = new LockFile(lockFileName, true);
376 if (failIfDatabaseIsLocked) {
377 lockFile.lock();
378 } else {
379 boolean locked = false;
380 while ((!isStopped()) && (!isStopping())) {
381 try {
382 lockFile.lock();
383 locked = true;
384 break;
385 } catch (IOException e) {
386 LOG.info("Database "
387 + lockFileName
388 + " is locked... waiting "
389 + (getDatabaseLockedWaitDelay() / 1000)
390 + " seconds for the database to be unlocked. Reason: "
391 + e);
392 try {
393 Thread.sleep(getDatabaseLockedWaitDelay());
394 } catch (InterruptedException e1) {
395 }
396 }
397 }
398 if (!locked) {
399 throw new IOException("attempt to obtain lock aborted due to shutdown");
400 }
401 }
402 }
403 }
404
405 // for testing
406 public LockFile getLockFile() {
407 return lockFile;
408 }
409
410 public void load() throws IOException {
411 this.indexLock.writeLock().lock();
412 try {
413 lock();
414 if (deleteAllMessages) {
415 getJournal().start();
416 getJournal().delete();
417 getJournal().close();
418 journal = null;
419 getPageFile().delete();
420 LOG.info("Persistence store purged.");
421 deleteAllMessages = false;
422 }
423
424 open();
425 store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
426 } finally {
427 this.indexLock.writeLock().unlock();
428 }
429 }
430
431 public void close() throws IOException, InterruptedException {
432 if( opened.compareAndSet(true, false)) {
433 try {
434 this.indexLock.writeLock().lock();
435 try {
436 if (metadata.page != null) {
437 pageFile.tx().execute(new Transaction.Closure<IOException>() {
438 public void execute(Transaction tx) throws IOException {
439 checkpointUpdate(tx, true);
440 }
441 });
442 }
443 pageFile.unload();
444 metadata = new Metadata();
445 } finally {
446 this.indexLock.writeLock().unlock();
447 }
448 journal.close();
449 synchronized (checkpointThreadLock) {
450 if (checkpointThread != null) {
451 checkpointThread.join();
452 }
453 }
454 } finally {
455 lockFile.unlock();
456 lockFile=null;
457 }
458 }
459 }
460
461 public void unload() throws IOException, InterruptedException {
462 this.indexLock.writeLock().lock();
463 try {
464 if( pageFile != null && pageFile.isLoaded() ) {
465 metadata.state = CLOSED_STATE;
466 metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
467
468 if (metadata.page != null) {
469 pageFile.tx().execute(new Transaction.Closure<IOException>() {
470 public void execute(Transaction tx) throws IOException {
471 tx.store(metadata.page, metadataMarshaller, true);
472 }
473 });
474 }
475 }
476 } finally {
477 this.indexLock.writeLock().unlock();
478 }
479 close();
480 }
481
482 // public for testing
483 @SuppressWarnings("rawtypes")
484 public Location getFirstInProgressTxLocation() {
485 Location l = null;
486 synchronized (inflightTransactions) {
487 if (!inflightTransactions.isEmpty()) {
488 for (List<Operation> ops : inflightTransactions.values()) {
489 if (!ops.isEmpty()) {
490 l = ops.get(0).getLocation();
491 break;
492 }
493 }
494 }
495 if (!preparedTransactions.isEmpty()) {
496 for (List<Operation> ops : preparedTransactions.values()) {
497 if (!ops.isEmpty()) {
498 Location t = ops.get(0).getLocation();
499 if (l==null || t.compareTo(l) <= 0) {
500 l = t;
501 }
502 break;
503 }
504 }
505 }
506 }
507 return l;
508 }
509
510 /**
511 * Move all the messages that were in the journal into long term storage. We
512 * just replay and do a checkpoint.
513 *
514 * @throws IOException
515 * @throws IOException
516 * @throws IllegalStateException
517 */
518 private void recover() throws IllegalStateException, IOException {
519 this.indexLock.writeLock().lock();
520 try {
521
522 long start = System.currentTimeMillis();
523 Location producerAuditPosition = recoverProducerAudit();
524 Location lastIndoubtPosition = getRecoveryPosition();
525
526 Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
527
528 if (recoveryPosition != null) {
529 int redoCounter = 0;
530 LOG.info("Recovering from the journal ...");
531 while (recoveryPosition != null) {
532 JournalCommand<?> message = load(recoveryPosition);
533 metadata.lastUpdate = recoveryPosition;
534 process(message, recoveryPosition, lastIndoubtPosition);
535 redoCounter++;
536 recoveryPosition = journal.getNextLocation(recoveryPosition);
537 if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
538 LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered ..");
539 }
540 }
541 if (LOG.isInfoEnabled()) {
542 long end = System.currentTimeMillis();
543 LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
544 }
545 }
546
547 // We may have to undo some index updates.
548 pageFile.tx().execute(new Transaction.Closure<IOException>() {
549 public void execute(Transaction tx) throws IOException {
550 recoverIndex(tx);
551 }
552 });
553
554 // rollback any recovered inflight local transactions
555 Set<TransactionId> toRollback = new HashSet<TransactionId>();
556 synchronized (inflightTransactions) {
557 for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
558 TransactionId id = it.next();
559 if (id.isLocalTransaction()) {
560 toRollback.add(id);
561 }
562 }
563 for (TransactionId tx: toRollback) {
564 if (LOG.isDebugEnabled()) {
565 LOG.debug("rolling back recovered indoubt local transaction " + tx);
566 }
567 store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
568 }
569 }
570 } finally {
571 this.indexLock.writeLock().unlock();
572 }
573 }
574
575 @SuppressWarnings("unused")
576 private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
577 return TransactionIdConversion.convertToLocal(tx);
578 }
579
580 private Location minimum(Location producerAuditPosition,
581 Location lastIndoubtPosition) {
582 Location min = null;
583 if (producerAuditPosition != null) {
584 min = producerAuditPosition;
585 if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
586 min = lastIndoubtPosition;
587 }
588 } else {
589 min = lastIndoubtPosition;
590 }
591 return min;
592 }
593
594 private Location recoverProducerAudit() throws IOException {
595 if (metadata.producerSequenceIdTrackerLocation != null) {
596 KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
597 try {
598 ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
599 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
600 return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
601 } catch (Exception e) {
602 LOG.warn("Cannot recover message audit", e);
603 return journal.getNextLocation(null);
604 }
605 } else {
606 // got no audit stored so got to recreate via replay from start of the journal
607 return journal.getNextLocation(null);
608 }
609 }
610
611 protected void recoverIndex(Transaction tx) throws IOException {
612 long start = System.currentTimeMillis();
613 // It is possible index updates got applied before the journal updates..
614 // in that case we need to removed references to messages that are not in the journal
615 final Location lastAppendLocation = journal.getLastAppendLocation();
616 long undoCounter=0;
617
618 // Go through all the destinations to see if they have messages past the lastAppendLocation
619 for (StoredDestination sd : storedDestinations.values()) {
620
621 final ArrayList<Long> matches = new ArrayList<Long>();
622 // Find all the Locations that are >= than the last Append Location.
623 sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
624 @Override
625 protected void matched(Location key, Long value) {
626 matches.add(value);
627 }
628 });
629
630 for (Long sequenceId : matches) {
631 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
632 sd.locationIndex.remove(tx, keys.location);
633 sd.messageIdIndex.remove(tx, keys.messageId);
634 metadata.producerSequenceIdTracker.rollback(keys.messageId);
635 undoCounter++;
636 // TODO: do we need to modify the ack positions for the pub sub case?
637 }
638 }
639
640 if( undoCounter > 0 ) {
641 // The rolledback operations are basically in flight journal writes. To avoid getting
642 // these the end user should do sync writes to the journal.
643 if (LOG.isInfoEnabled()) {
644 long end = System.currentTimeMillis();
645 LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
646 }
647 }
648
649 undoCounter = 0;
650 start = System.currentTimeMillis();
651
652 // Lets be extra paranoid here and verify that all the datafiles being referenced
653 // by the indexes still exists.
654
655 final SequenceSet ss = new SequenceSet();
656 for (StoredDestination sd : storedDestinations.values()) {
657 // Use a visitor to cut down the number of pages that we load
658 sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
659 int last=-1;
660
661 public boolean isInterestedInKeysBetween(Location first, Location second) {
662 if( first==null ) {
663 return !ss.contains(0, second.getDataFileId());
664 } else if( second==null ) {
665 return true;
666 } else {
667 return !ss.contains(first.getDataFileId(), second.getDataFileId());
668 }
669 }
670
671 public void visit(List<Location> keys, List<Long> values) {
672 for (Location l : keys) {
673 int fileId = l.getDataFileId();
674 if( last != fileId ) {
675 ss.add(fileId);
676 last = fileId;
677 }
678 }
679 }
680
681 });
682 }
683 HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
684 while (!ss.isEmpty()) {
685 missingJournalFiles.add((int) ss.removeFirst());
686 }
687 missingJournalFiles.removeAll(journal.getFileMap().keySet());
688
689 if (!missingJournalFiles.isEmpty()) {
690 if (LOG.isInfoEnabled()) {
691 LOG.info("Some journal files are missing: " + missingJournalFiles);
692 }
693 }
694
695 ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
696 for (Integer missing : missingJournalFiles) {
697 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
698 }
699
700 if (checkForCorruptJournalFiles) {
701 Collection<DataFile> dataFiles = journal.getFileMap().values();
702 for (DataFile dataFile : dataFiles) {
703 int id = dataFile.getDataFileId();
704 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
705 Sequence seq = dataFile.getCorruptedBlocks().getHead();
706 while (seq != null) {
707 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)));
708 seq = seq.getNext();
709 }
710 }
711 }
712
713 if (!missingPredicates.isEmpty()) {
714 for (StoredDestination sd : storedDestinations.values()) {
715
716 final ArrayList<Long> matches = new ArrayList<Long>();
717 sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
718 @Override
719 protected void matched(Location key, Long value) {
720 matches.add(value);
721 }
722 });
723
724 // If somes message references are affected by the missing data files...
725 if (!matches.isEmpty()) {
726
727 // We either 'gracefully' recover dropping the missing messages or
728 // we error out.
729 if( ignoreMissingJournalfiles ) {
730 // Update the index to remove the references to the missing data
731 for (Long sequenceId : matches) {
732 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
733 sd.locationIndex.remove(tx, keys.location);
734 sd.messageIdIndex.remove(tx, keys.messageId);
735 undoCounter++;
736 // TODO: do we need to modify the ack positions for the pub sub case?
737 }
738
739 } else {
740 throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected.");
741 }
742 }
743 }
744 }
745
746 if( undoCounter > 0 ) {
747 // The rolledback operations are basically in flight journal writes. To avoid getting these the end user
748 // should do sync writes to the journal.
749 if (LOG.isInfoEnabled()) {
750 long end = System.currentTimeMillis();
751 LOG.info("Detected missing/corrupt journal files. Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
752 }
753 }
754 }
755
756 private Location nextRecoveryPosition;
757 private Location lastRecoveryPosition;
758
759 public void incrementalRecover() throws IOException {
760 this.indexLock.writeLock().lock();
761 try {
762 if( nextRecoveryPosition == null ) {
763 if( lastRecoveryPosition==null ) {
764 nextRecoveryPosition = getRecoveryPosition();
765 } else {
766 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
767 }
768 }
769 while (nextRecoveryPosition != null) {
770 lastRecoveryPosition = nextRecoveryPosition;
771 metadata.lastUpdate = lastRecoveryPosition;
772 JournalCommand<?> message = load(lastRecoveryPosition);
773 process(message, lastRecoveryPosition, (Runnable)null);
774 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
775 }
776 } finally {
777 this.indexLock.writeLock().unlock();
778 }
779 }
780
781 public Location getLastUpdatePosition() throws IOException {
782 return metadata.lastUpdate;
783 }
784
785 private Location getRecoveryPosition() throws IOException {
786
787 if (!this.forceRecoverIndex) {
788
789 // If we need to recover the transactions..
790 if (metadata.firstInProgressTransactionLocation != null) {
791 return metadata.firstInProgressTransactionLocation;
792 }
793
794 // Perhaps there were no transactions...
795 if( metadata.lastUpdate!=null) {
796 // Start replay at the record after the last one recorded in the index file.
797 return journal.getNextLocation(metadata.lastUpdate);
798 }
799 }
800 // This loads the first position.
801 return journal.getNextLocation(null);
802 }
803
804 protected void checkpointCleanup(final boolean cleanup) throws IOException {
805 long start;
806 this.indexLock.writeLock().lock();
807 try {
808 start = System.currentTimeMillis();
809 if( !opened.get() ) {
810 return;
811 }
812 pageFile.tx().execute(new Transaction.Closure<IOException>() {
813 public void execute(Transaction tx) throws IOException {
814 checkpointUpdate(tx, cleanup);
815 }
816 });
817 } finally {
818 this.indexLock.writeLock().unlock();
819 }
820
821 long end = System.currentTimeMillis();
822 if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
823 if (LOG.isInfoEnabled()) {
824 LOG.info("Slow KahaDB access: cleanup took " + (end - start));
825 }
826 }
827 }
828
829 public void checkpoint(Callback closure) throws Exception {
830 this.indexLock.writeLock().lock();
831 try {
832 pageFile.tx().execute(new Transaction.Closure<IOException>() {
833 public void execute(Transaction tx) throws IOException {
834 checkpointUpdate(tx, false);
835 }
836 });
837 closure.execute();
838 } finally {
839 this.indexLock.writeLock().unlock();
840 }
841 }
842
843 public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
844 int size = data.serializedSizeFramed();
845 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
846 os.writeByte(data.type().getNumber());
847 data.writeFramed(os);
848 return os.toByteSequence();
849 }
850
// /////////////////////////////////////////////////////////////////
// Methods called by the broker to update and query the store.
// /////////////////////////////////////////////////////////////////
854 public Location store(JournalCommand<?> data) throws IOException {
855 return store(data, false, null,null);
856 }
857
858 public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
859 return store(data, false, null,null, onJournalStoreComplete);
860 }
861
862 public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
863 return store(data, sync, before, after, null);
864 }
865
866 /**
867 * All updated are are funneled through this method. The updates are converted
868 * to a JournalMessage which is logged to the journal and then the data from
869 * the JournalMessage is used to update the index just like it would be done
870 * during a recovery process.
871 */
872 public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after, Runnable onJournalStoreComplete) throws IOException {
873 if (before != null) {
874 before.run();
875 }
876 try {
877 ByteSequence sequence = toByteSequence(data);
878 long start = System.currentTimeMillis();
879 Location location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
880 long start2 = System.currentTimeMillis();
881 process(data, location, after);
882 long end = System.currentTimeMillis();
883 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
884 if (LOG.isInfoEnabled()) {
885 LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
886 }
887 }
888
889 if (after != null) {
890 Runnable afterCompletion = null;
891 synchronized (orderedTransactionAfters) {
892 if (!orderedTransactionAfters.empty()) {
893 afterCompletion = orderedTransactionAfters.pop();
894 }
895 }
896 if (afterCompletion != null) {
897 afterCompletion.run();
898 } else {
899 // non persistent message case
900 after.run();
901 }
902 }
903
904 if (checkpointThread != null && !checkpointThread.isAlive()) {
905 startCheckpoint();
906 }
907 return location;
908 } catch (IOException ioe) {
909 LOG.error("KahaDB failed to store to Journal", ioe);
910 brokerService.handleIOException(ioe);
911 throw ioe;
912 }
913 }
914
915 /**
916 * Loads a previously stored JournalMessage
917 *
918 * @param location
919 * @return
920 * @throws IOException
921 */
922 public JournalCommand<?> load(Location location) throws IOException {
923 long start = System.currentTimeMillis();
924 ByteSequence data = journal.read(location);
925 long end = System.currentTimeMillis();
926 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
927 if (LOG.isInfoEnabled()) {
928 LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
929 }
930 }
931 DataByteArrayInputStream is = new DataByteArrayInputStream(data);
932 byte readByte = is.readByte();
933 KahaEntryType type = KahaEntryType.valueOf(readByte);
934 if( type == null ) {
935 throw new IOException("Could not load journal record. Invalid location: "+location);
936 }
937 JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
938 message.mergeFramed(is);
939 return message;
940 }
941
942 /**
943 * do minimal recovery till we reach the last inDoubtLocation
944 * @param data
945 * @param location
946 * @param inDoubtlocation
947 * @throws IOException
948 */
949 void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
950 if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
951 process(data, location, (Runnable) null);
952 } else {
953 // just recover producer audit
954 data.visit(new Visitor() {
955 public void visit(KahaAddMessageCommand command) throws IOException {
956 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
957 }
958 });
959 }
960 }
961
962 // /////////////////////////////////////////////////////////////////
963 // Journaled record processing methods. Once the record is journaled,
964 // these methods handle applying the index updates. These may be called
965 // from the recovery method too so they need to be idempotent
966 // /////////////////////////////////////////////////////////////////
967
968 void process(JournalCommand<?> data, final Location location, final Runnable after) throws IOException {
969 data.visit(new Visitor() {
970 @Override
971 public void visit(KahaAddMessageCommand command) throws IOException {
972 process(command, location);
973 }
974
975 @Override
976 public void visit(KahaRemoveMessageCommand command) throws IOException {
977 process(command, location);
978 }
979
980 @Override
981 public void visit(KahaPrepareCommand command) throws IOException {
982 process(command, location);
983 }
984
985 @Override
986 public void visit(KahaCommitCommand command) throws IOException {
987 process(command, location, after);
988 }
989
990 @Override
991 public void visit(KahaRollbackCommand command) throws IOException {
992 process(command, location);
993 }
994
995 @Override
996 public void visit(KahaRemoveDestinationCommand command) throws IOException {
997 process(command, location);
998 }
999
1000 @Override
1001 public void visit(KahaSubscriptionCommand command) throws IOException {
1002 process(command, location);
1003 }
1004
1005 @Override
1006 public void visit(KahaProducerAuditCommand command) throws IOException {
1007 processLocation(location);
1008 }
1009
1010 @Override
1011 public void visit(KahaTraceCommand command) {
1012 processLocation(location);
1013 }
1014 });
1015 }
1016
1017 @SuppressWarnings("rawtypes")
1018 protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
1019 if (command.hasTransactionInfo()) {
1020 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
1021 inflightTx.add(new AddOpperation(command, location));
1022 } else {
1023 this.indexLock.writeLock().lock();
1024 try {
1025 pageFile.tx().execute(new Transaction.Closure<IOException>() {
1026 public void execute(Transaction tx) throws IOException {
1027 upadateIndex(tx, command, location);
1028 }
1029 });
1030 } finally {
1031 this.indexLock.writeLock().unlock();
1032 }
1033 }
1034 }
1035
1036 @SuppressWarnings("rawtypes")
1037 protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
1038 if (command.hasTransactionInfo()) {
1039 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
1040 inflightTx.add(new RemoveOpperation(command, location));
1041 } else {
1042 this.indexLock.writeLock().lock();
1043 try {
1044 pageFile.tx().execute(new Transaction.Closure<IOException>() {
1045 public void execute(Transaction tx) throws IOException {
1046 updateIndex(tx, command, location);
1047 }
1048 });
1049 } finally {
1050 this.indexLock.writeLock().unlock();
1051 }
1052 }
1053 }
1054
1055 protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
1056 this.indexLock.writeLock().lock();
1057 try {
1058 pageFile.tx().execute(new Transaction.Closure<IOException>() {
1059 public void execute(Transaction tx) throws IOException {
1060 updateIndex(tx, command, location);
1061 }
1062 });
1063 } finally {
1064 this.indexLock.writeLock().unlock();
1065 }
1066 }
1067
1068 protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
1069 this.indexLock.writeLock().lock();
1070 try {
1071 pageFile.tx().execute(new Transaction.Closure<IOException>() {
1072 public void execute(Transaction tx) throws IOException {
1073 updateIndex(tx, command, location);
1074 }
1075 });
1076 } finally {
1077 this.indexLock.writeLock().unlock();
1078 }
1079 }
1080
1081 protected void processLocation(final Location location) {
1082 this.indexLock.writeLock().lock();
1083 try {
1084 metadata.lastUpdate = location;
1085 } finally {
1086 this.indexLock.writeLock().unlock();
1087 }
1088 }
1089
1090 private final Stack<Runnable> orderedTransactionAfters = new Stack<Runnable>();
1091 private void push(Runnable after) {
1092 if (after != null) {
1093 synchronized (orderedTransactionAfters) {
1094 orderedTransactionAfters.push(after);
1095 }
1096 }
1097 }
1098
1099 @SuppressWarnings("rawtypes")
1100 protected void process(KahaCommitCommand command, Location location, final Runnable after) throws IOException {
1101 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1102 List<Operation> inflightTx;
1103 synchronized (inflightTransactions) {
1104 inflightTx = inflightTransactions.remove(key);
1105 if (inflightTx == null) {
1106 inflightTx = preparedTransactions.remove(key);
1107 }
1108 }
1109 if (inflightTx == null) {
1110 if (after != null) {
1111 // since we don't push this after and we may find another, lets run it now
1112 after.run();
1113 }
1114 return;
1115 }
1116
1117 final List<Operation> messagingTx = inflightTx;
1118 this.indexLock.writeLock().lock();
1119 try {
1120 pageFile.tx().execute(new Transaction.Closure<IOException>() {
1121 public void execute(Transaction tx) throws IOException {
1122 for (Operation op : messagingTx) {
1123 op.execute(tx);
1124 }
1125 }
1126 });
1127 metadata.lastUpdate = location;
1128 push(after);
1129 } finally {
1130 this.indexLock.writeLock().unlock();
1131 }
1132 }
1133
1134 @SuppressWarnings("rawtypes")
1135 protected void process(KahaPrepareCommand command, Location location) {
1136 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1137 synchronized (inflightTransactions) {
1138 List<Operation> tx = inflightTransactions.remove(key);
1139 if (tx != null) {
1140 preparedTransactions.put(key, tx);
1141 }
1142 }
1143 }
1144
1145 @SuppressWarnings("rawtypes")
1146 protected void process(KahaRollbackCommand command, Location location) throws IOException {
1147 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1148 List<Operation> updates = null;
1149 synchronized (inflightTransactions) {
1150 updates = inflightTransactions.remove(key);
1151 if (updates == null) {
1152 updates = preparedTransactions.remove(key);
1153 }
1154 }
1155 if (isRewriteOnRedelivery()) {
1156 persistRedeliveryCount(updates);
1157 }
1158 }
1159
1160 @SuppressWarnings("rawtypes")
1161 private void persistRedeliveryCount(List<Operation> updates) throws IOException {
1162 if (updates != null) {
1163 for (Operation operation : updates) {
1164 operation.getCommand().visit(new Visitor() {
1165 @Override
1166 public void visit(KahaRemoveMessageCommand command) throws IOException {
1167 incrementRedeliveryAndReWrite(command.getMessageId(), command.getDestination());
1168 }
1169 });
1170 }
1171 }
1172 }
1173
/**
 * Hook invoked from {@code persistRedeliveryCount} for each remove-message
 * command of a rolled-back transaction.
 * NOTE(review): judging by the name, implementations bump the message's
 * redelivery counter and rewrite the message - confirm against the subclass.
 *
 * @param key the message id taken from the remove-message command
 * @param destination the destination taken from the remove-message command
 * @throws IOException if the implementation fails
 */
abstract void incrementRedeliveryAndReWrite(String key, KahaDestination destination) throws IOException;
1175
1176 // /////////////////////////////////////////////////////////////////
1177 // These methods do the actual index updates.
1178 // /////////////////////////////////////////////////////////////////
1179
// Write lock is taken by the process(...) methods around every index transaction
// and around updates to metadata.lastUpdate.
protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
// Journal data file ids excluded from cleanup in checkpointUpdate; exposed via
// getJournalFilesBeingReplicated().
private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
1182
/**
 * Adds a journaled message to the destination's location, message-id and
 * order indexes. Handles two replay cases: a location that is already
 * indexed (the previous mapping is restored) and a message id that is
 * already indexed (the add is rejected as a duplicate and undone).
 * In all cases that reach the end of the method, the message id is recorded
 * in the producer audit and {@code metadata.lastUpdate} is advanced.
 *
 * @param tx the index transaction to perform the updates in
 * @param command the add-message command
 * @param location the journal location of the message
 * @throws IOException if an index operation fails
 */
void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
    StoredDestination sd = getStoredDestination(command.getDestination(), tx);

    // Skip adding the message to the index if this is a topic and there are
    // no subscriptions.
    // Note: this early return also skips the producer audit and lastUpdate
    // bookkeeping at the end of the method.
    if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
        return;
    }

    // Add the message.
    int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
    long id = sd.orderIndex.getNextMessageId(priority);
    // put() returns the value previously mapped to the key, or null if none
    Long previous = sd.locationIndex.put(tx, location, id);
    if (previous == null) {
        previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
        if (previous == null) {
            // brand new message: index it by order and, for topics, track pending acks
            sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
            if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
                addAckLocationForNewMessage(tx, sd, id);
            }
        } else {
            // If the message ID is already indexed, then the broker asked us to
            // store a DUP
            // message. Bad BOY! Don't do it, and log a warning.
            LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName() + ", Message id: " + command.getMessageId());
            // undo both puts: restore the original id mapping and drop the new location
            sd.messageIdIndex.put(tx, command.getMessageId(), previous);
            sd.locationIndex.remove(tx, location);
            rollbackStatsOnDuplicate(command.getDestination());
        }
    } else {
        // restore the previous value.. Looks like this was a redo of a
        // previously
        // added message. We don't want to assign it a new id as the other
        // indexes would
        // be wrong..
        //
        sd.locationIndex.put(tx, location, previous);
    }
    // record this id in any event, initial send or recovery
    metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
    metadata.lastUpdate = location;
}
1225
/**
 * Hook invoked when an add-message command is rejected because its message
 * id is already indexed for the destination.
 * NOTE(review): judging by the name, implementations revert destination
 * statistics that were updated for the duplicate - confirm against the subclass.
 *
 * @param commandDestination the destination of the rejected add-message command
 */
abstract void rollbackStatsOnDuplicate(KahaDestination commandDestination);
1227
/**
 * Applies a remove-message (ack) command to the destination's indexes.
 * Without a subscription key the message is removed from all indexes
 * outright; with a subscription key the ack is recorded for that
 * subscription and {@code removeAckLocation} takes care of dropping
 * messages nobody references any more. {@code metadata.lastUpdate} is
 * advanced to the ack's location in every case.
 *
 * @param tx the index transaction to perform the updates in
 * @param command the remove-message command
 * @param ackLocation the journal location of the ack itself
 * @throws IOException if an index operation fails
 */
void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
    StoredDestination sd = getStoredDestination(command.getDestination(), tx);
    if (!command.hasSubscriptionKey()) {

        // In the queue case we just remove the message from the index..
        Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
        if (sequenceId != null) {
            MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
            if (keys != null) {
                sd.locationIndex.remove(tx, keys.location);
                // remember which data file the acked message lives in, keyed by the
                // ack's data file, so cleanup can keep the ack's file while needed
                recordAckMessageReferenceLocation(ackLocation, keys.location);
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("message not found in order index: " + sequenceId + " for: " + command.getMessageId());
            }
        } else if (LOG.isDebugEnabled()) {
            LOG.debug("message not found in sequence id index: " + command.getMessageId());
        }
    } else {
        // In the topic case we need remove the message once it's been acked
        // by all the subs
        Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());

        // Make sure it's a valid message id...
        if (sequence != null) {
            String subscriptionKey = command.getSubscriptionKey();
            if (command.getAck() != UNMATCHED) {
                // the get() is issued so that lastGetPriority() reflects this sequence
                sd.orderIndex.get(tx, sequence);
                byte priority = sd.orderIndex.lastGetPriority();
                sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
            }
            // The following method handles deleting un-referenced messages.
            removeAckLocation(tx, sd, subscriptionKey, sequence);
        } else if (LOG.isDebugEnabled()) {
            LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
        }

    }
    metadata.lastUpdate = ackLocation;
}
1267
1268 Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
1269 private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1270 Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1271 if (referenceFileIds == null) {
1272 referenceFileIds = new HashSet<Integer>();
1273 referenceFileIds.add(messageLocation.getDataFileId());
1274 ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1275 } else {
1276 Integer id = Integer.valueOf(messageLocation.getDataFileId());
1277 if (!referenceFileIds.contains(id)) {
1278 referenceFileIds.add(id);
1279 }
1280 }
1281 }
1282
/**
 * Removes a destination from the store: every index belonging to it is
 * cleared, unloaded and its page freed, then the destination is dropped from
 * both the in-memory cache and the persistent destinations index.
 *
 * @param tx the index transaction to perform the updates in
 * @param command the remove-destination command
 * @param location the journal location of the command (unused here)
 * @throws IOException if an index operation fails
 */
void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
    StoredDestination sd = getStoredDestination(command.getDestination(), tx);
    sd.orderIndex.remove(tx);

    // each index: clear its contents, unload it, then free its page
    sd.locationIndex.clear(tx);
    sd.locationIndex.unload(tx);
    tx.free(sd.locationIndex.getPageId());

    sd.messageIdIndex.clear(tx);
    sd.messageIdIndex.unload(tx);
    tx.free(sd.messageIdIndex.getPageId());

    // subscription-related indexes are only allocated for topics
    if (sd.subscriptions != null) {
        sd.subscriptions.clear(tx);
        sd.subscriptions.unload(tx);
        tx.free(sd.subscriptions.getPageId());

        sd.subscriptionAcks.clear(tx);
        sd.subscriptionAcks.unload(tx);
        tx.free(sd.subscriptionAcks.getPageId());

        sd.ackPositions.clear(tx);
        sd.ackPositions.unload(tx);
        tx.free(sd.ackPositions.getHeadPageId());
    }

    // forget the destination in the cache and in the persistent metadata
    String key = key(command.getDestination());
    storedDestinations.remove(key);
    metadata.destinations.remove(tx, key);
}
1313
/**
 * Creates or deletes a subscription on a destination, depending on whether
 * the command carries subscription info.
 *
 * @param tx the index transaction to perform the updates in
 * @param command the subscription command; with subscription info it creates
 *        the subscription, without it the subscription is deleted
 * @param location the journal location of the command (unused here)
 * @throws IOException if an index operation fails
 */
void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
    StoredDestination sd = getStoredDestination(command.getDestination(), tx);
    final String subscriptionKey = command.getSubscriptionKey();

    // If set then we are creating it.. otherwise we are destroying the sub
    if (command.hasSubscriptionInfo()) {
        sd.subscriptions.put(tx, subscriptionKey, command);
        long ackLocation=NOT_ACKED;
        if (!command.getRetroactive()) {
            // non-retroactive: last ack is set to the id just before the next one to be assigned
            ackLocation = sd.orderIndex.nextMessageId-1;
        } else {
            // retroactive: last ack stays NOT_ACKED and ack locations are added for the sub
            addAckLocationForRetroactiveSub(tx, sd, ackLocation, subscriptionKey);
        }
        sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
        sd.subscriptionCache.add(subscriptionKey);
    } else {
        // delete the sub...
        sd.subscriptions.remove(tx, subscriptionKey);
        sd.subscriptionAcks.remove(tx, subscriptionKey);
        sd.subscriptionCache.remove(subscriptionKey);
        removeAckLocationsForSub(tx, sd, subscriptionKey);

        // last subscription gone: nothing can consume the stored messages, drop them
        if (sd.subscriptions.isEmpty(tx)) {
            sd.messageIdIndex.clear(tx);
            sd.locationIndex.clear(tx);
            sd.orderIndex.clear(tx);
        }
    }
}
1343
1344 /**
1345 * @param tx
1346 * @throws IOException
1347 */
1348 void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
1349 LOG.debug("Checkpoint started.");
1350
1351 // reflect last update exclusive of current checkpoint
1352 Location firstTxLocation = metadata.lastUpdate;
1353
1354 metadata.state = OPEN_STATE;
1355 metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
1356 metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
1357 tx.store(metadata.page, metadataMarshaller, true);
1358 pageFile.flush();
1359
1360 if( cleanup ) {
1361
1362 final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1363 final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
1364
1365 if (LOG.isTraceEnabled()) {
1366 LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet);
1367 }
1368
1369 // Don't GC files under replication
1370 if( journalFilesBeingReplicated!=null ) {
1371 gcCandidateSet.removeAll(journalFilesBeingReplicated);
1372 }
1373
1374 if (metadata.producerSequenceIdTrackerLocation != null) {
1375 gcCandidateSet.remove(metadata.producerSequenceIdTrackerLocation.getDataFileId());
1376 }
1377
1378 // Don't GC files after the first in progress tx
1379 if( metadata.firstInProgressTransactionLocation!=null ) {
1380 if (metadata.firstInProgressTransactionLocation.getDataFileId() < firstTxLocation.getDataFileId()) {
1381 firstTxLocation = metadata.firstInProgressTransactionLocation;
1382 }
1383 }
1384
1385 if( firstTxLocation!=null ) {
1386 while( !gcCandidateSet.isEmpty() ) {
1387 Integer last = gcCandidateSet.last();
1388 if( last >= firstTxLocation.getDataFileId() ) {
1389 gcCandidateSet.remove(last);
1390 } else {
1391 break;
1392 }
1393 }
1394 if (LOG.isTraceEnabled()) {
1395 LOG.trace("gc candidates after first tx:" + firstTxLocation + ", " + gcCandidateSet);
1396 }
1397 }
1398
1399 // Go through all the destinations to see if any of them can remove GC candidates.
1400 for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
1401 if( gcCandidateSet.isEmpty() ) {
1402 break;
1403 }
1404
1405 // Use a visitor to cut down the number of pages that we load
1406 entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1407 int last=-1;
1408 public boolean isInterestedInKeysBetween(Location first, Location second) {
1409 if( first==null ) {
1410 SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1411 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1412 subset.remove(second.getDataFileId());
1413 }
1414 return !subset.isEmpty();
1415 } else if( second==null ) {
1416 SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1417 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1418 subset.remove(first.getDataFileId());
1419 }
1420 return !subset.isEmpty();
1421 } else {
1422 SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1423 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1424 subset.remove(first.getDataFileId());
1425 }
1426 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1427 subset.remove(second.getDataFileId());
1428 }
1429 return !subset.isEmpty();
1430 }
1431 }
1432
1433 public void visit(List<Location> keys, List<Long> values) {
1434 for (Location l : keys) {
1435 int fileId = l.getDataFileId();
1436 if( last != fileId ) {
1437 gcCandidateSet.remove(fileId);
1438 last = fileId;
1439 }
1440 }
1441 }
1442 });
1443 if (LOG.isTraceEnabled()) {
1444 LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
1445 }
1446 }
1447
1448 // check we are not deleting file with ack for in-use journal files
1449 if (LOG.isTraceEnabled()) {
1450 LOG.trace("gc candidates: " + gcCandidateSet);
1451 }
1452 final TreeSet<Integer> gcCandidates = new TreeSet<Integer>(gcCandidateSet);
1453 Iterator<Integer> candidates = gcCandidateSet.iterator();
1454 while (candidates.hasNext()) {
1455 Integer candidate = candidates.next();
1456 Set<Integer> referencedFileIds = ackMessageFileMap.get(candidate);
1457 if (referencedFileIds != null) {
1458 for (Integer referencedFileId : referencedFileIds) {
1459 if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) {
1460 // active file that is not targeted for deletion is referenced so don't delete
1461 candidates.remove();
1462 break;
1463 }
1464 }
1465 if (gcCandidateSet.contains(candidate)) {
1466 ackMessageFileMap.remove(candidate);
1467 } else {
1468 if (LOG.isTraceEnabled()) {
1469 LOG.trace("not removing data file: " + candidate
1470 + " as contained ack(s) refer to referenced file: " + referencedFileIds);
1471 }
1472 }
1473 }
1474 }
1475
1476 if (!gcCandidateSet.isEmpty()) {
1477 if (LOG.isDebugEnabled()) {
1478 LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
1479 }
1480 journal.removeDataFiles(gcCandidateSet);
1481 }
1482 }
1483
1484 LOG.debug("Checkpoint done.");
1485 }
1486
1487 final Runnable nullCompletionCallback = new Runnable() {
1488 @Override
1489 public void run() {
1490 }
1491 };
1492 private Location checkpointProducerAudit() throws IOException {
1493 ByteArrayOutputStream baos = new ByteArrayOutputStream();
1494 ObjectOutputStream oout = new ObjectOutputStream(baos);
1495 oout.writeObject(metadata.producerSequenceIdTracker);
1496 oout.flush();
1497 oout.close();
1498 // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
1499 Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
1500 try {
1501 location.getLatch().await();
1502 } catch (InterruptedException e) {
1503 throw new InterruptedIOException(e.toString());
1504 }
1505 return location;
1506 }
1507
/**
 * Returns the set of journal data file ids that cleanup must not delete
 * because they are being replicated. The internal set itself is returned,
 * not a copy, so changes made by the caller affect subsequent cleanups.
 *
 * @return the live set of protected journal data file ids
 */
public HashSet<Integer> getJournalFilesBeingReplicated() {
    return journalFilesBeingReplicated;
}
1511
1512 // /////////////////////////////////////////////////////////////////
1513 // StoredDestination related implementation methods.
1514 // /////////////////////////////////////////////////////////////////
1515
// Cache of loaded destinations by destination key: filled by getStoredDestination(),
// entries dropped when a destination is removed.
private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
1517
/**
 * Plain holder for per-subscription state.
 * NOTE(review): no use of this class is visible in this part of the file;
 * the field descriptions below are inferred from names only - confirm.
 */
class StoredSubscription {
    // presumably the subscription's definition
    SubscriptionInfo subscriptionInfo;
    // presumably the id of the last acknowledged message
    String lastAckId;
    // presumably the journal location of the last ack
    Location lastAckLocation;
    // presumably the subscription's current read position
    Location cursor;
}
1524
1525 static class MessageKeys {
1526 final String messageId;
1527 final Location location;
1528
1529 public MessageKeys(String messageId, Location location) {
1530 this.messageId=messageId;
1531 this.location=location;
1532 }
1533
1534 @Override
1535 public String toString() {
1536 return "["+messageId+","+location+"]";
1537 }
1538 }
1539
1540 static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
1541 static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
1542
1543 public MessageKeys readPayload(DataInput dataIn) throws IOException {
1544 return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
1545 }
1546
1547 public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
1548 dataOut.writeUTF(object.messageId);
1549 LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
1550 }
1551 }
1552
1553 class LastAck {
1554 long lastAckedSequence;
1555 byte priority;
1556
1557 public LastAck(LastAck source) {
1558 this.lastAckedSequence = source.lastAckedSequence;
1559 this.priority = source.priority;
1560 }
1561
1562 public LastAck() {
1563 this.priority = MessageOrderIndex.HI;
1564 }
1565
1566 public LastAck(long ackLocation) {
1567 this.lastAckedSequence = ackLocation;
1568 this.priority = MessageOrderIndex.LO;
1569 }
1570
1571 public LastAck(long ackLocation, byte priority) {
1572 this.lastAckedSequence = ackLocation;
1573 this.priority = priority;
1574 }
1575
1576 public String toString() {
1577 return "[" + lastAckedSequence + ":" + priority + "]";
1578 }
1579 }
1580
1581 protected class LastAckMarshaller implements Marshaller<LastAck> {
1582
1583 public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
1584 dataOut.writeLong(object.lastAckedSequence);
1585 dataOut.writeByte(object.priority);
1586 }
1587
1588 public LastAck readPayload(DataInput dataIn) throws IOException {
1589 LastAck lastAcked = new LastAck();
1590 lastAcked.lastAckedSequence = dataIn.readLong();
1591 if (metadata.version >= 3) {
1592 lastAcked.priority = dataIn.readByte();
1593 }
1594 return lastAcked;
1595 }
1596
1597 public int getFixedSize() {
1598 return 9;
1599 }
1600
1601 public LastAck deepCopy(LastAck source) {
1602 return new LastAck(source);
1603 }
1604
1605 public boolean isDeepCopySupported() {
1606 return true;
1607 }
1608 }
1609
/**
 * The set of indexes kept for one destination, plus some transient
 * bookkeeping. Instances are created by {@code loadStoredDestination} or by
 * {@code StoredDestinationMarshaller} and cached in {@code storedDestinations}.
 */
class StoredDestination {

    // sequence id -> MessageKeys, split by priority
    MessageOrderIndex orderIndex = new MessageOrderIndex();
    // journal location -> sequence id
    BTreeIndex<Location, Long> locationIndex;
    // message id -> sequence id
    BTreeIndex<String, Long> messageIdIndex;

    // These bits are only set for Topics
    // subscription key -> the command that created the subscription
    BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
    // subscription key -> last acknowledged sequence/priority
    BTreeIndex<String, LastAck> subscriptionAcks;
    HashMap<String, MessageOrderCursor> subscriptionCursors;
    // subscription key -> set of sequence ids (presumably those still awaiting an ack - confirm)
    ListIndex<String, SequenceSet> ackPositions;

    // Transient data used to track which Messages are no longer needed.
    final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>();
    // subscription keys, kept in step with 'subscriptions' when subs are added/removed
    final HashSet<String> subscriptionCache = new LinkedHashSet<String>();
}
1626
/**
 * Marshaller for {@link StoredDestination}. Only the page ids of the
 * destination's indexes are persisted; reading rebuilds index objects around
 * those page ids. Reading also performs in-place upgrades for older store
 * versions: before version 4 the ack positions are converted from the old
 * BTree layout, and before version 2 the low/high priority order indexes are
 * allocated fresh.
 */
protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {

    /**
     * Reads the page ids in the order {@code writePayload} emits them and
     * wraps them in index objects, upgrading older layouts where needed.
     */
    public StoredDestination readPayload(final DataInput dataIn) throws IOException {
        final StoredDestination value = new StoredDestination();
        value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
        value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
        value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());

        // boolean flag: true when the subscription indexes follow (written for topics)
        if (dataIn.readBoolean()) {
            value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
            value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
            if (metadata.version >= 4) {
                value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
            } else {
                // upgrade
                // the old layout maps sequence id -> set of subscription keys; it is
                // inverted into subscription key -> SequenceSet
                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                    public void execute(Transaction tx) throws IOException {
                        BTreeIndex<Long, HashSet<String>> oldAckPositions =
                            new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
                        oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
                        oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
                        oldAckPositions.load(tx);

                        LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();

                        // Do the initial build of the data in memory before writing into the store
                        // based Ack Positions List to avoid a lot of disk thrashing.
                        Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
                        while (iterator.hasNext()) {
                            Entry<Long, HashSet<String>> entry = iterator.next();

                            for(String subKey : entry.getValue()) {
                                SequenceSet pendingAcks = temp.get(subKey);
                                if (pendingAcks == null) {
                                    pendingAcks = new SequenceSet();
                                    temp.put(subKey, pendingAcks);
                                }

                                pendingAcks.add(entry.getKey());
                            }
                        }

                        // Now move the pending messages to ack data into the store backed
                        // structure.
                        value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
                        for(String subscriptionKey : temp.keySet()) {
                            value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
                        }

                    }
                });
            }
        }
        if (metadata.version >= 2) {
            value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
            value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
        } else {
            // upgrade
            // older stores have no priority indexes: allocate and load empty ones
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                public void execute(Transaction tx) throws IOException {
                    value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
                    value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
                    value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
                    value.orderIndex.lowPriorityIndex.load(tx);

                    value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
                    value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
                    value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
                    value.orderIndex.highPriorityIndex.load(tx);
                }
            });
        }

        return value;
    }

    /**
     * Writes the page ids of the destination's indexes: default-priority
     * order, location and message-id indexes; then a flag plus the three
     * subscription-related indexes when present; then the low and high
     * priority order indexes.
     */
    public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
        dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
        dataOut.writeLong(value.locationIndex.getPageId());
        dataOut.writeLong(value.messageIdIndex.getPageId());
        if (value.subscriptions != null) {
            dataOut.writeBoolean(true);
            dataOut.writeLong(value.subscriptions.getPageId());
            dataOut.writeLong(value.subscriptionAcks.getPageId());
            dataOut.writeLong(value.ackPositions.getHeadPageId());
        } else {
            dataOut.writeBoolean(false);
        }
        dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
        dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
    }
}
1719
1720 static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
1721 final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
1722
1723 public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
1724 KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
1725 rc.mergeFramed((InputStream)dataIn);
1726 return rc;
1727 }
1728
1729 public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
1730 object.writeFramed((OutputStream)dataOut);
1731 }
1732 }
1733
1734 protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
1735 String key = key(destination);
1736 StoredDestination rc = storedDestinations.get(key);
1737 if (rc == null) {
1738 boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
1739 rc = loadStoredDestination(tx, key, topic);
1740 // Cache it. We may want to remove/unload destinations from the
1741 // cache that are not used for a while
1742 // to reduce memory usage.
1743 storedDestinations.put(key, rc);
1744 }
1745 return rc;
1746 }
1747
1748 protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
1749 String key = key(destination);
1750 StoredDestination rc = storedDestinations.get(key);
1751 if (rc == null && metadata.destinations.containsKey(tx, key)) {
1752 rc = getStoredDestination(destination, tx);
1753 }
1754 return rc;
1755 }
1756
1757 /**
1758 * @param tx
1759 * @param key
1760 * @param topic
1761 * @return
1762 * @throws IOException
1763 */
1764 private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
1765 // Try to load the existing indexes..
1766 StoredDestination rc = metadata.destinations.get(tx, key);
1767 if (rc == null) {
1768 // Brand new destination.. allocate indexes for it.
1769 rc = new StoredDestination();
1770 rc.orderIndex.allocate(tx);
1771 rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
1772 rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
1773
1774 if (topic) {
1775 rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
1776 rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
1777 rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
1778 }
1779 metadata.destinations.put(tx, key, rc);
1780 }
1781
1782 // Configure the marshalers and load.
1783 rc.orderIndex.load(tx);
1784
1785 // Figure out the next key using the last entry in the destination.
1786 rc.orderIndex.configureLast(tx);
1787
1788 rc.locationIndex.setKeyMarshaller(org.apache.kahadb.util.LocationMarshaller.INSTANCE);
1789 rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1790 rc.locationIndex.load(tx);
1791
1792 rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
1793 rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1794 rc.messageIdIndex.load(tx);
1795
1796 // If it was a topic...
1797 if (topic) {
1798
1799 rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
1800 rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
1801 rc.subscriptions.load(tx);
1802
1803 rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
1804 rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
1805 rc.subscriptionAcks.load(tx);
1806
1807 rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
1808 rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
1809 rc.ackPositions.load(tx);
1810
1811 rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
1812
1813 if (metadata.version < 3) {
1814
1815 // on upgrade need to fill ackLocation with available messages past last ack
1816 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
1817 Entry<String, LastAck> entry = iterator.next();
1818 for (Iterator<Entry<Long, MessageKeys>> orderIterator =
1819 rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
1820 Long sequence = orderIterator.next().getKey();
1821 addAckLocation(tx, rc, sequence, entry.getKey());
1822 }
1823 // modify so it is upgraded
1824 rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
1825 }
1826 }
1827
1828 // Configure the message references index
1829 Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
1830 while (subscriptions.hasNext()) {
1831 Entry<String, SequenceSet> subscription = subscriptions.next();
1832 SequenceSet pendingAcks = subscription.getValue();
1833 if (pendingAcks != null && !pendingAcks.isEmpty()) {
1834 Long lastPendingAck = pendingAcks.getTail().getLast();
1835 for(Long sequenceId : pendingAcks) {
1836 Long current = rc.messageReferences.get(sequenceId);
1837 if (current == null) {
1838 current = new Long(0);
1839 }
1840
1841 // We always add a trailing empty entry for the next position to start from
1842 // so we need to ensure we don't count that as a message reference on reload.
1843 if (!sequenceId.equals(lastPendingAck)) {
1844 current = current.longValue() + 1;
1845 }
1846
1847 rc.messageReferences.put(sequenceId, current);
1848 }
1849 }
1850 }
1851
1852 // Configure the subscription cache
1853 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
1854 Entry<String, LastAck> entry = iterator.next();
1855 rc.subscriptionCache.add(entry.getKey());
1856 }
1857
1858 if (rc.orderIndex.nextMessageId == 0) {
1859 // check for existing durable sub all acked out - pull next seq from acks as messages are gone
1860 if (!rc.subscriptionAcks.isEmpty(tx)) {
1861 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
1862 Entry<String, LastAck> entry = iterator.next();
1863 rc.orderIndex.nextMessageId =
1864 Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
1865 }
1866 }
1867 } else {
1868 // update based on ackPositions for unmatched, last entry is always the next
1869 if (!rc.messageReferences.isEmpty()) {
1870 Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
1871 rc.orderIndex.nextMessageId =
1872 Math.max(rc.orderIndex.nextMessageId, nextMessageId);
1873 }
1874 }
1875 }
1876
1877 if (metadata.version < VERSION) {
1878 // store again after upgrade
1879 metadata.destinations.put(tx, key, rc);
1880 }
1881 return rc;
1882 }
1883
1884 private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
1885 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
1886 if (sequences == null) {
1887 sequences = new SequenceSet();
1888 sequences.add(messageSequence);
1889 sd.ackPositions.add(tx, subscriptionKey, sequences);
1890 } else {
1891 sequences.add(messageSequence);
1892 sd.ackPositions.put(tx, subscriptionKey, sequences);
1893 }
1894
1895 Long count = sd.messageReferences.get(messageSequence);
1896 if (count == null) {
1897 count = Long.valueOf(0L);
1898 }
1899 count = count.longValue() + 1;
1900 sd.messageReferences.put(messageSequence, count);
1901 }
1902
1903 // new sub is interested in potentially all existing messages
1904 private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
1905 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
1906 if (sequences == null) {
1907 sequences = new SequenceSet();
1908 sequences.add(messageSequence);
1909 sd.ackPositions.add(tx, subscriptionKey, sequences);
1910 } else {
1911 sequences.add(messageSequence);
1912 sd.ackPositions.put(tx, subscriptionKey, sequences);
1913 }
1914
1915 Long count = sd.messageReferences.get(messageSequence);
1916 if (count == null) {
1917 count = Long.valueOf(0L);
1918 }
1919 count = count.longValue() + 1;
1920 sd.messageReferences.put(messageSequence, count);
1921 }
1922
1923 // on a new message add, all existing subs are interested in this message
1924 private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
1925 for(String subscriptionKey : sd.subscriptionCache) {
1926 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
1927 if (sequences == null) {
1928 sequences = new SequenceSet();
1929 sequences.add(new Sequence(messageSequence, messageSequence + 1));
1930 sd.ackPositions.add(tx, subscriptionKey, sequences);
1931 } else {
1932 sequences.add(new Sequence(messageSequence, messageSequence + 1));
1933 sd.ackPositions.put(tx, subscriptionKey, sequences);
1934 }
1935
1936 Long count = sd.messageReferences.get(messageSequence);
1937 if (count == null) {
1938 count = Long.valueOf(0L);
1939 }
1940 count = count.longValue() + 1;
1941 sd.messageReferences.put(messageSequence, count);
1942 sd.messageReferences.put(messageSequence+1, Long.valueOf(0L));
1943 }
1944 }
1945
1946 private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
1947 if (!sd.ackPositions.isEmpty(tx)) {
1948 SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
1949 if (sequences == null || sequences.isEmpty()) {
1950 return;
1951 }
1952
1953 ArrayList<Long> unreferenced = new ArrayList<Long>();
1954
1955 for(Long sequenceId : sequences) {
1956 Long references = sd.messageReferences.get(sequenceId);
1957 if (references != null) {
1958 references = references.longValue() - 1;
1959
1960 if (references.longValue() > 0) {
1961 sd.messageReferences.put(sequenceId, references);
1962 } else {
1963 sd.messageReferences.remove(sequenceId);
1964 unreferenced.add(sequenceId);
1965 }
1966 }
1967 }
1968
1969 for(Long sequenceId : unreferenced) {
1970 // Find all the entries that need to get deleted.
1971 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
1972 sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
1973
1974 // Do the actual deletes.
1975 for (Entry<Long, MessageKeys> entry : deletes) {
1976 sd.locationIndex.remove(tx, entry.getValue().location);
1977 sd.messageIdIndex.remove(tx, entry.getValue().messageId);
1978 sd.orderIndex.remove(tx, entry.getKey());
1979 }
1980 }
1981 }
1982 }
1983
1984 /**
1985 * @param tx
1986 * @param sd
1987 * @param subscriptionKey
1988 * @param messageSequence
1989 * @throws IOException
1990 */
1991 private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException {
1992 // Remove the sub from the previous location set..
1993 if (messageSequence != null) {
1994 SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
1995 if (range != null && !range.isEmpty()) {
1996 range.remove(messageSequence);
1997 if (!range.isEmpty()) {
1998 sd.ackPositions.put(tx, subscriptionKey, range);
1999 } else {
2000 sd.ackPositions.remove(tx, subscriptionKey);
2001 }
2002
2003 // Check if the message is reference by any other subscription.
2004 Long count = sd.messageReferences.get(messageSequence);
2005 if (count != null){
2006 long references = count.longValue() - 1;
2007 if (references > 0) {
2008 sd.messageReferences.put(messageSequence, Long.valueOf(references));
2009 return;
2010 } else {
2011 sd.messageReferences.remove(messageSequence);
2012 }
2013 }
2014
2015 // Find all the entries that need to get deleted.
2016 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2017 sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
2018
2019 // Do the actual deletes.
2020 for (Entry<Long, MessageKeys> entry : deletes) {
2021 sd.locationIndex.remove(tx, entry.getValue().location);
2022 sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2023 sd.orderIndex.remove(tx, entry.getKey());
2024 }
2025 }
2026 }
2027 }
2028
2029 public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2030 return sd.subscriptionAcks.get(tx, subscriptionKey);
2031 }
2032
2033 public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2034 SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2035 if (messageSequences != null) {
2036 long result = messageSequences.rangeSize();
2037 // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
2038 return result > 0 ? result - 1 : 0;
2039 }
2040
2041 return 0;
2042 }
2043
2044 private String key(KahaDestination destination) {
2045 return destination.getType().getNumber() + ":" + destination.getName();
2046 }
2047
// /////////////////////////////////////////////////////////////////
// Transaction related implementation methods.
// /////////////////////////////////////////////////////////////////

// Operations recorded per transaction id. Entries are created on demand by
// getInflightTx(), which synchronizes on this map while doing so.
@SuppressWarnings("rawtypes")
private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
// NOTE(review): presumably the operations of transactions that were prepared but not yet
// committed or rolled back; no code in this section populates it - confirm against callers.
@SuppressWarnings("rawtypes")
protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
// String form of the last message id of every ack registered through
// trackRecoveredAcks() and not yet removed again by forgetRecoveredAcks().
protected final Set<String> ackedAndPrepared = new HashSet<String>();
2056
2057 // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
2058 // till then they are skipped by the store.
2059 // 'at most once' XA guarantee
2060 public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
2061 this.indexLock.writeLock().lock();
2062 try {
2063 for (MessageAck ack : acks) {
2064 ackedAndPrepared.add(ack.getLastMessageId().toString());
2065 }
2066 } finally {
2067 this.indexLock.writeLock().unlock();
2068 }
2069 }
2070
2071 public void forgetRecoveredAcks(ArrayList<MessageAck> acks) throws IOException {
2072 if (acks != null) {
2073 this.indexLock.writeLock().lock();
2074 try {
2075 for (MessageAck ack : acks) {
2076 ackedAndPrepared.remove(ack.getLastMessageId().toString());
2077 }
2078 } finally {
2079 this.indexLock.writeLock().unlock();
2080 }
2081 }
2082 }
2083
2084 @SuppressWarnings("rawtypes")
2085 private List<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
2086 TransactionId key = TransactionIdConversion.convert(info);
2087 List<Operation> tx;
2088 synchronized (inflightTransactions) {
2089 tx = inflightTransactions.get(key);
2090 if (tx == null) {
2091 tx = Collections.synchronizedList(new ArrayList<Operation>());
2092 inflightTransactions.put(key, tx);
2093 }
2094 }
2095 return tx;
2096 }
2097
2098 @SuppressWarnings("unused")
2099 private TransactionId key(KahaTransactionInfo transactionInfo) {
2100 return TransactionIdConversion.convert(transactionInfo);
2101 }
2102
2103 abstract class Operation <T extends JournalCommand<T>> {
2104 final T command;
2105 final Location location;
2106
2107 public Operation(T command, Location location) {
2108 this.command = command;
2109 this.location = location;
2110 }
2111
2112 public Location getLocation() {
2113 return location;
2114 }
2115
2116 public T getCommand() {
2117 return command;
2118 }
2119
2120 abstract public void execute(Transaction tx) throws IOException;
2121 }
2122
2123 class AddOpperation extends Operation<KahaAddMessageCommand> {
2124
2125 public AddOpperation(KahaAddMessageCommand command, Location location) {
2126 super(command, location);
2127 }
2128
2129 @Override
2130 public void execute(Transaction tx) throws IOException {
2131 upadateIndex(tx, command, location);
2132 }
2133
2134 }
2135
2136 class RemoveOpperation extends Operation<KahaRemoveMessageCommand> {
2137
2138 public RemoveOpperation(KahaRemoveMessageCommand command, Location location) {
2139 super(command, location);
2140 }
2141
2142 @Override
2143 public void execute(Transaction tx) throws IOException {
2144 updateIndex(tx, command, location);
2145 }
2146 }
2147
2148 // /////////////////////////////////////////////////////////////////
2149 // Initialization related implementation methods.
2150 // /////////////////////////////////////////////////////////////////
2151
2152 private PageFile createPageFile() {
2153 PageFile index = new PageFile(directory, "db");
2154 index.setEnableWriteThread(isEnableIndexWriteAsync());
2155 index.setWriteBatchSize(getIndexWriteBatchSize());
2156 index.setPageCacheSize(indexCacheSize);
2157 index.setUseLFRUEviction(isUseIndexLFRUEviction());
2158 index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
2159 index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
2160 index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
2161 index.setEnablePageCaching(isEnableIndexPageCaching());
2162 return index;
2163 }
2164
2165 private Journal createJournal() throws IOException {
2166 Journal manager = new Journal();
2167 manager.setDirectory(directory);
2168 manager.setMaxFileLength(getJournalMaxFileLength());
2169 manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
2170 manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
2171 manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
2172 manager.setArchiveDataLogs(isArchiveDataLogs());
2173 manager.setSizeAccumulator(storeSize);
2174 manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
2175 if (getDirectoryArchive() != null) {
2176 IOHelper.mkdirs(getDirectoryArchive());
2177 manager.setDirectoryArchive(getDirectoryArchive());
2178 }
2179 return manager;
2180 }
2181
2182 public int getJournalMaxWriteBatchSize() {
2183 return journalMaxWriteBatchSize;
2184 }
2185
2186 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
2187 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
2188 }
2189
2190 public File getDirectory() {
2191 return directory;
2192 }
2193
2194 public void setDirectory(File directory) {
2195 this.directory = directory;
2196 }
2197
2198 public boolean isDeleteAllMessages() {
2199 return deleteAllMessages;
2200 }
2201
2202 public void setDeleteAllMessages(boolean deleteAllMessages) {
2203 this.deleteAllMessages = deleteAllMessages;
2204 }
2205
2206 public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
2207 this.setIndexWriteBatchSize = setIndexWriteBatchSize;
2208 }
2209
2210 public int getIndexWriteBatchSize() {
2211 return setIndexWriteBatchSize;
2212 }
2213
2214 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
2215 this.enableIndexWriteAsync = enableIndexWriteAsync;
2216 }
2217
2218 boolean isEnableIndexWriteAsync() {
2219 return enableIndexWriteAsync;
2220 }
2221
2222 public boolean isEnableJournalDiskSyncs() {
2223 return enableJournalDiskSyncs;
2224 }
2225
2226 public void setEnableJournalDiskSyncs(boolean syncWrites) {
2227 this.enableJournalDiskSyncs = syncWrites;
2228 }
2229
2230 public long getCheckpointInterval() {
2231 return checkpointInterval;
2232 }
2233
2234 public void setCheckpointInterval(long checkpointInterval) {
2235 this.checkpointInterval = checkpointInterval;
2236 }
2237
2238 public long getCleanupInterval() {
2239 return cleanupInterval;
2240 }
2241
2242 public void setCleanupInterval(long cleanupInterval) {
2243 this.cleanupInterval = cleanupInterval;
2244 }
2245
2246 public void setJournalMaxFileLength(int journalMaxFileLength) {
2247 this.journalMaxFileLength = journalMaxFileLength;
2248 }
2249
2250 public int getJournalMaxFileLength() {
2251 return journalMaxFileLength;
2252 }
2253
2254 public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
2255 this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
2256 }
2257
2258 public int getMaxFailoverProducersToTrack() {
2259 return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
2260 }
2261
2262 public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
2263 this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
2264 }
2265
2266 public int getFailoverProducersAuditDepth() {
2267 return this.metadata.producerSequenceIdTracker.getAuditDepth();
2268 }
2269
2270 public PageFile getPageFile() {
2271 if (pageFile == null) {
2272 pageFile = createPageFile();
2273 }
2274 return pageFile;
2275 }
2276
2277 public Journal getJournal() throws IOException {
2278 if (journal == null) {
2279 journal = createJournal();
2280 }
2281 return journal;
2282 }
2283
2284 public boolean isFailIfDatabaseIsLocked() {
2285 return failIfDatabaseIsLocked;
2286 }
2287
2288 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
2289 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
2290 }
2291
2292 public boolean isIgnoreMissingJournalfiles() {
2293 return ignoreMissingJournalfiles;
2294 }
2295
2296 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
2297 this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
2298 }
2299
2300 public int getIndexCacheSize() {
2301 return indexCacheSize;
2302 }
2303
2304 public void setIndexCacheSize(int indexCacheSize) {
2305 this.indexCacheSize = indexCacheSize;
2306 }
2307
2308 public boolean isCheckForCorruptJournalFiles() {
2309 return checkForCorruptJournalFiles;
2310 }
2311
2312 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
2313 this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
2314 }
2315
2316 public boolean isChecksumJournalFiles() {
2317 return checksumJournalFiles;
2318 }
2319
2320 public void setChecksumJournalFiles(boolean checksumJournalFiles) {
2321 this.checksumJournalFiles = checksumJournalFiles;
2322 }
2323
2324 public void setBrokerService(BrokerService brokerService) {
2325 this.brokerService = brokerService;
2326 }
2327
2328 /**
2329 * @return the archiveDataLogs
2330 */
2331 public boolean isArchiveDataLogs() {
2332 return this.archiveDataLogs;
2333 }
2334
2335 /**
2336 * @param archiveDataLogs the archiveDataLogs to set
2337 */
2338 public void setArchiveDataLogs(boolean archiveDataLogs) {
2339 this.archiveDataLogs = archiveDataLogs;
2340 }
2341
2342 /**
2343 * @return the directoryArchive
2344 */
2345 public File getDirectoryArchive() {
2346 return this.directoryArchive;
2347 }
2348
2349 /**
2350 * @param directoryArchive the directoryArchive to set
2351 */
2352 public void setDirectoryArchive(File directoryArchive) {
2353 this.directoryArchive = directoryArchive;
2354 }
2355
2356 /**
2357 * @return the databaseLockedWaitDelay
2358 */
2359 public int getDatabaseLockedWaitDelay() {
2360 return this.databaseLockedWaitDelay;
2361 }
2362
2363 /**
2364 * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
2365 */
2366 public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
2367 this.databaseLockedWaitDelay = databaseLockedWaitDelay;
2368 }
2369
2370 public boolean isRewriteOnRedelivery() {
2371 return rewriteOnRedelivery;
2372 }
2373
2374 public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
2375 this.rewriteOnRedelivery = rewriteOnRedelivery;
2376 }
2377
2378 public boolean isArchiveCorruptedIndex() {
2379 return archiveCorruptedIndex;
2380 }
2381
2382 public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
2383 this.archiveCorruptedIndex = archiveCorruptedIndex;
2384 }
2385
2386 public float getIndexLFUEvictionFactor() {
2387 return indexLFUEvictionFactor;
2388 }
2389
2390 public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
2391 this.indexLFUEvictionFactor = indexLFUEvictionFactor;
2392 }
2393
2394 public boolean isUseIndexLFRUEviction() {
2395 return useIndexLFRUEviction;
2396 }
2397
2398 public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
2399 this.useIndexLFRUEviction = useIndexLFRUEviction;
2400 }
2401
2402 public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
2403 this.enableIndexDiskSyncs = enableIndexDiskSyncs;
2404 }
2405
2406 public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
2407 this.enableIndexRecoveryFile = enableIndexRecoveryFile;
2408 }
2409
2410 public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
2411 this.enableIndexPageCaching = enableIndexPageCaching;
2412 }
2413
2414 public boolean isEnableIndexDiskSyncs() {
2415 return enableIndexDiskSyncs;
2416 }
2417
2418 public boolean isEnableIndexRecoveryFile() {
2419 return enableIndexRecoveryFile;
2420 }
2421
2422 public boolean isEnableIndexPageCaching() {
2423 return enableIndexPageCaching;
2424 }
2425
2426 // /////////////////////////////////////////////////////////////////
2427 // Internal conversion methods.
2428 // /////////////////////////////////////////////////////////////////
2429
/**
 * Holds one cursor position for each of the three priority bands
 * (default, low and high) of a message order index.
 */
class MessageOrderCursor{
    long defaultCursorPosition;
    long lowPriorityCursorPosition;
    long highPriorityCursorPosition;

    /** Creates a cursor with all three positions at zero. */
    MessageOrderCursor() {
    }

    /** Creates a cursor with all three positions set to {@code position}. */
    MessageOrderCursor(long position) {
        defaultCursorPosition = position;
        lowPriorityCursorPosition = position;
        highPriorityCursorPosition = position;
    }

    /** Creates a cursor holding the same three positions as {@code other}. */
    MessageOrderCursor(MessageOrderCursor other) {
        defaultCursorPosition = other.defaultCursorPosition;
        lowPriorityCursorPosition = other.lowPriorityCursorPosition;
        highPriorityCursorPosition = other.highPriorityCursorPosition;
    }

    /** @return a new cursor with the same positions as this one */
    MessageOrderCursor copy() {
        return new MessageOrderCursor(this);
    }

    /** Sets all three positions back to zero. */
    void reset() {
        defaultCursorPosition = 0;
        highPriorityCursorPosition = 0;
        lowPriorityCursorPosition = 0;
    }

    /** Advances every position by one, except positions that are still zero. */
    void increment() {
        if (defaultCursorPosition != 0) {
            defaultCursorPosition += 1;
        }
        if (highPriorityCursorPosition != 0) {
            highPriorityCursorPosition += 1;
        }
        if (lowPriorityCursorPosition != 0) {
            lowPriorityCursorPosition += 1;
        }
    }

    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("MessageOrderCursor:[def:");
        text.append(defaultCursorPosition);
        text.append(", low:").append(lowPriorityCursorPosition);
        text.append(", high:").append(highPriorityCursorPosition);
        text.append("]");
        return text.toString();
    }

    /** Overwrites the three positions of this cursor with those of {@code other}. */
    public void sync(MessageOrderCursor other) {
        defaultCursorPosition = other.defaultCursorPosition;
        lowPriorityCursorPosition = other.lowPriorityCursorPosition;
        highPriorityCursorPosition = other.highPriorityCursorPosition;
    }
}
2483
2484 class MessageOrderIndex {
// Priority band markers: recorded in lastGetPriority by get() and matched
// against LastAck.priority in setBatch(Transaction, LastAck).
static final byte HI = 9;
static final byte LO = 0;
static final byte DEF = 4;

// Next sequence id handed out by getNextMessageId(); shared by all priority bands.
long nextMessageId;
// One order index per priority band. The low and high indexes are only allocated
// when metadata.version >= 2 (see allocate()), so they may be null.
BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
BTreeIndex<Long, MessageKeys> lowPriorityIndex;
BTreeIndex<Long, MessageKeys> highPriorityIndex;
// Cursor used by iterator(Transaction) and updated by setBatch()/stoppedIterating().
MessageOrderCursor cursor = new MessageOrderCursor();
// Last key seen per band; stoppedIterating() moves the cursor just past them.
Long lastDefaultKey;
Long lastHighKey;
Long lastLowKey;
// Band (HI, LO or DEF) recorded by the most recent call to get().
byte lastGetPriority;
2498
2499 MessageKeys remove(Transaction tx, Long key) throws IOException {
2500 MessageKeys result = defaultPriorityIndex.remove(tx, key);
2501 if (result == null && highPriorityIndex!=null) {
2502 result = highPriorityIndex.remove(tx, key);
2503 if (result ==null && lowPriorityIndex!=null) {
2504 result = lowPriorityIndex.remove(tx, key);
2505 }
2506 }
2507 return result;
2508 }
2509
2510 void load(Transaction tx) throws IOException {
2511 defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2512 defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2513 defaultPriorityIndex.load(tx);
2514 lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2515 lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2516 lowPriorityIndex.load(tx);
2517 highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2518 highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2519 highPriorityIndex.load(tx);
2520 }
2521
2522 void allocate(Transaction tx) throws IOException {
2523 defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2524 if (metadata.version >= 2) {
2525 lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2526 highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2527 }
2528 }
2529
2530 void configureLast(Transaction tx) throws IOException {
2531 // Figure out the next key using the last entry in the destination.
2532 if (highPriorityIndex != null) {
2533 Entry<Long, MessageKeys> lastEntry = highPriorityIndex.getLast(tx);
2534 if (lastEntry != null) {
2535 nextMessageId = lastEntry.getKey() + 1;
2536 } else {
2537 lastEntry = defaultPriorityIndex.getLast(tx);
2538 if (lastEntry != null) {
2539 nextMessageId = lastEntry.getKey() + 1;
2540 } else {
2541 lastEntry = lowPriorityIndex.getLast(tx);
2542 if (lastEntry != null) {
2543 nextMessageId = lastEntry.getKey() + 1;
2544 }
2545 }
2546 }
2547 } else {
2548 Entry<Long, MessageKeys> lastEntry = defaultPriorityIndex.getLast(tx);
2549 if (lastEntry != null) {
2550 nextMessageId = lastEntry.getKey() + 1;
2551 }
2552 }
2553 }
2554
2555 void clear(Transaction tx) throws IOException {
2556 this.remove(tx);
2557 this.resetCursorPosition();
2558 this.allocate(tx);
2559 this.load(tx);
2560 this.configureLast(tx);
2561 }
2562
2563 void remove(Transaction tx) throws IOException {
2564 defaultPriorityIndex.clear(tx);
2565 defaultPriorityIndex.unload(tx);
2566 tx.free(defaultPriorityIndex.getPageId());
2567 if (lowPriorityIndex != null) {
2568 lowPriorityIndex.clear(tx);
2569 lowPriorityIndex.unload(tx);
2570
2571 tx.free(lowPriorityIndex.getPageId());
2572 }
2573 if (highPriorityIndex != null) {
2574 highPriorityIndex.clear(tx);
2575 highPriorityIndex.unload(tx);
2576 tx.free(highPriorityIndex.getPageId());
2577 }
2578 }
2579
2580 void resetCursorPosition() {
2581 this.cursor.reset();
2582 lastDefaultKey = null;
2583 lastHighKey = null;
2584 lastLowKey = null;
2585 }
2586
2587 void setBatch(Transaction tx, Long sequence) throws IOException {
2588 if (sequence != null) {
2589 Long nextPosition = new Long(sequence.longValue() + 1);
2590 if (defaultPriorityIndex.containsKey(tx, sequence)) {
2591 lastDefaultKey = sequence;
2592 cursor.defaultCursorPosition = nextPosition.longValue();
2593 } else if (highPriorityIndex != null) {
2594 if (highPriorityIndex.containsKey(tx, sequence)) {
2595 lastHighKey = sequence;
2596 cursor.highPriorityCursorPosition = nextPosition.longValue();
2597 } else if (lowPriorityIndex.containsKey(tx, sequence)) {
2598 lastLowKey = sequence;
2599 cursor.lowPriorityCursorPosition = nextPosition.longValue();
2600 }
2601 } else {
2602 LOG.warn("setBatch: sequence " + sequence + " not found in orderindex:" + this);
2603 lastDefaultKey = sequence;
2604 cursor.defaultCursorPosition = nextPosition.longValue();
2605 }
2606 }
2607 }
2608
2609 void setBatch(Transaction tx, LastAck last) throws IOException {
2610 setBatch(tx, last.lastAckedSequence);
2611 if (cursor.defaultCursorPosition == 0
2612 && cursor.highPriorityCursorPosition == 0
2613 && cursor.lowPriorityCursorPosition == 0) {
2614 long next = last.lastAckedSequence + 1;
2615 switch (last.priority) {
2616 case DEF:
2617 cursor.defaultCursorPosition = next;
2618 cursor.highPriorityCursorPosition = next;
2619 break;
2620 case HI:
2621 cursor.highPriorityCursorPosition = next;
2622 break;
2623 case LO:
2624 cursor.lowPriorityCursorPosition = next;
2625 cursor.defaultCursorPosition = next;
2626 cursor.highPriorityCursorPosition = next;
2627 break;
2628 }
2629 }
2630 }
2631
2632 void stoppedIterating() {
2633 if (lastDefaultKey!=null) {
2634 cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
2635 }
2636 if (lastHighKey!=null) {
2637 cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
2638 }
2639 if (lastLowKey!=null) {
2640 cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
2641 }
2642 lastDefaultKey = null;
2643 lastHighKey = null;
2644 lastLowKey = null;
2645 }
2646
2647 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
2648 throws IOException {
2649 if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
2650 getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
2651 } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
2652 getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
2653 } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
2654 getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
2655 }
2656 }
2657
2658 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
2659 BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
2660
2661 Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId);
2662 deletes.add(iterator.next());
2663 }
2664
2665 long getNextMessageId(int priority) {
2666 return nextMessageId++;
2667 }
2668
2669 MessageKeys get(Transaction tx, Long key) throws IOException {
2670 MessageKeys result = defaultPriorityIndex.get(tx, key);
2671 if (result == null) {
2672 result = highPriorityIndex.get(tx, key);
2673 if (result == null) {
2674 result = lowPriorityIndex.get(tx, key);
2675 lastGetPriority = LO;
2676 } else {
2677 lastGetPriority = HI;
2678 }
2679 } else {
2680 lastGetPriority = DEF;
2681 }
2682 return result;
2683 }
2684
2685 MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
2686 if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
2687 return defaultPriorityIndex.put(tx, key, value);
2688 } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
2689 return highPriorityIndex.put(tx, key, value);
2690 } else {
2691 return lowPriorityIndex.put(tx, key, value);
2692 }
2693 }
2694
2695 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
2696 return new MessageOrderIterator(tx,cursor);
2697 }
2698
2699 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
2700 return new MessageOrderIterator(tx,m);
2701 }
2702
2703 public byte lastGetPriority() {
2704 return lastGetPriority;
2705 }
2706
2707 class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
// Iterator currently being drained; null until hasNext() picks one.
Iterator<Entry<Long, MessageKeys>>currentIterator;
// Per-band iterators created in the constructor. highIterator and lowIterator
// are null when the corresponding priority index does not exist.
final Iterator<Entry<Long, MessageKeys>>highIterator;
final Iterator<Entry<Long, MessageKeys>>defaultIterator;
final Iterator<Entry<Long, MessageKeys>>lowIterator;
2712
2713 MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException {
2714 this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition);
2715 if (highPriorityIndex != null) {
2716 this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition);
2717 } else {
2718 this.highIterator = null;
2719 }
2720 if (lowPriorityIndex != null) {
2721 this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition);
2722 } else {
2723 this.lowIterator = null;
2724 }
2725 }
2726
2727 public boolean hasNext() {
2728 if (currentIterator == null) {
2729 if (highIterator != null) {
2730 if (highIterator.hasNext()) {
2731 currentIterator = highIterator;
2732 return currentIterator.hasNext();
2733 }
2734 if (defaultIterator.hasNext()) {
2735 currentIterator = defaultIterator;
2736 return currentIterator.hasNext();
2737 }
2738 if (lowIterator.hasNext()) {
2739 currentIterator = lowIterator;
2740 return currentIterator.hasNext();
2741 }
2742 return false;
2743 } else {
2744 currentIterator = defaultIterator;
2745 return currentIterator.hasNext();
2746 }
2747 }
2748 if (highIterator != null) {
2749 if (currentIterator.hasNext()) {
2750 return true;
2751 }
2752 if (currentIterator == highIterator) {
2753 if (defaultIterator.hasNext()) {
2754 currentIterator = defaultIterator;
2755 return currentIterator.hasNext();
2756 }
2757 if (lowIterator.hasNext()) {
2758 currentIterator = lowIterator;
2759 return currentIterator.hasNext();
2760 }
2761 return false;
2762 }
2763
2764 if (currentIterator == defaultIterator) {
2765 if (lowIterator.hasNext()) {
2766 currentIterator = lowIterator;
2767 return currentIterator.hasNext();
2768 }
2769 return false;
2770 }
2771 }
2772 return currentIterator.hasNext();
2773 }
2774
2775 public Entry<Long, MessageKeys> next() {
2776 Entry<Long, MessageKeys> result = currentIterator.next();
2777 if (result != null) {
2778 Long key = result.getKey();
2779 if (highIterator != null) {
2780 if (currentIterator == defaultIterator) {
2781 lastDefaultKey = key;
2782 } else if (currentIterator == highIterator) {
2783 lastHighKey = key;
2784 } else {
2785 lastLowKey = key;
2786 }
2787 } else {
2788 lastDefaultKey = key;
2789 }
2790 }
2791 return result;
2792 }
2793
2794 public void remove() {
2795 throw new UnsupportedOperationException();
2796 }
2797
2798 }
2799 }
2800
2801 private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
2802 final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
2803
2804 public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
2805 ByteArrayOutputStream baos = new ByteArrayOutputStream();
2806 ObjectOutputStream oout = new ObjectOutputStream(baos);
2807 oout.writeObject(object);
2808 oout.flush();
2809 oout.close();
2810 byte[] data = baos.toByteArray();
2811 dataOut.writeInt(data.length);
2812 dataOut.write(data);
2813 }
2814
2815 @SuppressWarnings("unchecked")
2816 public HashSet<String> readPayload(DataInput dataIn) throws IOException {
2817 int dataLen = dataIn.readInt();
2818 byte[] data = new byte[dataLen];
2819 dataIn.readFully(data);
2820 ByteArrayInputStream bais = new ByteArrayInputStream(data);
2821 ObjectInputStream oin = new ObjectInputStream(bais);
2822 try {
2823 return (HashSet<String>) oin.readObject();
2824 } catch (ClassNotFoundException cfe) {
2825 IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
2826 ioe.initCause(cfe);
2827 throw ioe;
2828 }
2829 }
2830 }
2831 }