001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb;
018
019 import java.io.DataInputStream;
020 import java.io.IOException;
021 import java.io.InterruptedIOException;
022 import java.util.ArrayList;
023 import java.util.HashMap;
024 import java.util.HashSet;
025 import java.util.Iterator;
026 import java.util.LinkedList;
027 import java.util.List;
028 import java.util.Map;
029 import java.util.Set;
030 import java.util.Map.Entry;
031 import java.util.concurrent.*;
032 import java.util.concurrent.atomic.AtomicBoolean;
033 import java.util.concurrent.atomic.AtomicInteger;
034 import org.apache.activemq.broker.ConnectionContext;
035 import org.apache.activemq.broker.region.Destination;
036 import org.apache.activemq.broker.region.RegionBroker;
037 import org.apache.activemq.command.ActiveMQDestination;
038 import org.apache.activemq.command.ActiveMQQueue;
039 import org.apache.activemq.command.ActiveMQTempQueue;
040 import org.apache.activemq.command.ActiveMQTempTopic;
041 import org.apache.activemq.command.ActiveMQTopic;
042 import org.apache.activemq.command.Message;
043 import org.apache.activemq.command.MessageAck;
044 import org.apache.activemq.command.MessageId;
045 import org.apache.activemq.command.ProducerId;
046 import org.apache.activemq.command.SubscriptionInfo;
047 import org.apache.activemq.command.TransactionId;
048 import org.apache.activemq.openwire.OpenWireFormat;
049 import org.apache.activemq.protobuf.Buffer;
050 import org.apache.activemq.store.AbstractMessageStore;
051 import org.apache.activemq.store.MessageRecoveryListener;
052 import org.apache.activemq.store.MessageStore;
053 import org.apache.activemq.store.PersistenceAdapter;
054 import org.apache.activemq.store.TopicMessageStore;
055 import org.apache.activemq.store.TransactionStore;
056 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
057 import org.apache.activemq.store.kahadb.data.KahaDestination;
058 import org.apache.activemq.store.kahadb.data.KahaLocation;
059 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
060 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
061 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
062 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
063 import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
064 import org.apache.activemq.usage.MemoryUsage;
065 import org.apache.activemq.usage.SystemUsage;
066 import org.apache.activemq.util.ServiceStopper;
067 import org.apache.activemq.wireformat.WireFormat;
068 import org.apache.kahadb.util.ByteSequence;
069 import org.slf4j.Logger;
070 import org.slf4j.LoggerFactory;
071 import org.apache.kahadb.journal.Location;
072 import org.apache.kahadb.page.Transaction;
073
074 public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
075 static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
076 private static final int MAX_ASYNC_JOBS = 10000;
077
078 public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
079 public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
080 PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
081 public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
082 private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
083 PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
084
085 protected ExecutorService queueExecutor;
086 protected ExecutorService topicExecutor;
087 protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
088 protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
089 final WireFormat wireFormat = new OpenWireFormat();
090 private SystemUsage usageManager;
091 private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
092 private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
093 Semaphore globalQueueSemaphore;
094 Semaphore globalTopicSemaphore;
095 private boolean concurrentStoreAndDispatchQueues = true;
096 // when true, message order may be compromised when cache is exhausted if store is out
097 // or order w.r.t cache
098 private boolean concurrentStoreAndDispatchTopics = false;
099 private boolean concurrentStoreAndDispatchTransactions = false;
100 private int maxAsyncJobs = MAX_ASYNC_JOBS;
101 private final KahaDBTransactionStore transactionStore;
102 private TransactionIdTransformer transactionIdTransformer;
103
104 public KahaDBStore() {
105 this.transactionStore = new KahaDBTransactionStore(this);
106 this.transactionIdTransformer = new TransactionIdTransformer() {
107 @Override
108 public KahaTransactionInfo transform(TransactionId txid) {
109 return TransactionIdConversion.convert(txid);
110 }
111 };
112 }
113
114 @Override
115 public String toString() {
116 return "KahaDB:[" + directory.getAbsolutePath() + "]";
117 }
118
119 public void setBrokerName(String brokerName) {
120 }
121
122 public void setUsageManager(SystemUsage usageManager) {
123 this.usageManager = usageManager;
124 }
125
126 public SystemUsage getUsageManager() {
127 return this.usageManager;
128 }
129
130 /**
131 * @return the concurrentStoreAndDispatch
132 */
133 public boolean isConcurrentStoreAndDispatchQueues() {
134 return this.concurrentStoreAndDispatchQueues;
135 }
136
137 /**
138 * @param concurrentStoreAndDispatch
139 * the concurrentStoreAndDispatch to set
140 */
141 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
142 this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
143 }
144
145 /**
146 * @return the concurrentStoreAndDispatch
147 */
148 public boolean isConcurrentStoreAndDispatchTopics() {
149 return this.concurrentStoreAndDispatchTopics;
150 }
151
152 /**
153 * @param concurrentStoreAndDispatch
154 * the concurrentStoreAndDispatch to set
155 */
156 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
157 this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
158 }
159
160 public boolean isConcurrentStoreAndDispatchTransactions() {
161 return this.concurrentStoreAndDispatchTransactions;
162 }
163
164 /**
165 * @return the maxAsyncJobs
166 */
167 public int getMaxAsyncJobs() {
168 return this.maxAsyncJobs;
169 }
170 /**
171 * @param maxAsyncJobs
172 * the maxAsyncJobs to set
173 */
174 public void setMaxAsyncJobs(int maxAsyncJobs) {
175 this.maxAsyncJobs = maxAsyncJobs;
176 }
177
178 @Override
179 public void doStart() throws Exception {
180 super.doStart();
181 this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
182 this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
183 this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
184 this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
185 this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
186 asyncQueueJobQueue, new ThreadFactory() {
187 public Thread newThread(Runnable runnable) {
188 Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
189 thread.setDaemon(true);
190 return thread;
191 }
192 });
193 this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
194 asyncTopicJobQueue, new ThreadFactory() {
195 public Thread newThread(Runnable runnable) {
196 Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
197 thread.setDaemon(true);
198 return thread;
199 }
200 });
201 }
202
203 @Override
204 public void doStop(ServiceStopper stopper) throws Exception {
205 // drain down async jobs
206 LOG.info("Stopping async queue tasks");
207 if (this.globalQueueSemaphore != null) {
208 this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
209 }
210 synchronized (this.asyncQueueMaps) {
211 for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
212 synchronized (m) {
213 for (StoreTask task : m.values()) {
214 task.cancel();
215 }
216 }
217 }
218 this.asyncQueueMaps.clear();
219 }
220 LOG.info("Stopping async topic tasks");
221 if (this.globalTopicSemaphore != null) {
222 this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
223 }
224 synchronized (this.asyncTopicMaps) {
225 for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
226 synchronized (m) {
227 for (StoreTask task : m.values()) {
228 task.cancel();
229 }
230 }
231 }
232 this.asyncTopicMaps.clear();
233 }
234 if (this.globalQueueSemaphore != null) {
235 this.globalQueueSemaphore.drainPermits();
236 }
237 if (this.globalTopicSemaphore != null) {
238 this.globalTopicSemaphore.drainPermits();
239 }
240 if (this.queueExecutor != null) {
241 this.queueExecutor.shutdownNow();
242 }
243 if (this.topicExecutor != null) {
244 this.topicExecutor.shutdownNow();
245 }
246 LOG.info("Stopped KahaDB");
247 super.doStop(stopper);
248 }
249
250 void incrementRedeliveryAndReWrite(final String key, final KahaDestination destination) throws IOException {
251 Location location;
252 this.indexLock.writeLock().lock();
253 try {
254 location = findMessageLocation(key, destination);
255 } finally {
256 this.indexLock.writeLock().unlock();
257 }
258
259 if (location != null) {
260 KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
261 Message message = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
262
263 message.incrementRedeliveryCounter();
264 if (LOG.isTraceEnabled()) {
265 LOG.trace("rewriting: " + key + " with deliveryCount: " + message.getRedeliveryCounter());
266 }
267 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
268 addMessage.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
269
270 final Location rewriteLocation = journal.write(toByteSequence(addMessage), true);
271
272 this.indexLock.writeLock().lock();
273 try {
274 pageFile.tx().execute(new Transaction.Closure<IOException>() {
275 public void execute(Transaction tx) throws IOException {
276 StoredDestination sd = getStoredDestination(destination, tx);
277 Long sequence = sd.messageIdIndex.get(tx, key);
278 MessageKeys keys = sd.orderIndex.get(tx, sequence);
279 sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), sequence, new MessageKeys(keys.messageId, rewriteLocation));
280 }
281 });
282 } finally {
283 this.indexLock.writeLock().unlock();
284 }
285 }
286 }
287
288 @Override
289 void rollbackStatsOnDuplicate(KahaDestination commandDestination) {
290 if (brokerService != null) {
291 RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
292 if (regionBroker != null) {
293 Set<Destination> destinationSet = regionBroker.getDestinations(convert(commandDestination));
294 for (Destination destination : destinationSet) {
295 destination.getDestinationStatistics().getMessages().decrement();
296 destination.getDestinationStatistics().getEnqueues().decrement();
297 }
298 }
299 }
300 }
301
302 private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
303 return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
304 public Location execute(Transaction tx) throws IOException {
305 StoredDestination sd = getStoredDestination(destination, tx);
306 Long sequence = sd.messageIdIndex.get(tx, key);
307 if (sequence == null) {
308 return null;
309 }
310 return sd.orderIndex.get(tx, sequence).location;
311 }
312 });
313 }
314
315 protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
316 StoreQueueTask task = null;
317 synchronized (store.asyncTaskMap) {
318 task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
319 }
320 return task;
321 }
322
323 protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
324 synchronized (store.asyncTaskMap) {
325 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
326 }
327 this.queueExecutor.execute(task);
328 }
329
330 protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
331 StoreTopicTask task = null;
332 synchronized (store.asyncTaskMap) {
333 task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
334 }
335 return task;
336 }
337
338 protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
339 synchronized (store.asyncTaskMap) {
340 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
341 }
342 this.topicExecutor.execute(task);
343 }
344
345 public TransactionStore createTransactionStore() throws IOException {
346 return this.transactionStore;
347 }
348
349 public boolean getForceRecoverIndex() {
350 return this.forceRecoverIndex;
351 }
352
353 public void setForceRecoverIndex(boolean forceRecoverIndex) {
354 this.forceRecoverIndex = forceRecoverIndex;
355 }
356
357 public class KahaDBMessageStore extends AbstractMessageStore {
358 protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
359 protected KahaDestination dest;
360 private final int maxAsyncJobs;
361 private final Semaphore localDestinationSemaphore;
362
363 double doneTasks, canceledTasks = 0;
364
365 public KahaDBMessageStore(ActiveMQDestination destination) {
366 super(destination);
367 this.dest = convert(destination);
368 this.maxAsyncJobs = getMaxAsyncJobs();
369 this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
370 }
371
372 @Override
373 public ActiveMQDestination getDestination() {
374 return destination;
375 }
376
377 @Override
378 public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
379 throws IOException {
380 if (isConcurrentStoreAndDispatchQueues()) {
381 StoreQueueTask result = new StoreQueueTask(this, context, message);
382 result.aquireLocks();
383 addQueueTask(this, result);
384 return result.getFuture();
385 } else {
386 return super.asyncAddQueueMessage(context, message);
387 }
388 }
389
390 @Override
391 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
392 if (isConcurrentStoreAndDispatchQueues()) {
393 AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
394 StoreQueueTask task = null;
395 synchronized (asyncTaskMap) {
396 task = (StoreQueueTask) asyncTaskMap.get(key);
397 }
398 if (task != null) {
399 if (!task.cancel()) {
400 try {
401
402 task.future.get();
403 } catch (InterruptedException e) {
404 throw new InterruptedIOException(e.toString());
405 } catch (Exception ignored) {
406 LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
407 }
408 removeMessage(context, ack);
409 } else {
410 synchronized (asyncTaskMap) {
411 asyncTaskMap.remove(key);
412 }
413 }
414 } else {
415 removeMessage(context, ack);
416 }
417 } else {
418 removeMessage(context, ack);
419 }
420 }
421
422 public void addMessage(ConnectionContext context, Message message) throws IOException {
423 KahaAddMessageCommand command = new KahaAddMessageCommand();
424 command.setDestination(dest);
425 command.setMessageId(message.getMessageId().toString());
426 command.setTransactionInfo(transactionIdTransformer.transform(message.getTransactionId()));
427 command.setPriority(message.getPriority());
428 command.setPrioritySupported(isPrioritizedMessages());
429 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
430 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
431 store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
432
433 }
434
435 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
436 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
437 command.setDestination(dest);
438 command.setMessageId(ack.getLastMessageId().toString());
439 command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
440
441 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
442 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
443 store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
444 }
445
446 public void removeAllMessages(ConnectionContext context) throws IOException {
447 KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
448 command.setDestination(dest);
449 store(command, true, null, null);
450 }
451
452 public Message getMessage(MessageId identity) throws IOException {
453 final String key = identity.toString();
454
455 // Hopefully one day the page file supports concurrent read
456 // operations... but for now we must
457 // externally synchronize...
458 Location location;
459 indexLock.writeLock().lock();
460 try {
461 location = findMessageLocation(key, dest);
462 }finally {
463 indexLock.writeLock().unlock();
464 }
465 if (location == null) {
466 return null;
467 }
468
469 return loadMessage(location);
470 }
471
472 public int getMessageCount() throws IOException {
473 try {
474 lockAsyncJobQueue();
475 indexLock.writeLock().lock();
476 try {
477 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
478 public Integer execute(Transaction tx) throws IOException {
479 // Iterate through all index entries to get a count
480 // of
481 // messages in the destination.
482 StoredDestination sd = getStoredDestination(dest, tx);
483 int rc = 0;
484 for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
485 .hasNext();) {
486 iterator.next();
487 rc++;
488 }
489 return rc;
490 }
491 });
492 }finally {
493 indexLock.writeLock().unlock();
494 }
495 } finally {
496 unlockAsyncJobQueue();
497 }
498 }
499
500 @Override
501 public boolean isEmpty() throws IOException {
502 indexLock.writeLock().lock();
503 try {
504 return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
505 public Boolean execute(Transaction tx) throws IOException {
506 // Iterate through all index entries to get a count of
507 // messages in the destination.
508 StoredDestination sd = getStoredDestination(dest, tx);
509 return sd.locationIndex.isEmpty(tx);
510 }
511 });
512 }finally {
513 indexLock.writeLock().unlock();
514 }
515 }
516
517 public void recover(final MessageRecoveryListener listener) throws Exception {
518 // recovery may involve expiry which will modify
519 indexLock.writeLock().lock();
520 try {
521 pageFile.tx().execute(new Transaction.Closure<Exception>() {
522 public void execute(Transaction tx) throws Exception {
523 StoredDestination sd = getStoredDestination(dest, tx);
524 sd.orderIndex.resetCursorPosition();
525 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
526 .hasNext(); ) {
527 Entry<Long, MessageKeys> entry = iterator.next();
528 if (ackedAndPrepared.contains(entry.getValue().messageId)) {
529 continue;
530 }
531 Message msg = loadMessage(entry.getValue().location);
532 listener.recoverMessage(msg);
533 }
534 }
535 });
536 }finally {
537 indexLock.writeLock().unlock();
538 }
539 }
540
541
542 public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
543 indexLock.writeLock().lock();
544 try {
545 pageFile.tx().execute(new Transaction.Closure<Exception>() {
546 public void execute(Transaction tx) throws Exception {
547 StoredDestination sd = getStoredDestination(dest, tx);
548 Entry<Long, MessageKeys> entry = null;
549 int counter = 0;
550 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
551 listener.hasSpace() && iterator.hasNext(); ) {
552 entry = iterator.next();
553 if (ackedAndPrepared.contains(entry.getValue().messageId)) {
554 continue;
555 }
556 Message msg = loadMessage(entry.getValue().location);
557 listener.recoverMessage(msg);
558 counter++;
559 if (counter >= maxReturned) {
560 break;
561 }
562 }
563 sd.orderIndex.stoppedIterating();
564 }
565 });
566 }finally {
567 indexLock.writeLock().unlock();
568 }
569 }
570
571 public void resetBatching() {
572 if (pageFile.isLoaded()) {
573 indexLock.writeLock().lock();
574 try {
575 pageFile.tx().execute(new Transaction.Closure<Exception>() {
576 public void execute(Transaction tx) throws Exception {
577 StoredDestination sd = getExistingStoredDestination(dest, tx);
578 if (sd != null) {
579 sd.orderIndex.resetCursorPosition();}
580 }
581 });
582 } catch (Exception e) {
583 LOG.error("Failed to reset batching",e);
584 }finally {
585 indexLock.writeLock().unlock();
586 }
587 }
588 }
589
590 @Override
591 public void setBatch(MessageId identity) throws IOException {
592 try {
593 final String key = identity.toString();
594 lockAsyncJobQueue();
595
596 // Hopefully one day the page file supports concurrent read
597 // operations... but for now we must
598 // externally synchronize...
599
600 indexLock.writeLock().lock();
601 try {
602 pageFile.tx().execute(new Transaction.Closure<IOException>() {
603 public void execute(Transaction tx) throws IOException {
604 StoredDestination sd = getStoredDestination(dest, tx);
605 Long location = sd.messageIdIndex.get(tx, key);
606 if (location != null) {
607 sd.orderIndex.setBatch(tx, location);
608 }
609 }
610 });
611 } finally {
612 indexLock.writeLock().unlock();
613 }
614 } finally {
615 unlockAsyncJobQueue();
616 }
617 }
618
619 @Override
620 public void setMemoryUsage(MemoryUsage memoeyUSage) {
621 }
622 @Override
623 public void start() throws Exception {
624 super.start();
625 }
626 @Override
627 public void stop() throws Exception {
628 super.stop();
629 }
630
631 protected void lockAsyncJobQueue() {
632 try {
633 this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
634 } catch (Exception e) {
635 LOG.error("Failed to lock async jobs for " + this.destination, e);
636 }
637 }
638
639 protected void unlockAsyncJobQueue() {
640 this.localDestinationSemaphore.release(this.maxAsyncJobs);
641 }
642
643 protected void acquireLocalAsyncLock() {
644 try {
645 this.localDestinationSemaphore.acquire();
646 } catch (InterruptedException e) {
647 LOG.error("Failed to aquire async lock for " + this.destination, e);
648 }
649 }
650
651 protected void releaseLocalAsyncLock() {
652 this.localDestinationSemaphore.release();
653 }
654
655 }
656
657 class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
658 private final AtomicInteger subscriptionCount = new AtomicInteger();
659 public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
660 super(destination);
661 this.subscriptionCount.set(getAllSubscriptions().length);
662 asyncTopicMaps.add(asyncTaskMap);
663 }
664
665 @Override
666 public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
667 throws IOException {
668 if (isConcurrentStoreAndDispatchTopics()) {
669 StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
670 result.aquireLocks();
671 addTopicTask(this, result);
672 return result.getFuture();
673 } else {
674 return super.asyncAddTopicMessage(context, message);
675 }
676 }
677
678 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
679 MessageId messageId, MessageAck ack)
680 throws IOException {
681 String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
682 if (isConcurrentStoreAndDispatchTopics()) {
683 AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
684 StoreTopicTask task = null;
685 synchronized (asyncTaskMap) {
686 task = (StoreTopicTask) asyncTaskMap.get(key);
687 }
688 if (task != null) {
689 if (task.addSubscriptionKey(subscriptionKey)) {
690 removeTopicTask(this, messageId);
691 if (task.cancel()) {
692 synchronized (asyncTaskMap) {
693 asyncTaskMap.remove(key);
694 }
695 }
696 }
697 } else {
698 doAcknowledge(context, subscriptionKey, messageId, ack);
699 }
700 } else {
701 doAcknowledge(context, subscriptionKey, messageId, ack);
702 }
703 }
704
705 protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
706 throws IOException {
707 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
708 command.setDestination(dest);
709 command.setSubscriptionKey(subscriptionKey);
710 command.setMessageId(messageId.toString());
711 command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
712 if (ack != null && ack.isUnmatchedAck()) {
713 command.setAck(UNMATCHED);
714 }
715 store(command, false, null, null);
716 }
717
718 public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
719 String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
720 .getSubscriptionName());
721 KahaSubscriptionCommand command = new KahaSubscriptionCommand();
722 command.setDestination(dest);
723 command.setSubscriptionKey(subscriptionKey.toString());
724 command.setRetroactive(retroactive);
725 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
726 command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
727 store(command, isEnableJournalDiskSyncs() && true, null, null);
728 this.subscriptionCount.incrementAndGet();
729 }
730
731 public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
732 KahaSubscriptionCommand command = new KahaSubscriptionCommand();
733 command.setDestination(dest);
734 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString());
735 store(command, isEnableJournalDiskSyncs() && true, null, null);
736 this.subscriptionCount.decrementAndGet();
737 }
738
739 public SubscriptionInfo[] getAllSubscriptions() throws IOException {
740
741 final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
742 indexLock.writeLock().lock();
743 try {
744 pageFile.tx().execute(new Transaction.Closure<IOException>() {
745 public void execute(Transaction tx) throws IOException {
746 StoredDestination sd = getStoredDestination(dest, tx);
747 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
748 .hasNext();) {
749 Entry<String, KahaSubscriptionCommand> entry = iterator.next();
750 SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
751 .getValue().getSubscriptionInfo().newInput()));
752 subscriptions.add(info);
753
754 }
755 }
756 });
757 }finally {
758 indexLock.writeLock().unlock();
759 }
760
761 SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
762 subscriptions.toArray(rc);
763 return rc;
764 }
765
766 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
767 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
768 indexLock.writeLock().lock();
769 try {
770 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
771 public SubscriptionInfo execute(Transaction tx) throws IOException {
772 StoredDestination sd = getStoredDestination(dest, tx);
773 KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
774 if (command == null) {
775 return null;
776 }
777 return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
778 .getSubscriptionInfo().newInput()));
779 }
780 });
781 }finally {
782 indexLock.writeLock().unlock();
783 }
784 }
785
786 public int getMessageCount(String clientId, String subscriptionName) throws IOException {
787 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
788 indexLock.writeLock().lock();
789 try {
790 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
791 public Integer execute(Transaction tx) throws IOException {
792 StoredDestination sd = getStoredDestination(dest, tx);
793 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
794 if (cursorPos == null) {
795 // The subscription might not exist.
796 return 0;
797 }
798
799 return (int) getStoredMessageCount(tx, sd, subscriptionKey);
800 }
801 });
802 }finally {
803 indexLock.writeLock().unlock();
804 }
805 }
806
807 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
808 throws Exception {
809 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
810 @SuppressWarnings("unused")
811 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
812 indexLock.writeLock().lock();
813 try {
814 pageFile.tx().execute(new Transaction.Closure<Exception>() {
815 public void execute(Transaction tx) throws Exception {
816 StoredDestination sd = getStoredDestination(dest, tx);
817 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
818 sd.orderIndex.setBatch(tx, cursorPos);
819 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
820 .hasNext();) {
821 Entry<Long, MessageKeys> entry = iterator.next();
822 listener.recoverMessage(loadMessage(entry.getValue().location));
823 }
824 sd.orderIndex.resetCursorPosition();
825 }
826 });
827 }finally {
828 indexLock.writeLock().unlock();
829 }
830 }
831
832 public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
833 final MessageRecoveryListener listener) throws Exception {
834 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
835 @SuppressWarnings("unused")
836 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
837 indexLock.writeLock().lock();
838 try {
839 pageFile.tx().execute(new Transaction.Closure<Exception>() {
840 public void execute(Transaction tx) throws Exception {
841 StoredDestination sd = getStoredDestination(dest, tx);
842 sd.orderIndex.resetCursorPosition();
843 MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
844 if (moc == null) {
845 LastAck pos = getLastAck(tx, sd, subscriptionKey);
846 if (pos == null) {
847 // sub deleted
848 return;
849 }
850 sd.orderIndex.setBatch(tx, pos);
851 moc = sd.orderIndex.cursor;
852 } else {
853 sd.orderIndex.cursor.sync(moc);
854 }
855
856 Entry<Long, MessageKeys> entry = null;
857 int counter = 0;
858 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
859 .hasNext();) {
860 entry = iterator.next();
861 if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
862 counter++;
863 }
864 if (counter >= maxReturned || listener.hasSpace() == false) {
865 break;
866 }
867 }
868 sd.orderIndex.stoppedIterating();
869 if (entry != null) {
870 MessageOrderCursor copy = sd.orderIndex.cursor.copy();
871 sd.subscriptionCursors.put(subscriptionKey, copy);
872 }
873 }
874 });
875 }finally {
876 indexLock.writeLock().unlock();
877 }
878 }
879
880 public void resetBatching(String clientId, String subscriptionName) {
881 try {
882 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
883 indexLock.writeLock().lock();
884 try {
885 pageFile.tx().execute(new Transaction.Closure<IOException>() {
886 public void execute(Transaction tx) throws IOException {
887 StoredDestination sd = getStoredDestination(dest, tx);
888 sd.subscriptionCursors.remove(subscriptionKey);
889 }
890 });
891 }finally {
892 indexLock.writeLock().unlock();
893 }
894 } catch (IOException e) {
895 throw new RuntimeException(e);
896 }
897 }
898 }
899
900 String subscriptionKey(String clientId, String subscriptionName) {
901 return clientId + ":" + subscriptionName;
902 }
903
904 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
905 return this.transactionStore.proxy(new KahaDBMessageStore(destination));
906 }
907
908 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
909 return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
910 }
911
912 /**
913 * Cleanup method to remove any state associated with the given destination.
914 * This method does not stop the message store (it might not be cached).
915 *
916 * @param destination
917 * Destination to forget
918 */
919 public void removeQueueMessageStore(ActiveMQQueue destination) {
920 }
921
922 /**
923 * Cleanup method to remove any state associated with the given destination
924 * This method does not stop the message store (it might not be cached).
925 *
926 * @param destination
927 * Destination to forget
928 */
929 public void removeTopicMessageStore(ActiveMQTopic destination) {
930 }
931
932 public void deleteAllMessages() throws IOException {
933 deleteAllMessages = true;
934 }
935
936 public Set<ActiveMQDestination> getDestinations() {
937 try {
938 final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
939 indexLock.writeLock().lock();
940 try {
941 pageFile.tx().execute(new Transaction.Closure<IOException>() {
942 public void execute(Transaction tx) throws IOException {
943 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
944 .hasNext();) {
945 Entry<String, StoredDestination> entry = iterator.next();
946 if (!isEmptyTopic(entry, tx)) {
947 rc.add(convert(entry.getKey()));
948 }
949 }
950 }
951
952 private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx)
953 throws IOException {
954 boolean isEmptyTopic = false;
955 ActiveMQDestination dest = convert(entry.getKey());
956 if (dest.isTopic()) {
957 StoredDestination loadedStore = getStoredDestination(convert(dest), tx);
958 if (loadedStore.subscriptionAcks.isEmpty(tx)) {
959 isEmptyTopic = true;
960 }
961 }
962 return isEmptyTopic;
963 }
964 });
965 }finally {
966 indexLock.writeLock().unlock();
967 }
968 return rc;
969 } catch (IOException e) {
970 throw new RuntimeException(e);
971 }
972 }
973
974 public long getLastMessageBrokerSequenceId() throws IOException {
975 return 0;
976 }
977
978 public long getLastProducerSequenceId(ProducerId id) {
979 indexLock.readLock().lock();
980 try {
981 return metadata.producerSequenceIdTracker.getLastSeqId(id);
982 } finally {
983 indexLock.readLock().unlock();
984 }
985 }
986
987 public long size() {
988 return storeSize.get();
989 }
990
991 public void beginTransaction(ConnectionContext context) throws IOException {
992 throw new IOException("Not yet implemented.");
993 }
994 public void commitTransaction(ConnectionContext context) throws IOException {
995 throw new IOException("Not yet implemented.");
996 }
997 public void rollbackTransaction(ConnectionContext context) throws IOException {
998 throw new IOException("Not yet implemented.");
999 }
1000
1001 public void checkpoint(boolean sync) throws IOException {
1002 super.checkpointCleanup(sync);
1003 }
1004
1005 // /////////////////////////////////////////////////////////////////
1006 // Internal helper methods.
1007 // /////////////////////////////////////////////////////////////////
1008
1009 /**
1010 * @param location
1011 * @return
1012 * @throws IOException
1013 */
1014 Message loadMessage(Location location) throws IOException {
1015 KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
1016 Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
1017 return msg;
1018 }
1019
1020 // /////////////////////////////////////////////////////////////////
1021 // Internal conversion methods.
1022 // /////////////////////////////////////////////////////////////////
1023
1024 KahaLocation convert(Location location) {
1025 KahaLocation rc = new KahaLocation();
1026 rc.setLogId(location.getDataFileId());
1027 rc.setOffset(location.getOffset());
1028 return rc;
1029 }
1030
1031 KahaDestination convert(ActiveMQDestination dest) {
1032 KahaDestination rc = new KahaDestination();
1033 rc.setName(dest.getPhysicalName());
1034 switch (dest.getDestinationType()) {
1035 case ActiveMQDestination.QUEUE_TYPE:
1036 rc.setType(DestinationType.QUEUE);
1037 return rc;
1038 case ActiveMQDestination.TOPIC_TYPE:
1039 rc.setType(DestinationType.TOPIC);
1040 return rc;
1041 case ActiveMQDestination.TEMP_QUEUE_TYPE:
1042 rc.setType(DestinationType.TEMP_QUEUE);
1043 return rc;
1044 case ActiveMQDestination.TEMP_TOPIC_TYPE:
1045 rc.setType(DestinationType.TEMP_TOPIC);
1046 return rc;
1047 default:
1048 return null;
1049 }
1050 }
1051
1052 ActiveMQDestination convert(String dest) {
1053 int p = dest.indexOf(":");
1054 if (p < 0) {
1055 throw new IllegalArgumentException("Not in the valid destination format");
1056 }
1057 int type = Integer.parseInt(dest.substring(0, p));
1058 String name = dest.substring(p + 1);
1059 return convert(type, name);
1060 }
1061
1062 private ActiveMQDestination convert(KahaDestination commandDestination) {
1063 return convert(commandDestination.getType().getNumber(), commandDestination.getName());
1064 }
1065
1066 private ActiveMQDestination convert(int type, String name) {
1067 switch (KahaDestination.DestinationType.valueOf(type)) {
1068 case QUEUE:
1069 return new ActiveMQQueue(name);
1070 case TOPIC:
1071 return new ActiveMQTopic(name);
1072 case TEMP_QUEUE:
1073 return new ActiveMQTempQueue(name);
1074 case TEMP_TOPIC:
1075 return new ActiveMQTempTopic(name);
1076 default:
1077 throw new IllegalArgumentException("Not in the valid destination format");
1078 }
1079 }
1080
1081 public TransactionIdTransformer getTransactionIdTransformer() {
1082 return transactionIdTransformer;
1083 }
1084
1085 public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
1086 this.transactionIdTransformer = transactionIdTransformer;
1087 }
1088
1089 static class AsyncJobKey {
1090 MessageId id;
1091 ActiveMQDestination destination;
1092
1093 AsyncJobKey(MessageId id, ActiveMQDestination destination) {
1094 this.id = id;
1095 this.destination = destination;
1096 }
1097
1098 @Override
1099 public boolean equals(Object obj) {
1100 if (obj == this) {
1101 return true;
1102 }
1103 return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
1104 && destination.equals(((AsyncJobKey) obj).destination);
1105 }
1106
1107 @Override
1108 public int hashCode() {
1109 return id.hashCode() + destination.hashCode();
1110 }
1111
1112 @Override
1113 public String toString() {
1114 return destination.getPhysicalName() + "-" + id;
1115 }
1116 }
1117
1118 public interface StoreTask {
1119 public boolean cancel();
1120
1121 public void aquireLocks();
1122
1123 public void releaseLocks();
1124 }
1125
1126 class StoreQueueTask implements Runnable, StoreTask {
1127 protected final Message message;
1128 protected final ConnectionContext context;
1129 protected final KahaDBMessageStore store;
1130 protected final InnerFutureTask future;
1131 protected final AtomicBoolean done = new AtomicBoolean();
1132 protected final AtomicBoolean locked = new AtomicBoolean();
1133
1134 public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
1135 this.store = store;
1136 this.context = context;
1137 this.message = message;
1138 this.future = new InnerFutureTask(this);
1139 }
1140
1141 public Future<Object> getFuture() {
1142 return this.future;
1143 }
1144
1145 public boolean cancel() {
1146 if (this.done.compareAndSet(false, true)) {
1147 return this.future.cancel(false);
1148 }
1149 return false;
1150 }
1151
1152 public void aquireLocks() {
1153 if (this.locked.compareAndSet(false, true)) {
1154 try {
1155 globalQueueSemaphore.acquire();
1156 store.acquireLocalAsyncLock();
1157 message.incrementReferenceCount();
1158 } catch (InterruptedException e) {
1159 LOG.warn("Failed to aquire lock", e);
1160 }
1161 }
1162
1163 }
1164
1165 public void releaseLocks() {
1166 if (this.locked.compareAndSet(true, false)) {
1167 store.releaseLocalAsyncLock();
1168 globalQueueSemaphore.release();
1169 message.decrementReferenceCount();
1170 }
1171 }
1172
1173 public void run() {
1174 this.store.doneTasks++;
1175 try {
1176 if (this.done.compareAndSet(false, true)) {
1177 this.store.addMessage(context, message);
1178 removeQueueTask(this.store, this.message.getMessageId());
1179 this.future.complete();
1180 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1181 System.err.println(this.store.dest.getName() + " cancelled: "
1182 + (this.store.canceledTasks / this.store.doneTasks) * 100);
1183 this.store.canceledTasks = this.store.doneTasks = 0;
1184 }
1185 } catch (Exception e) {
1186 this.future.setException(e);
1187 }
1188 }
1189
1190 protected Message getMessage() {
1191 return this.message;
1192 }
1193
1194 private class InnerFutureTask extends FutureTask<Object> {
1195
1196 public InnerFutureTask(Runnable runnable) {
1197 super(runnable, null);
1198
1199 }
1200
1201 public void setException(final Exception e) {
1202 super.setException(e);
1203 }
1204
1205 public void complete() {
1206 super.set(null);
1207 }
1208 }
1209 }
1210
1211 class StoreTopicTask extends StoreQueueTask {
1212 private final int subscriptionCount;
1213 private final List<String> subscriptionKeys = new ArrayList<String>(1);
1214 private final KahaDBTopicMessageStore topicStore;
1215 public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
1216 int subscriptionCount) {
1217 super(store, context, message);
1218 this.topicStore = store;
1219 this.subscriptionCount = subscriptionCount;
1220
1221 }
1222
1223 @Override
1224 public void aquireLocks() {
1225 if (this.locked.compareAndSet(false, true)) {
1226 try {
1227 globalTopicSemaphore.acquire();
1228 store.acquireLocalAsyncLock();
1229 message.incrementReferenceCount();
1230 } catch (InterruptedException e) {
1231 LOG.warn("Failed to aquire lock", e);
1232 }
1233 }
1234
1235 }
1236
1237 @Override
1238 public void releaseLocks() {
1239 if (this.locked.compareAndSet(true, false)) {
1240 message.decrementReferenceCount();
1241 store.releaseLocalAsyncLock();
1242 globalTopicSemaphore.release();
1243 }
1244 }
1245
1246 /**
1247 * add a key
1248 *
1249 * @param key
1250 * @return true if all acknowledgements received
1251 */
1252 public boolean addSubscriptionKey(String key) {
1253 synchronized (this.subscriptionKeys) {
1254 this.subscriptionKeys.add(key);
1255 }
1256 return this.subscriptionKeys.size() >= this.subscriptionCount;
1257 }
1258
1259 @Override
1260 public void run() {
1261 this.store.doneTasks++;
1262 try {
1263 if (this.done.compareAndSet(false, true)) {
1264 this.topicStore.addMessage(context, message);
1265 // apply any acks we have
1266 synchronized (this.subscriptionKeys) {
1267 for (String key : this.subscriptionKeys) {
1268 this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
1269
1270 }
1271 }
1272 removeTopicTask(this.topicStore, this.message.getMessageId());
1273 this.future.complete();
1274 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1275 System.err.println(this.store.dest.getName() + " cancelled: "
1276 + (this.store.canceledTasks / this.store.doneTasks) * 100);
1277 this.store.canceledTasks = this.store.doneTasks = 0;
1278 }
1279 } catch (Exception e) {
1280 this.future.setException(e);
1281 }
1282 }
1283 }
1284
1285 public class StoreTaskExecutor extends ThreadPoolExecutor {
1286
1287 public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
1288 super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
1289 }
1290
1291 protected void afterExecute(Runnable runnable, Throwable throwable) {
1292 super.afterExecute(runnable, throwable);
1293
1294 if (runnable instanceof StoreTask) {
1295 ((StoreTask)runnable).releaseLocks();
1296 }
1297
1298 }
1299 }
1300 }