001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.amq;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.io.RandomAccessFile;
022 import java.nio.channels.FileLock;
023 import java.util.Date;
024 import java.util.HashMap;
025 import java.util.HashSet;
026 import java.util.Iterator;
027 import java.util.Map;
028 import java.util.Set;
029 import java.util.concurrent.ConcurrentHashMap;
030 import java.util.concurrent.CountDownLatch;
031 import java.util.concurrent.atomic.AtomicBoolean;
032 import java.util.concurrent.atomic.AtomicInteger;
033 import java.util.concurrent.atomic.AtomicLong;
034 import org.apache.activeio.journal.Journal;
035 import org.apache.activemq.broker.BrokerService;
036 import org.apache.activemq.broker.BrokerServiceAware;
037 import org.apache.activemq.broker.ConnectionContext;
038 import org.apache.activemq.command.ActiveMQDestination;
039 import org.apache.activemq.command.ActiveMQQueue;
040 import org.apache.activemq.command.ActiveMQTopic;
041 import org.apache.activemq.command.DataStructure;
042 import org.apache.activemq.command.JournalQueueAck;
043 import org.apache.activemq.command.JournalTopicAck;
044 import org.apache.activemq.command.JournalTrace;
045 import org.apache.activemq.command.JournalTransaction;
046 import org.apache.activemq.command.Message;
047 import org.apache.activemq.command.ProducerId;
048 import org.apache.activemq.command.SubscriptionInfo;
049 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
050 import org.apache.activemq.kaha.impl.async.AsyncDataManager;
051 import org.apache.activemq.kaha.impl.async.Location;
052 import org.apache.activemq.kaha.impl.index.hash.HashIndex;
053 import org.apache.activemq.openwire.OpenWireFormat;
054 import org.apache.activemq.store.MessageStore;
055 import org.apache.activemq.store.PersistenceAdapter;
056 import org.apache.activemq.store.ReferenceStore;
057 import org.apache.activemq.store.ReferenceStoreAdapter;
058 import org.apache.activemq.store.TopicMessageStore;
059 import org.apache.activemq.store.TopicReferenceStore;
060 import org.apache.activemq.store.TransactionStore;
061 import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
062 import org.apache.activemq.thread.Scheduler;
063 import org.apache.activemq.thread.Task;
064 import org.apache.activemq.thread.TaskRunner;
065 import org.apache.activemq.thread.TaskRunnerFactory;
066 import org.apache.activemq.usage.SystemUsage;
067 import org.apache.activemq.usage.Usage;
068 import org.apache.activemq.usage.UsageListener;
069 import org.apache.activemq.util.ByteSequence;
070 import org.apache.activemq.util.IOExceptionSupport;
071 import org.apache.activemq.util.IOHelper;
072 import org.apache.activemq.wireformat.WireFormat;
073 import org.slf4j.Logger;
074 import org.slf4j.LoggerFactory;
075
076
077 /**
078 * An implementation of {@link PersistenceAdapter} designed for use with a
079 * {@link Journal} and then check pointing asynchronously on a timeout with some
080 * other long term persistent storage.
081 *
082 * @org.apache.xbean.XBean element="amqPersistenceAdapter"
083 *
084 */
085 public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware {
086
087 private static final Logger LOG = LoggerFactory.getLogger(AMQPersistenceAdapter.class);
088 private Scheduler scheduler;
089 private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
090 private final ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore>();
091 private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq";
092 private static final boolean BROKEN_FILE_LOCK;
093 private static final boolean DISABLE_LOCKING;
094 private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
095 private AsyncDataManager asyncDataManager;
096 private ReferenceStoreAdapter referenceStoreAdapter;
097 private TaskRunnerFactory taskRunnerFactory;
098 private WireFormat wireFormat = new OpenWireFormat();
099 private SystemUsage usageManager;
100 private long checkpointInterval = 1000 * 20;
101 private int maxCheckpointMessageAddSize = 1024 * 4;
102 private final AMQTransactionStore transactionStore = new AMQTransactionStore(this);
103 private TaskRunner checkpointTask;
104 private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
105 private final AtomicBoolean started = new AtomicBoolean(false);
106 private Runnable periodicCheckpointTask;
107 private Runnable periodicCleanupTask;
108 private boolean deleteAllMessages;
109 private boolean syncOnWrite;
110 private String brokerName = "";
111 private File directory;
112 private File directoryArchive;
113 private BrokerService brokerService;
114 private final AtomicLong storeSize = new AtomicLong();
115 private boolean persistentIndex=true;
116 private boolean useNio = true;
117 private boolean archiveDataLogs=false;
118 private long cleanupInterval = AsyncDataManager.DEFAULT_CLEANUP_INTERVAL;
119 private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
120 private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
121 private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
122 private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
123 private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
124 private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
125 private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH;
126 private final Map<AMQMessageStore,Map<Integer, AtomicInteger>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Map<Integer, AtomicInteger>> ();
127 private RandomAccessFile lockFile;
128 private FileLock lock;
129 private boolean disableLocking = DISABLE_LOCKING;
130 private boolean failIfJournalIsLocked;
131 private boolean lockLogged;
132 private boolean lockAquired;
133 private boolean recoverReferenceStore=true;
134 private boolean forceRecoverReferenceStore=false;
135 private boolean useDedicatedTaskRunner=false;
136 private int journalThreadPriority = Thread.MAX_PRIORITY;
137
138 public String getBrokerName() {
139 return this.brokerName;
140 }
141
142 public void setBrokerName(String brokerName) {
143 this.brokerName = brokerName;
144 if (this.referenceStoreAdapter != null) {
145 this.referenceStoreAdapter.setBrokerName(brokerName);
146 }
147 }
148
149 public BrokerService getBrokerService() {
150 return brokerService;
151 }
152
153 public void setBrokerService(BrokerService brokerService) {
154 this.brokerService = brokerService;
155 }
156
157 public synchronized void start() throws Exception {
158 if (!started.compareAndSet(false, true)) {
159 return;
160 }
161 if (this.directory == null) {
162 if (brokerService != null) {
163 this.directory = brokerService.getBrokerDataDirectory();
164
165 } else {
166 this.directory = new File(IOHelper.getDefaultDataDirectory(), IOHelper.toFileSystemSafeName(brokerName));
167 this.directory = new File(directory, "amqstore");
168 directory.getAbsolutePath();
169 }
170 }
171 if (this.directoryArchive == null) {
172 this.directoryArchive = new File(this.directory,"archive");
173 }
174 if (this.brokerService != null) {
175 this.taskRunnerFactory = this.brokerService.getTaskRunnerFactory();
176 this.scheduler = this.brokerService.getScheduler();
177 } else {
178 this.taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", getJournalThreadPriority(),
179 true, 1000, isUseDedicatedTaskRunner());
180 this.scheduler = new Scheduler("AMQPersistenceAdapter Scheduler");
181 }
182
183 IOHelper.mkdirs(this.directory);
184 lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
185 lock();
186 LOG.info("AMQStore starting using directory: " + directory);
187 if (archiveDataLogs) {
188 IOHelper.mkdirs(this.directoryArchive);
189 }
190
191 if (this.usageManager != null) {
192 this.usageManager.getMemoryUsage().addUsageListener(this);
193 }
194 if (asyncDataManager == null) {
195 asyncDataManager = createAsyncDataManager();
196 }
197 if (referenceStoreAdapter == null) {
198 referenceStoreAdapter = createReferenceStoreAdapter();
199 }
200 referenceStoreAdapter.setDirectory(new File(directory, "kr-store"));
201 referenceStoreAdapter.setBrokerName(getBrokerName());
202 referenceStoreAdapter.setUsageManager(usageManager);
203 referenceStoreAdapter.setMaxDataFileLength(getMaxReferenceFileLength());
204
205 if (failIfJournalIsLocked) {
206 asyncDataManager.lock();
207 } else {
208 while (true) {
209 try {
210 asyncDataManager.lock();
211 break;
212 } catch (IOException e) {
213 LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.", e);
214 try {
215 Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
216 } catch (InterruptedException e1) {
217 }
218 }
219 }
220 }
221
222 asyncDataManager.start();
223 if (deleteAllMessages) {
224 asyncDataManager.delete();
225 try {
226 JournalTrace trace = new JournalTrace();
227 trace.setMessage("DELETED " + new Date());
228 Location location = asyncDataManager.write(wireFormat.marshal(trace), false);
229 asyncDataManager.setMark(location, true);
230 LOG.info("Journal deleted: ");
231 deleteAllMessages = false;
232 } catch (IOException e) {
233 throw e;
234 } catch (Throwable e) {
235 throw IOExceptionSupport.create(e);
236 }
237 referenceStoreAdapter.deleteAllMessages();
238 }
239 referenceStoreAdapter.start();
240 Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
241 LOG.info("Active data files: " + files);
242 checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
243
244 public boolean iterate() {
245 doCheckpoint();
246 return false;
247 }
248 }, "ActiveMQ Journal Checkpoint Worker");
249 createTransactionStore();
250
251 //
252 // The following was attempting to reduce startup times by avoiding the
253 // log
254 // file scanning that recovery performs. The problem with it is that XA
255 // transactions
256 // only live in transaction log and are not stored in the reference
257 // store, but they still
258 // need to be recovered when the broker starts up.
259
260 if (isForceRecoverReferenceStore()
261 || (isRecoverReferenceStore() && !referenceStoreAdapter
262 .isStoreValid())) {
263 LOG.warn("The ReferenceStore is not valid - recovering ...");
264 recover();
265 LOG.info("Finished recovering the ReferenceStore");
266 } else {
267 Location location = writeTraceMessage("RECOVERED " + new Date(),
268 true);
269 asyncDataManager.setMark(location, true);
270 // recover transactions
271 getTransactionStore().setPreparedTransactions(
272 referenceStoreAdapter.retrievePreparedState());
273 }
274
275 // Do a checkpoint periodically.
276 periodicCheckpointTask = new Runnable() {
277
278 public void run() {
279 checkpoint(false);
280 }
281 };
282 scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval());
283 periodicCleanupTask = new Runnable() {
284
285 public void run() {
286 cleanup();
287 }
288 };
289 scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval());
290
291 if (lockAquired && lockLogged) {
292 LOG.info("Aquired lock for AMQ Store" + getDirectory());
293 if (brokerService != null) {
294 brokerService.getBroker().nowMasterBroker();
295 }
296 }
297
298 }
299
300 public void stop() throws Exception {
301
302 if (!started.compareAndSet(true, false)) {
303 return;
304 }
305 unlock();
306 if (lockFile != null) {
307 lockFile.close();
308 lockFile = null;
309 }
310 this.usageManager.getMemoryUsage().removeUsageListener(this);
311 synchronized (this) {
312 scheduler.cancel(periodicCheckpointTask);
313 scheduler.cancel(periodicCleanupTask);
314 }
315 Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
316 while (queueIterator.hasNext()) {
317 AMQMessageStore ms = queueIterator.next();
318 ms.stop();
319 }
320 Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
321 while (topicIterator.hasNext()) {
322 final AMQTopicMessageStore ms = topicIterator.next();
323 ms.stop();
324 }
325 // Take one final checkpoint and stop checkpoint processing.
326 checkpoint(true);
327 synchronized (this) {
328 checkpointTask.shutdown();
329 }
330 referenceStoreAdapter.savePreparedState(getTransactionStore().getPreparedTransactions());
331 queues.clear();
332 topics.clear();
333 IOException firstException = null;
334 referenceStoreAdapter.stop();
335 referenceStoreAdapter = null;
336
337 if (this.brokerService == null) {
338 this.taskRunnerFactory.shutdown();
339 this.scheduler.stop();
340 }
341 try {
342 LOG.debug("Journal close");
343 asyncDataManager.close();
344 } catch (Exception e) {
345 firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
346 }
347 if (firstException != null) {
348 throw firstException;
349 }
350 }
351
352 /**
353 * When we checkpoint we move all the journalled data to long term storage.
354 *
355 * @param sync
356 */
357 public void checkpoint(boolean sync) {
358 try {
359 if (asyncDataManager == null) {
360 throw new IllegalStateException("Journal is closed.");
361 }
362 CountDownLatch latch = null;
363 synchronized (this) {
364 latch = nextCheckpointCountDownLatch;
365 checkpointTask.wakeup();
366 }
367 if (sync) {
368 if (LOG.isDebugEnabled()) {
369 LOG.debug("Waitng for checkpoint to complete.");
370 }
371 latch.await();
372 }
373 referenceStoreAdapter.checkpoint(sync);
374 } catch (InterruptedException e) {
375 Thread.currentThread().interrupt();
376 LOG.warn("Request to start checkpoint failed: " + e, e);
377 } catch (IOException e) {
378 LOG.error("checkpoint failed: " + e, e);
379 }
380 }
381
382 /**
383 * This does the actual checkpoint.
384 *
385 * @return true if successful
386 */
387 public boolean doCheckpoint() {
388 CountDownLatch latch = null;
389 synchronized (this) {
390 latch = nextCheckpointCountDownLatch;
391 nextCheckpointCountDownLatch = new CountDownLatch(1);
392 }
393 try {
394 if (LOG.isDebugEnabled()) {
395 LOG.debug("Checkpoint started.");
396 }
397
398 Location currentMark = asyncDataManager.getMark();
399 Location newMark = currentMark;
400 Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
401 while (queueIterator.hasNext()) {
402 final AMQMessageStore ms = queueIterator.next();
403 Location mark = ms.getMark();
404 if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
405 newMark = mark;
406 }
407 }
408 Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
409 while (topicIterator.hasNext()) {
410 final AMQTopicMessageStore ms = topicIterator.next();
411 Location mark = ms.getMark();
412 if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
413 newMark = mark;
414 }
415 }
416 try {
417 if (newMark != currentMark) {
418 if (LOG.isDebugEnabled()) {
419 LOG.debug("Marking journal at: " + newMark);
420 }
421 asyncDataManager.setMark(newMark, false);
422 writeTraceMessage("CHECKPOINT " + new Date(), true);
423 }
424 } catch (Exception e) {
425 LOG.error("Failed to mark the Journal: " + e, e);
426 }
427 if (LOG.isDebugEnabled()) {
428 LOG.debug("Checkpoint done.");
429 }
430 } finally {
431 latch.countDown();
432 }
433 return true;
434 }
435
436 /**
437 * Cleans up the data files
438 * @throws IOException
439 */
440 public void cleanup() {
441 try {
442 Set<Integer>inProgress = new HashSet<Integer>();
443 if (LOG.isDebugEnabled()) {
444 LOG.debug("dataFilesInProgress.values: (" + dataFilesInProgress.values().size() + ") " + dataFilesInProgress.values());
445 }
446 for (Map<Integer, AtomicInteger> set: dataFilesInProgress.values()) {
447 inProgress.addAll(set.keySet());
448 }
449 Integer lastDataFile = asyncDataManager.getCurrentDataFileId();
450 inProgress.add(lastDataFile);
451 lastDataFile = asyncDataManager.getMark().getDataFileId();
452 inProgress.addAll(referenceStoreAdapter.getReferenceFileIdsInUse());
453 Location lastActiveTx = transactionStore.checkpoint();
454 if (lastActiveTx != null) {
455 lastDataFile = Math.min(lastDataFile, lastActiveTx.getDataFileId());
456 }
457 LOG.debug("lastDataFile: " + lastDataFile);
458 asyncDataManager.consolidateDataFilesNotIn(inProgress, lastDataFile - 1);
459 } catch (IOException e) {
460 LOG.error("Could not cleanup data files: " + e, e);
461 }
462 }
463
464 public Set<ActiveMQDestination> getDestinations() {
465 Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
466 destinations.addAll(queues.keySet());
467 destinations.addAll(topics.keySet());
468 return destinations;
469 }
470
471 MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
472 if (destination.isQueue()) {
473 return createQueueMessageStore((ActiveMQQueue)destination);
474 } else {
475 return createTopicMessageStore((ActiveMQTopic)destination);
476 }
477 }
478
479 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
480 AMQMessageStore store = queues.get(destination);
481 if (store == null) {
482 ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
483 store = new AMQMessageStore(this, checkpointStore, destination);
484 try {
485 store.start();
486 } catch (Exception e) {
487 throw IOExceptionSupport.create(e);
488 }
489 queues.put(destination, store);
490 }
491 return store;
492 }
493
494 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
495 AMQTopicMessageStore store = topics.get(destinationName);
496 if (store == null) {
497 TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
498 store = new AMQTopicMessageStore(this,checkpointStore, destinationName);
499 try {
500 store.start();
501 } catch (Exception e) {
502 throw IOExceptionSupport.create(e);
503 }
504 topics.put(destinationName, store);
505 }
506 return store;
507 }
508
509 /**
510 * Cleanup method to remove any state associated with the given destination
511 *
512 * @param destination
513 */
514 public void removeQueueMessageStore(ActiveMQQueue destination) {
515 AMQMessageStore store= queues.remove(destination);
516 referenceStoreAdapter.removeQueueMessageStore(destination);
517 }
518
519 /**
520 * Cleanup method to remove any state associated with the given destination
521 *
522 * @param destination
523 */
524 public void removeTopicMessageStore(ActiveMQTopic destination) {
525 topics.remove(destination);
526 }
527
528 public TransactionStore createTransactionStore() throws IOException {
529 return transactionStore;
530 }
531
532 public long getLastMessageBrokerSequenceId() throws IOException {
533 return referenceStoreAdapter.getLastMessageBrokerSequenceId();
534 }
535
536 public void beginTransaction(ConnectionContext context) throws IOException {
537 referenceStoreAdapter.beginTransaction(context);
538 }
539
540 public void commitTransaction(ConnectionContext context) throws IOException {
541 referenceStoreAdapter.commitTransaction(context);
542 }
543
544 public void rollbackTransaction(ConnectionContext context) throws IOException {
545 referenceStoreAdapter.rollbackTransaction(context);
546 }
547
548 public boolean isPersistentIndex() {
549 return persistentIndex;
550 }
551
552 public void setPersistentIndex(boolean persistentIndex) {
553 this.persistentIndex = persistentIndex;
554 }
555
556 /**
557 * @param location
558 * @return
559 * @throws IOException
560 */
561 public DataStructure readCommand(Location location) throws IOException {
562 try {
563 ByteSequence packet = asyncDataManager.read(location);
564 return (DataStructure)wireFormat.unmarshal(packet);
565 } catch (IOException e) {
566 throw createReadException(location, e);
567 }
568 }
569
570 /**
571 * Move all the messages that were in the journal into long term storage. We
572 * just replay and do a checkpoint.
573 *
574 * @throws IOException
575 * @throws IOException
576 * @throws IllegalStateException
577 */
578 private void recover() throws IllegalStateException, IOException {
579 referenceStoreAdapter.clearMessages();
580 Location pos = null;
581 int redoCounter = 0;
582 LOG.info("Journal Recovery Started from: " + asyncDataManager);
583 long start = System.currentTimeMillis();
584 ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
585 // While we have records in the journal.
586 while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
587 ByteSequence data = asyncDataManager.read(pos);
588 DataStructure c = (DataStructure)wireFormat.unmarshal(data);
589 if (c instanceof Message) {
590 Message message = (Message)c;
591 AMQMessageStore store = (AMQMessageStore)createMessageStore(message.getDestination());
592 if (message.isInTransaction()) {
593 transactionStore.addMessage(store, message, pos);
594 } else {
595 if (store.replayAddMessage(context, message, pos)) {
596 redoCounter++;
597 }
598 }
599 } else {
600 switch (c.getDataStructureType()) {
601 case SubscriptionInfo.DATA_STRUCTURE_TYPE: {
602 referenceStoreAdapter.recoverSubscription((SubscriptionInfo)c);
603 }
604 break;
605 case JournalQueueAck.DATA_STRUCTURE_TYPE: {
606 JournalQueueAck command = (JournalQueueAck)c;
607 AMQMessageStore store = (AMQMessageStore)createMessageStore(command.getDestination());
608 if (command.getMessageAck().isInTransaction()) {
609 transactionStore.removeMessage(store, command.getMessageAck(), pos);
610 } else {
611 if (store.replayRemoveMessage(context, command.getMessageAck())) {
612 redoCounter++;
613 }
614 }
615 }
616 break;
617 case JournalTopicAck.DATA_STRUCTURE_TYPE: {
618 JournalTopicAck command = (JournalTopicAck)c;
619 AMQTopicMessageStore store = (AMQTopicMessageStore)createMessageStore(command.getDestination());
620 if (command.getTransactionId() != null) {
621 transactionStore.acknowledge(store, command, pos);
622 } else {
623 if (store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId())) {
624 redoCounter++;
625 }
626 }
627 }
628 break;
629 case JournalTransaction.DATA_STRUCTURE_TYPE: {
630 JournalTransaction command = (JournalTransaction)c;
631 try {
632 // Try to replay the packet.
633 switch (command.getType()) {
634 case JournalTransaction.XA_PREPARE:
635 transactionStore.replayPrepare(command.getTransactionId());
636 break;
637 case JournalTransaction.XA_COMMIT:
638 case JournalTransaction.LOCAL_COMMIT:
639 AMQTx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
640 if (tx == null) {
641 break; // We may be trying to replay a commit
642 }
643 // that
644 // was already committed.
645 // Replay the committed operations.
646 tx.getOperations();
647 for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
648 AMQTxOperation op = (AMQTxOperation)iter.next();
649 if (op.replay(this, context)) {
650 redoCounter++;
651 }
652 }
653 break;
654 case JournalTransaction.LOCAL_ROLLBACK:
655 case JournalTransaction.XA_ROLLBACK:
656 transactionStore.replayRollback(command.getTransactionId());
657 break;
658 default:
659 throw new IOException("Invalid journal command type: " + command.getType());
660 }
661 } catch (IOException e) {
662 LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
663 }
664 }
665 break;
666 case JournalTrace.DATA_STRUCTURE_TYPE:
667 JournalTrace trace = (JournalTrace)c;
668 LOG.debug("TRACE Entry: " + trace.getMessage());
669 break;
670 default:
671 LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
672 }
673 }
674 }
675 Location location = writeTraceMessage("RECOVERED " + new Date(), true);
676 asyncDataManager.setMark(location, true);
677 long end = System.currentTimeMillis();
678 LOG.info("Recovered " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
679 }
680
681 private IOException createReadException(Location location, Exception e) {
682 return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
683 }
684
685 protected IOException createWriteException(DataStructure packet, Exception e) {
686 return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
687 }
688
689 protected IOException createWriteException(String command, Exception e) {
690 return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
691 }
692
693 protected IOException createRecoveryFailedException(Exception e) {
694 return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
695 }
696
697 /**
698 * @param command
699 * @param syncHint
700 * @return
701 * @throws IOException
702 */
703 public Location writeCommand(DataStructure command, boolean syncHint) throws IOException {
704 return writeCommand(command, syncHint,false);
705 }
706
707 public Location writeCommand(DataStructure command, boolean syncHint,boolean forceSync) throws IOException {
708 try {
709 return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint && syncOnWrite)));
710 } catch (IOException ioe) {
711 LOG.error("Failed to write command: " + command + ". Reason: " + ioe, ioe);
712 brokerService.handleIOException(ioe);
713 throw ioe;
714 }
715 }
716
717 private Location writeTraceMessage(String message, boolean sync) throws IOException {
718 JournalTrace trace = new JournalTrace();
719 trace.setMessage(message);
720 return writeCommand(trace, sync);
721 }
722
723 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
724 newPercentUsage = (newPercentUsage / 10) * 10;
725 oldPercentUsage = (oldPercentUsage / 10) * 10;
726 if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
727 checkpoint(false);
728 }
729 }
730
731 public AMQTransactionStore getTransactionStore() {
732 return transactionStore;
733 }
734
735 public synchronized void deleteAllMessages() throws IOException {
736 deleteAllMessages = true;
737 }
738
739 @Override
740 public String toString() {
741 return "AMQPersistenceAdapter(" + directory + ")";
742 }
743
744 // /////////////////////////////////////////////////////////////////
745 // Subclass overridables
746 // /////////////////////////////////////////////////////////////////
747 protected AsyncDataManager createAsyncDataManager() {
748 AsyncDataManager manager = new AsyncDataManager(storeSize);
749 manager.setDirectory(new File(directory, "journal"));
750 manager.setDirectoryArchive(getDirectoryArchive());
751 manager.setArchiveDataLogs(isArchiveDataLogs());
752 manager.setMaxFileLength(maxFileLength);
753 manager.setUseNio(useNio);
754 return manager;
755 }
756
757 protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
758 KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(storeSize);
759 adaptor.setPersistentIndex(isPersistentIndex());
760 adaptor.setIndexBinSize(getIndexBinSize());
761 adaptor.setIndexKeySize(getIndexKeySize());
762 adaptor.setIndexPageSize(getIndexPageSize());
763 adaptor.setIndexMaxBinSize(getIndexMaxBinSize());
764 adaptor.setIndexLoadFactor(getIndexLoadFactor());
765 return adaptor;
766 }
767
768 // /////////////////////////////////////////////////////////////////
769 // Property Accessors
770 // /////////////////////////////////////////////////////////////////
771 public AsyncDataManager getAsyncDataManager() {
772 return asyncDataManager;
773 }
774
775 public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
776 this.asyncDataManager = asyncDataManager;
777 }
778
779 public ReferenceStoreAdapter getReferenceStoreAdapter() {
780 return referenceStoreAdapter;
781 }
782
783 public TaskRunnerFactory getTaskRunnerFactory() {
784 return taskRunnerFactory;
785 }
786
787 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
788 this.taskRunnerFactory = taskRunnerFactory;
789 }
790
791 /**
792 * @return Returns the wireFormat.
793 */
794 public WireFormat getWireFormat() {
795 return wireFormat;
796 }
797
798 public void setWireFormat(WireFormat wireFormat) {
799 this.wireFormat = wireFormat;
800 }
801
802 public SystemUsage getUsageManager() {
803 return usageManager;
804 }
805
806 public void setUsageManager(SystemUsage usageManager) {
807 this.usageManager = usageManager;
808 }
809
810 public int getMaxCheckpointMessageAddSize() {
811 return maxCheckpointMessageAddSize;
812 }
813
814 /**
815 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
816 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
817 */
818 public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
819 this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
820 }
821
822
823 public synchronized File getDirectory() {
824 return directory;
825 }
826
827 public synchronized void setDirectory(File directory) {
828 this.directory = directory;
829 }
830
831 public boolean isSyncOnWrite() {
832 return this.syncOnWrite;
833 }
834
835 public void setSyncOnWrite(boolean syncOnWrite) {
836 this.syncOnWrite = syncOnWrite;
837 }
838
839 /**
840 * @param referenceStoreAdapter the referenceStoreAdapter to set
841 */
842 public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
843 this.referenceStoreAdapter = referenceStoreAdapter;
844 }
845
846 public long size(){
847 return storeSize.get();
848 }
849
850 public boolean isUseNio() {
851 return useNio;
852 }
853
854 public void setUseNio(boolean useNio) {
855 this.useNio = useNio;
856 }
857
858 public int getMaxFileLength() {
859 return maxFileLength;
860 }
861
862 /**
863 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
864 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
865 */
866 public void setMaxFileLength(int maxFileLength) {
867 this.maxFileLength = maxFileLength;
868 }
869
870 public long getCleanupInterval() {
871 return cleanupInterval;
872 }
873
874 public void setCleanupInterval(long cleanupInterval) {
875 this.cleanupInterval = cleanupInterval;
876 }
877
878 public long getCheckpointInterval() {
879 return checkpointInterval;
880 }
881
882 public void setCheckpointInterval(long checkpointInterval) {
883 this.checkpointInterval = checkpointInterval;
884 }
885
886 public int getIndexBinSize() {
887 return indexBinSize;
888 }
889
890 public void setIndexBinSize(int indexBinSize) {
891 this.indexBinSize = indexBinSize;
892 }
893
894 public int getIndexKeySize() {
895 return indexKeySize;
896 }
897
898 public void setIndexKeySize(int indexKeySize) {
899 this.indexKeySize = indexKeySize;
900 }
901
902 public int getIndexPageSize() {
903 return indexPageSize;
904 }
905
906 public int getIndexMaxBinSize() {
907 return indexMaxBinSize;
908 }
909
910 public void setIndexMaxBinSize(int maxBinSize) {
911 this.indexMaxBinSize = maxBinSize;
912 }
913
914 /**
915 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
916 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
917 */
918 public void setIndexPageSize(int indexPageSize) {
919 this.indexPageSize = indexPageSize;
920 }
921
922 public void setIndexLoadFactor(int factor){
923 this.indexLoadFactor=factor;
924 }
925
926 public int getIndexLoadFactor(){
927 return this.indexLoadFactor;
928 }
929
930 public int getMaxReferenceFileLength() {
931 return maxReferenceFileLength;
932 }
933
934 /**
935 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
936 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
937 */
938 public void setMaxReferenceFileLength(int maxReferenceFileLength) {
939 this.maxReferenceFileLength = maxReferenceFileLength;
940 }
941
942 public File getDirectoryArchive() {
943 return directoryArchive;
944 }
945
946 public void setDirectoryArchive(File directoryArchive) {
947 this.directoryArchive = directoryArchive;
948 }
949
950 public boolean isArchiveDataLogs() {
951 return archiveDataLogs;
952 }
953
954 public void setArchiveDataLogs(boolean archiveDataLogs) {
955 this.archiveDataLogs = archiveDataLogs;
956 }
957
958 public boolean isDisableLocking() {
959 return disableLocking;
960 }
961
962 public void setDisableLocking(boolean disableLocking) {
963 this.disableLocking = disableLocking;
964 }
965
966 /**
967 * @return the recoverReferenceStore
968 */
969 public boolean isRecoverReferenceStore() {
970 return recoverReferenceStore;
971 }
972
973 /**
974 * @param recoverReferenceStore the recoverReferenceStore to set
975 */
976 public void setRecoverReferenceStore(boolean recoverReferenceStore) {
977 this.recoverReferenceStore = recoverReferenceStore;
978 }
979
980 /**
981 * @return the forceRecoverReferenceStore
982 */
983 public boolean isForceRecoverReferenceStore() {
984 return forceRecoverReferenceStore;
985 }
986
987 /**
988 * @param forceRecoverReferenceStore the forceRecoverReferenceStore to set
989 */
990 public void setForceRecoverReferenceStore(boolean forceRecoverReferenceStore) {
991 this.forceRecoverReferenceStore = forceRecoverReferenceStore;
992 }
993
994 public boolean isUseDedicatedTaskRunner() {
995 return useDedicatedTaskRunner;
996 }
997
998 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
999 this.useDedicatedTaskRunner = useDedicatedTaskRunner;
1000 }
1001
1002 /**
1003 * @return the journalThreadPriority
1004 */
1005 public int getJournalThreadPriority() {
1006 return this.journalThreadPriority;
1007 }
1008
1009 /**
1010 * @param journalThreadPriority the journalThreadPriority to set
1011 */
1012 public void setJournalThreadPriority(int journalThreadPriority) {
1013 this.journalThreadPriority = journalThreadPriority;
1014 }
1015
1016
1017 protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
1018 Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
1019 if (map == null) {
1020 map = new ConcurrentHashMap<Integer, AtomicInteger>();
1021 dataFilesInProgress.put(store, map);
1022 }
1023 AtomicInteger count = map.get(dataFileId);
1024 if (count == null) {
1025 count = new AtomicInteger(0);
1026 map.put(dataFileId, count);
1027 }
1028 count.incrementAndGet();
1029 }
1030
1031 protected void removeInProgressDataFile(AMQMessageStore store,int dataFileId) {
1032 Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
1033 if (map != null) {
1034 AtomicInteger count = map.get(dataFileId);
1035 if (count != null) {
1036 int newCount = count.decrementAndGet();
1037 if (newCount <=0) {
1038 map.remove(dataFileId);
1039 }
1040 }
1041 if (map.isEmpty()) {
1042 dataFilesInProgress.remove(store);
1043 }
1044 }
1045 }
1046
1047
1048 protected void lock() throws Exception {
1049 lockLogged = false;
1050 lockAquired = false;
1051 do {
1052 if (doLock()) {
1053 lockAquired = true;
1054 } else {
1055 if (!lockLogged) {
1056 LOG.warn("Waiting to Lock the Store " + getDirectory());
1057 lockLogged = true;
1058 }
1059 Thread.sleep(1000);
1060 }
1061
1062 } while (!lockAquired && !disableLocking);
1063 }
1064
1065 private synchronized void unlock() throws IOException {
1066 if (!disableLocking && (null != lock)) {
1067 //clear property doesn't work on some platforms
1068 System.getProperties().remove(getPropertyKey());
1069 System.clearProperty(getPropertyKey());
1070 assert(System.getProperty(getPropertyKey())==null);
1071 if (lock.isValid()) {
1072 lock.release();
1073 lock.channel().close();
1074
1075 }
1076 lock = null;
1077 }
1078 }
1079
1080
1081 protected boolean doLock() throws IOException {
1082 boolean result = true;
1083 if (!disableLocking && directory != null && lock == null) {
1084 String key = getPropertyKey();
1085 String property = System.getProperty(key);
1086 if (null == property) {
1087 if (!BROKEN_FILE_LOCK) {
1088 lock = lockFile.getChannel().tryLock(0, Math.max(1, lockFile.getChannel().size()), false);
1089 if (lock == null) {
1090 result = false;
1091 } else {
1092 System.setProperty(key, new Date().toString());
1093 }
1094 }
1095 } else { // already locked
1096 result = false;
1097 }
1098 }
1099 return result;
1100 }
1101
1102 private String getPropertyKey() throws IOException {
1103 return getClass().getName() + ".lock." + directory.getCanonicalPath();
1104 }
1105
1106 static {
1107 BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
1108 + ".FileLockBroken",
1109 "false"));
1110 DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
1111 + ".DisableLocking",
1112 "false"));
1113 }
1114
1115
1116 public long getLastProducerSequenceId(ProducerId id) {
1117 // reference store send has adequate duplicate suppression
1118 return -1;
1119 }
1120 }