001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.journal;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.util.ArrayList;
022 import java.util.HashSet;
023 import java.util.Iterator;
024 import java.util.Set;
025 import java.util.concurrent.Callable;
026 import java.util.concurrent.ConcurrentHashMap;
027 import java.util.concurrent.CountDownLatch;
028 import java.util.concurrent.FutureTask;
029 import java.util.concurrent.LinkedBlockingQueue;
030 import java.util.concurrent.ThreadFactory;
031 import java.util.concurrent.ThreadPoolExecutor;
032 import java.util.concurrent.TimeUnit;
033 import java.util.concurrent.atomic.AtomicBoolean;
034 import org.apache.activeio.journal.InvalidRecordLocationException;
035 import org.apache.activeio.journal.Journal;
036 import org.apache.activeio.journal.JournalEventListener;
037 import org.apache.activeio.journal.RecordLocation;
038 import org.apache.activeio.packet.ByteArrayPacket;
039 import org.apache.activeio.packet.Packet;
040 import org.apache.activemq.broker.BrokerService;
041 import org.apache.activemq.broker.BrokerServiceAware;
042 import org.apache.activemq.broker.ConnectionContext;
043 import org.apache.activemq.command.ActiveMQDestination;
044 import org.apache.activemq.command.ActiveMQQueue;
045 import org.apache.activemq.command.ActiveMQTopic;
046 import org.apache.activemq.command.DataStructure;
047 import org.apache.activemq.command.JournalQueueAck;
048 import org.apache.activemq.command.JournalTopicAck;
049 import org.apache.activemq.command.JournalTrace;
050 import org.apache.activemq.command.JournalTransaction;
051 import org.apache.activemq.command.Message;
052 import org.apache.activemq.command.MessageAck;
053 import org.apache.activemq.command.ProducerId;
054 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
055 import org.apache.activemq.openwire.OpenWireFormat;
056 import org.apache.activemq.store.MessageStore;
057 import org.apache.activemq.store.PersistenceAdapter;
058 import org.apache.activemq.store.TopicMessageStore;
059 import org.apache.activemq.store.TransactionStore;
060 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
061 import org.apache.activemq.store.journal.JournalTransactionStore.Tx;
062 import org.apache.activemq.store.journal.JournalTransactionStore.TxOperation;
063 import org.apache.activemq.thread.Scheduler;
064 import org.apache.activemq.thread.Task;
065 import org.apache.activemq.thread.TaskRunner;
066 import org.apache.activemq.thread.TaskRunnerFactory;
067 import org.apache.activemq.usage.SystemUsage;
068 import org.apache.activemq.usage.Usage;
069 import org.apache.activemq.usage.UsageListener;
070 import org.apache.activemq.util.ByteSequence;
071 import org.apache.activemq.util.IOExceptionSupport;
072 import org.apache.activemq.wireformat.WireFormat;
073 import org.slf4j.Logger;
074 import org.slf4j.LoggerFactory;
075
076 /**
077 * An implementation of {@link PersistenceAdapter} designed for use with a
078 * {@link Journal} and then check pointing asynchronously on a timeout with some
079 * other long term persistent storage.
080 *
081 * @org.apache.xbean.XBean
082 *
083 */
084 public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
085
086 private BrokerService brokerService;
087
088 protected Scheduler scheduler;
089 private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class);
090
091 private Journal journal;
092 private PersistenceAdapter longTermPersistence;
093
094 private final WireFormat wireFormat = new OpenWireFormat();
095
096 private final ConcurrentHashMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>();
097 private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();
098
099 private SystemUsage usageManager;
100 private final long checkpointInterval = 1000 * 60 * 5;
101 private long lastCheckpointRequest = System.currentTimeMillis();
102 private long lastCleanup = System.currentTimeMillis();
103 private int maxCheckpointWorkers = 10;
104 private int maxCheckpointMessageAddSize = 1024 * 1024;
105
106 private final JournalTransactionStore transactionStore = new JournalTransactionStore(this);
107 private ThreadPoolExecutor checkpointExecutor;
108
109 private TaskRunner checkpointTask;
110 private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
111 private boolean fullCheckPoint;
112
113 private final AtomicBoolean started = new AtomicBoolean(false);
114
115 private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
116
117 private TaskRunnerFactory taskRunnerFactory;
118 private File directory;
119
120 public JournalPersistenceAdapter() {
121 }
122
123 public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
124 setJournal(journal);
125 setTaskRunnerFactory(taskRunnerFactory);
126 setPersistenceAdapter(longTermPersistence);
127 }
128
129 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
130 this.taskRunnerFactory = taskRunnerFactory;
131 }
132
133 public void setJournal(Journal journal) {
134 this.journal = journal;
135 journal.setJournalEventListener(this);
136 }
137
138 public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) {
139 this.longTermPersistence = longTermPersistence;
140 }
141
142 final Runnable createPeriodicCheckpointTask() {
143 return new Runnable() {
144 public void run() {
145 long lastTime = 0;
146 synchronized (this) {
147 lastTime = lastCheckpointRequest;
148 }
149 if (System.currentTimeMillis() > lastTime + checkpointInterval) {
150 checkpoint(false, true);
151 }
152 }
153 };
154 }
155
156 /**
157 * @param usageManager The UsageManager that is controlling the
158 * destination's memory usage.
159 */
160 public void setUsageManager(SystemUsage usageManager) {
161 this.usageManager = usageManager;
162 longTermPersistence.setUsageManager(usageManager);
163 }
164
165 public Set<ActiveMQDestination> getDestinations() {
166 Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
167 destinations.addAll(queues.keySet());
168 destinations.addAll(topics.keySet());
169 return destinations;
170 }
171
172 private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
173 if (destination.isQueue()) {
174 return createQueueMessageStore((ActiveMQQueue)destination);
175 } else {
176 return createTopicMessageStore((ActiveMQTopic)destination);
177 }
178 }
179
180 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
181 JournalMessageStore store = queues.get(destination);
182 if (store == null) {
183 MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination);
184 store = new JournalMessageStore(this, checkpointStore, destination);
185 queues.put(destination, store);
186 }
187 return store;
188 }
189
190 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
191 JournalTopicMessageStore store = topics.get(destinationName);
192 if (store == null) {
193 TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
194 store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
195 topics.put(destinationName, store);
196 }
197 return store;
198 }
199
200 /**
201 * Cleanup method to remove any state associated with the given destination
202 *
203 * @param destination Destination to forget
204 */
205 public void removeQueueMessageStore(ActiveMQQueue destination) {
206 queues.remove(destination);
207 }
208
209 /**
210 * Cleanup method to remove any state associated with the given destination
211 *
212 * @param destination Destination to forget
213 */
214 public void removeTopicMessageStore(ActiveMQTopic destination) {
215 topics.remove(destination);
216 }
217
218 public TransactionStore createTransactionStore() throws IOException {
219 return transactionStore;
220 }
221
222 public long getLastMessageBrokerSequenceId() throws IOException {
223 return longTermPersistence.getLastMessageBrokerSequenceId();
224 }
225
226 public void beginTransaction(ConnectionContext context) throws IOException {
227 longTermPersistence.beginTransaction(context);
228 }
229
230 public void commitTransaction(ConnectionContext context) throws IOException {
231 longTermPersistence.commitTransaction(context);
232 }
233
234 public void rollbackTransaction(ConnectionContext context) throws IOException {
235 longTermPersistence.rollbackTransaction(context);
236 }
237
238 public synchronized void start() throws Exception {
239 if (!started.compareAndSet(false, true)) {
240 return;
241 }
242
243 checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
244 public boolean iterate() {
245 return doCheckpoint();
246 }
247 }, "ActiveMQ Journal Checkpoint Worker");
248
249 checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
250 public Thread newThread(Runnable runable) {
251 Thread t = new Thread(runable, "Journal checkpoint worker");
252 t.setPriority(7);
253 return t;
254 }
255 });
256 // checkpointExecutor.allowCoreThreadTimeOut(true);
257
258 this.usageManager.getMemoryUsage().addUsageListener(this);
259
260 if (longTermPersistence instanceof JDBCPersistenceAdapter) {
261 // Disabled periodic clean up as it deadlocks with the checkpoint
262 // operations.
263 ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
264 }
265
266 longTermPersistence.start();
267 createTransactionStore();
268 recover();
269
270 // Do a checkpoint periodically.
271 this.scheduler = new Scheduler("Journal Scheduler");
272 this.scheduler.start();
273 this.scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
274
275 }
276
277 public void stop() throws Exception {
278
279 this.usageManager.getMemoryUsage().removeUsageListener(this);
280 if (!started.compareAndSet(true, false)) {
281 return;
282 }
283
284 this.scheduler.cancel(periodicCheckpointTask);
285 this.scheduler.stop();
286
287 // Take one final checkpoint and stop checkpoint processing.
288 checkpoint(true, true);
289 checkpointTask.shutdown();
290 checkpointExecutor.shutdown();
291
292 queues.clear();
293 topics.clear();
294
295 IOException firstException = null;
296 try {
297 journal.close();
298 } catch (Exception e) {
299 firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
300 }
301 longTermPersistence.stop();
302
303 if (firstException != null) {
304 throw firstException;
305 }
306 }
307
308 // Properties
309 // -------------------------------------------------------------------------
310 public PersistenceAdapter getLongTermPersistence() {
311 return longTermPersistence;
312 }
313
314 /**
315 * @return Returns the wireFormat.
316 */
317 public WireFormat getWireFormat() {
318 return wireFormat;
319 }
320
321 // Implementation methods
322 // -------------------------------------------------------------------------
323
324 /**
325 * The Journal give us a call back so that we can move old data out of the
326 * journal. Taking a checkpoint does this for us.
327 *
328 * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
329 */
330 public void overflowNotification(RecordLocation safeLocation) {
331 checkpoint(false, true);
332 }
333
334 /**
335 * When we checkpoint we move all the journalled data to long term storage.
336 *
337 */
338 public void checkpoint(boolean sync, boolean fullCheckpoint) {
339 try {
340 if (journal == null) {
341 throw new IllegalStateException("Journal is closed.");
342 }
343
344 long now = System.currentTimeMillis();
345 CountDownLatch latch = null;
346 synchronized (this) {
347 latch = nextCheckpointCountDownLatch;
348 lastCheckpointRequest = now;
349 if (fullCheckpoint) {
350 this.fullCheckPoint = true;
351 }
352 }
353
354 checkpointTask.wakeup();
355
356 if (sync) {
357 LOG.debug("Waking for checkpoint to complete.");
358 latch.await();
359 }
360 } catch (InterruptedException e) {
361 Thread.currentThread().interrupt();
362 LOG.warn("Request to start checkpoint failed: " + e, e);
363 }
364 }
365
366 public void checkpoint(boolean sync) {
367 checkpoint(sync, sync);
368 }
369
370 /**
371 * This does the actual checkpoint.
372 *
373 * @return
374 */
375 public boolean doCheckpoint() {
376 CountDownLatch latch = null;
377 boolean fullCheckpoint;
378 synchronized (this) {
379 latch = nextCheckpointCountDownLatch;
380 nextCheckpointCountDownLatch = new CountDownLatch(1);
381 fullCheckpoint = this.fullCheckPoint;
382 this.fullCheckPoint = false;
383 }
384 try {
385
386 LOG.debug("Checkpoint started.");
387 RecordLocation newMark = null;
388
389 ArrayList<FutureTask<RecordLocation>> futureTasks = new ArrayList<FutureTask<RecordLocation>>(queues.size() + topics.size());
390
391 //
392 // We do many partial checkpoints (fullCheckpoint==false) to move
393 // topic messages
394 // to long term store as soon as possible.
395 //
396 // We want to avoid doing that for queue messages since removes the
397 // come in the same
398 // checkpoint cycle will nullify the previous message add.
399 // Therefore, we only
400 // checkpoint queues on the fullCheckpoint cycles.
401 //
402 if (fullCheckpoint) {
403 Iterator<JournalMessageStore> iterator = queues.values().iterator();
404 while (iterator.hasNext()) {
405 try {
406 final JournalMessageStore ms = iterator.next();
407 FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
408 public RecordLocation call() throws Exception {
409 return ms.checkpoint();
410 }
411 });
412 futureTasks.add(task);
413 checkpointExecutor.execute(task);
414 } catch (Exception e) {
415 LOG.error("Failed to checkpoint a message store: " + e, e);
416 }
417 }
418 }
419
420 Iterator<JournalTopicMessageStore> iterator = topics.values().iterator();
421 while (iterator.hasNext()) {
422 try {
423 final JournalTopicMessageStore ms = iterator.next();
424 FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
425 public RecordLocation call() throws Exception {
426 return ms.checkpoint();
427 }
428 });
429 futureTasks.add(task);
430 checkpointExecutor.execute(task);
431 } catch (Exception e) {
432 LOG.error("Failed to checkpoint a message store: " + e, e);
433 }
434 }
435
436 try {
437 for (Iterator<FutureTask<RecordLocation>> iter = futureTasks.iterator(); iter.hasNext();) {
438 FutureTask<RecordLocation> ft = iter.next();
439 RecordLocation mark = ft.get();
440 // We only set a newMark on full checkpoints.
441 if (fullCheckpoint) {
442 if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
443 newMark = mark;
444 }
445 }
446 }
447 } catch (Throwable e) {
448 LOG.error("Failed to checkpoint a message store: " + e, e);
449 }
450
451 if (fullCheckpoint) {
452 try {
453 if (newMark != null) {
454 LOG.debug("Marking journal at: " + newMark);
455 journal.setMark(newMark, true);
456 }
457 } catch (Exception e) {
458 LOG.error("Failed to mark the Journal: " + e, e);
459 }
460
461 if (longTermPersistence instanceof JDBCPersistenceAdapter) {
462 // We may be check pointing more often than the
463 // checkpointInterval if under high use
464 // But we don't want to clean up the db that often.
465 long now = System.currentTimeMillis();
466 if (now > lastCleanup + checkpointInterval) {
467 lastCleanup = now;
468 ((JDBCPersistenceAdapter)longTermPersistence).cleanup();
469 }
470 }
471 }
472
473 LOG.debug("Checkpoint done.");
474 } finally {
475 latch.countDown();
476 }
477 synchronized (this) {
478 return this.fullCheckPoint;
479 }
480
481 }
482
483 /**
484 * @param location
485 * @return
486 * @throws IOException
487 */
488 public DataStructure readCommand(RecordLocation location) throws IOException {
489 try {
490 Packet packet = journal.read(location);
491 return (DataStructure)wireFormat.unmarshal(toByteSequence(packet));
492 } catch (InvalidRecordLocationException e) {
493 throw createReadException(location, e);
494 } catch (IOException e) {
495 throw createReadException(location, e);
496 }
497 }
498
499 /**
500 * Move all the messages that were in the journal into long term storage. We
501 * just replay and do a checkpoint.
502 *
503 * @throws IOException
504 * @throws IOException
505 * @throws InvalidRecordLocationException
506 * @throws IllegalStateException
507 */
508 private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException {
509
510 RecordLocation pos = null;
511 int transactionCounter = 0;
512
513 LOG.info("Journal Recovery Started from: " + journal);
514 ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
515
516 // While we have records in the journal.
517 while ((pos = journal.getNextRecordLocation(pos)) != null) {
518 Packet data = journal.read(pos);
519 DataStructure c = (DataStructure)wireFormat.unmarshal(toByteSequence(data));
520
521 if (c instanceof Message) {
522 Message message = (Message)c;
523 JournalMessageStore store = (JournalMessageStore)createMessageStore(message.getDestination());
524 if (message.isInTransaction()) {
525 transactionStore.addMessage(store, message, pos);
526 } else {
527 store.replayAddMessage(context, message);
528 transactionCounter++;
529 }
530 } else {
531 switch (c.getDataStructureType()) {
532 case JournalQueueAck.DATA_STRUCTURE_TYPE: {
533 JournalQueueAck command = (JournalQueueAck)c;
534 JournalMessageStore store = (JournalMessageStore)createMessageStore(command.getDestination());
535 if (command.getMessageAck().isInTransaction()) {
536 transactionStore.removeMessage(store, command.getMessageAck(), pos);
537 } else {
538 store.replayRemoveMessage(context, command.getMessageAck());
539 transactionCounter++;
540 }
541 }
542 break;
543 case JournalTopicAck.DATA_STRUCTURE_TYPE: {
544 JournalTopicAck command = (JournalTopicAck)c;
545 JournalTopicMessageStore store = (JournalTopicMessageStore)createMessageStore(command.getDestination());
546 if (command.getTransactionId() != null) {
547 transactionStore.acknowledge(store, command, pos);
548 } else {
549 store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
550 transactionCounter++;
551 }
552 }
553 break;
554 case JournalTransaction.DATA_STRUCTURE_TYPE: {
555 JournalTransaction command = (JournalTransaction)c;
556 try {
557 // Try to replay the packet.
558 switch (command.getType()) {
559 case JournalTransaction.XA_PREPARE:
560 transactionStore.replayPrepare(command.getTransactionId());
561 break;
562 case JournalTransaction.XA_COMMIT:
563 case JournalTransaction.LOCAL_COMMIT:
564 Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
565 if (tx == null) {
566 break; // We may be trying to replay a commit
567 }
568 // that
569 // was already committed.
570
571 // Replay the committed operations.
572 tx.getOperations();
573 for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
574 TxOperation op = (TxOperation)iter.next();
575 if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
576 op.store.replayAddMessage(context, (Message)op.data);
577 }
578 if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
579 op.store.replayRemoveMessage(context, (MessageAck)op.data);
580 }
581 if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
582 JournalTopicAck ack = (JournalTopicAck)op.data;
583 ((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId());
584 }
585 }
586 transactionCounter++;
587 break;
588 case JournalTransaction.LOCAL_ROLLBACK:
589 case JournalTransaction.XA_ROLLBACK:
590 transactionStore.replayRollback(command.getTransactionId());
591 break;
592 default:
593 throw new IOException("Invalid journal command type: " + command.getType());
594 }
595 } catch (IOException e) {
596 LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
597 }
598 }
599 break;
600 case JournalTrace.DATA_STRUCTURE_TYPE:
601 JournalTrace trace = (JournalTrace)c;
602 LOG.debug("TRACE Entry: " + trace.getMessage());
603 break;
604 default:
605 LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
606 }
607 }
608 }
609
610 RecordLocation location = writeTraceMessage("RECOVERED", true);
611 journal.setMark(location, true);
612
613 LOG.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
614 }
615
616 private IOException createReadException(RecordLocation location, Exception e) {
617 return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
618 }
619
620 protected IOException createWriteException(DataStructure packet, Exception e) {
621 return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
622 }
623
624 protected IOException createWriteException(String command, Exception e) {
625 return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
626 }
627
628 protected IOException createRecoveryFailedException(Exception e) {
629 return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
630 }
631
632 /**
633 * @param command
634 * @param sync
635 * @return
636 * @throws IOException
637 */
638 public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
639 if (started.get()) {
640 try {
641 return journal.write(toPacket(wireFormat.marshal(command)), sync);
642 } catch (IOException ioe) {
643 LOG.error("Cannot write to the journal", ioe);
644 brokerService.handleIOException(ioe);
645 throw ioe;
646 }
647 }
648 throw new IOException("closed");
649 }
650
651 private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException {
652 JournalTrace trace = new JournalTrace();
653 trace.setMessage(message);
654 return writeCommand(trace, sync);
655 }
656
657 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
658 newPercentUsage = (newPercentUsage / 10) * 10;
659 oldPercentUsage = (oldPercentUsage / 10) * 10;
660 if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
661 boolean sync = newPercentUsage >= 90;
662 checkpoint(sync, true);
663 }
664 }
665
666 public JournalTransactionStore getTransactionStore() {
667 return transactionStore;
668 }
669
670 public void deleteAllMessages() throws IOException {
671 try {
672 JournalTrace trace = new JournalTrace();
673 trace.setMessage("DELETED");
674 RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
675 journal.setMark(location, true);
676 LOG.info("Journal deleted: ");
677 } catch (IOException e) {
678 throw e;
679 } catch (Throwable e) {
680 throw IOExceptionSupport.create(e);
681 }
682 longTermPersistence.deleteAllMessages();
683 }
684
685 public SystemUsage getUsageManager() {
686 return usageManager;
687 }
688
689 public int getMaxCheckpointMessageAddSize() {
690 return maxCheckpointMessageAddSize;
691 }
692
693 public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
694 this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
695 }
696
697 public int getMaxCheckpointWorkers() {
698 return maxCheckpointWorkers;
699 }
700
701 public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
702 this.maxCheckpointWorkers = maxCheckpointWorkers;
703 }
704
705 public boolean isUseExternalMessageReferences() {
706 return false;
707 }
708
709 public void setUseExternalMessageReferences(boolean enable) {
710 if (enable) {
711 throw new IllegalArgumentException("The journal does not support message references.");
712 }
713 }
714
715 public Packet toPacket(ByteSequence sequence) {
716 return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
717 }
718
719 public ByteSequence toByteSequence(Packet packet) {
720 org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
721 return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
722 }
723
724 public void setBrokerName(String brokerName) {
725 longTermPersistence.setBrokerName(brokerName);
726 }
727
728 @Override
729 public String toString() {
730 return "JournalPersistenceAdapator(" + longTermPersistence + ")";
731 }
732
733 public void setDirectory(File dir) {
734 this.directory=dir;
735 }
736
737 public File getDirectory(){
738 return directory;
739 }
740
741 public long size(){
742 return 0;
743 }
744
745 public void setBrokerService(BrokerService brokerService) {
746 this.brokerService = brokerService;
747 PersistenceAdapter pa = getLongTermPersistence();
748 if( pa instanceof BrokerServiceAware ) {
749 ((BrokerServiceAware)pa).setBrokerService(brokerService);
750 }
751 }
752
753 public long getLastProducerSequenceId(ProducerId id) {
754 return -1;
755 }
756
757 }