001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.Collection;
022 import java.util.Collections;
023 import java.util.Comparator;
024 import java.util.HashSet;
025 import java.util.Iterator;
026 import java.util.LinkedHashMap;
027 import java.util.LinkedList;
028 import java.util.List;
029 import java.util.Map;
030 import java.util.Set;
031 import java.util.concurrent.CancellationException;
032 import java.util.concurrent.ConcurrentLinkedQueue;
033 import java.util.concurrent.CountDownLatch;
034 import java.util.concurrent.DelayQueue;
035 import java.util.concurrent.Delayed;
036 import java.util.concurrent.ExecutorService;
037 import java.util.concurrent.Future;
038 import java.util.concurrent.TimeUnit;
039 import java.util.concurrent.atomic.AtomicLong;
040 import java.util.concurrent.locks.Lock;
041 import java.util.concurrent.locks.ReentrantLock;
042 import java.util.concurrent.locks.ReentrantReadWriteLock;
043
044 import javax.jms.InvalidSelectorException;
045 import javax.jms.JMSException;
046 import javax.jms.ResourceAllocationException;
047
048 import org.apache.activemq.broker.BrokerService;
049 import org.apache.activemq.broker.ConnectionContext;
050 import org.apache.activemq.broker.ProducerBrokerExchange;
051 import org.apache.activemq.broker.region.cursors.OrderedPendingList;
052 import org.apache.activemq.broker.region.cursors.PendingList;
053 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
054 import org.apache.activemq.broker.region.cursors.PrioritizedPendingList;
055 import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
056 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
057 import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
058 import org.apache.activemq.broker.region.group.MessageGroupMap;
059 import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
060 import org.apache.activemq.broker.region.policy.DispatchPolicy;
061 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
062 import org.apache.activemq.broker.util.InsertionCountList;
063 import org.apache.activemq.command.ActiveMQDestination;
064 import org.apache.activemq.command.ActiveMQMessage;
065 import org.apache.activemq.command.ConsumerId;
066 import org.apache.activemq.command.ExceptionResponse;
067 import org.apache.activemq.command.Message;
068 import org.apache.activemq.command.MessageAck;
069 import org.apache.activemq.command.MessageDispatchNotification;
070 import org.apache.activemq.command.MessageId;
071 import org.apache.activemq.command.ProducerAck;
072 import org.apache.activemq.command.ProducerInfo;
073 import org.apache.activemq.command.Response;
074 import org.apache.activemq.filter.BooleanExpression;
075 import org.apache.activemq.filter.MessageEvaluationContext;
076 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
077 import org.apache.activemq.selector.SelectorParser;
078 import org.apache.activemq.state.ProducerState;
079 import org.apache.activemq.store.MessageRecoveryListener;
080 import org.apache.activemq.store.MessageStore;
081 import org.apache.activemq.thread.Task;
082 import org.apache.activemq.thread.TaskRunner;
083 import org.apache.activemq.thread.TaskRunnerFactory;
084 import org.apache.activemq.transaction.Synchronization;
085 import org.apache.activemq.usage.Usage;
086 import org.apache.activemq.usage.UsageListener;
087 import org.apache.activemq.util.BrokerSupport;
088 import org.slf4j.Logger;
089 import org.slf4j.LoggerFactory;
090 import org.slf4j.MDC;
091
092 /**
093 * The Queue is a List of MessageEntry objects that are dispatched to matching
094 * subscriptions.
095 */
096 public class Queue extends BaseDestination implements Task, UsageListener {
// Logger shared by the queue and its inner helper classes.
protected static final Logger LOG = LoggerFactory.getLogger(Queue.class);
// Supplied to the constructor; initialize() uses it to create taskRunner.
protected final TaskRunnerFactory taskFactory;
// Created in initialize() and shut down in stop().
protected TaskRunner taskRunner;
// Guards the consumers list: read lock in getConsumers(), write lock while
// subscriptions are added or removed.
private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock();
protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
// Taken around accesses to the messages cursor (e.g. recovery and toString()).
private final ReentrantReadWriteLock messagesLock = new ReentrantReadWriteLock();
// Pending-message cursor; initialize() picks an in-memory or store-backed
// implementation when none has been set.
protected PendingMessageCursor messages;
// NOTE(review): presumably guards pagedInMessages - its uses are not in this
// part of the file, confirm.
private final ReentrantReadWriteLock pagedInMessagesLock = new ReentrantReadWriteLock();
private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>();
// Messages that are paged in but have not yet been targeted at a
// subscription
// The write lock is held by addSubscription()/removeSubscription() to
// synchronize with dispatch.
private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
// Both lists are replaced by setPrioritizedMessages() when the ordering mode
// changes.
protected PendingList pagedInPendingDispatch = new OrderedPendingList();
// Receives the un-acked messages handed back when a subscription is removed.
protected PendingList redeliveredWaitingDispatch = new OrderedPendingList();
private MessageGroupMap messageGroupOwners;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
// Held while a message is written to the store and added to the queue (see
// doMessageSend() and its after-commit synchronization).
final Lock sendLock = new ReentrantLock();
// Shut down with shutdownNow() in stop() when non-null.
private ExecutorService executor;
// Deferred sends parked by producer flow control, keyed by message id; all
// visible accesses synchronize on the map itself.
private final Map<MessageId, Runnable> messagesWaitingForSpace = new LinkedHashMap<MessageId, Runnable>();
private boolean useConsumerPriority = true;
private boolean strictOrderDispatch = false;
// Tracks the current exclusive consumer, if any.
private final QueueDispatchSelector dispatchSelector;
// When true (or when running as a slave) add/removeSubscription call wakeup()
// only after releasing the dispatch lock.
private boolean optimizedDispatch = false;
private boolean iterationRunning = false;
// Set when a subscription is added to an empty consumer list.
private boolean firstConsumer = false;
private int timeBeforeDispatchStarts = 0;
// When non-zero, the first consumer creates consumersBeforeStartsLatch with a
// count of (this value - 1); each later consumer counts it down.
private int consumersBeforeDispatchStarts = 0;
private CountDownLatch consumersBeforeStartsLatch;
private final AtomicLong pendingWakeups = new AtomicLong();
// When true every consumer is treated as a candidate exclusive consumer.
private boolean allConsumersExclusiveByDefault = false;
128
// Callback registered with memoryUsage (see
// registerCallbackForNotFullNotification()); it only requests an asynchronous
// wakeup of the queue.
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
    public void run() {
        asyncWakeup();
    }
};
// Periodic task scheduled by start() and cancelled by stop(); delegates to
// expireMessages().
private final Runnable expireMessagesTask = new Runnable() {
    public void run() {
        expireMessages();
    }
};

// NOTE(review): presumably the monitor guarding iterate(); its uses are not
// visible in this part of the file - confirm.
private final Object iteratingMutex = new Object();
141
142 class TimeoutMessage implements Delayed {
143
144 Message message;
145 ConnectionContext context;
146 long trigger;
147
148 public TimeoutMessage(Message message, ConnectionContext context, long delay) {
149 this.message = message;
150 this.context = context;
151 this.trigger = System.currentTimeMillis() + delay;
152 }
153
154 public long getDelay(TimeUnit unit) {
155 long n = trigger - System.currentTimeMillis();
156 return unit.convert(n, TimeUnit.MILLISECONDS);
157 }
158
159 public int compareTo(Delayed delayed) {
160 long other = ((TimeoutMessage) delayed).trigger;
161 int returnValue;
162 if (this.trigger < other) {
163 returnValue = -1;
164 } else if (this.trigger > other) {
165 returnValue = 1;
166 } else {
167 returnValue = 0;
168 }
169 return returnValue;
170 }
171
172 }
173
174 DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>();
175
176 class FlowControlTimeoutTask extends Thread {
177
178 @Override
179 public void run() {
180 TimeoutMessage timeout;
181 try {
182 while (true) {
183 timeout = flowControlTimeoutMessages.take();
184 if (timeout != null) {
185 synchronized (messagesWaitingForSpace) {
186 if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) {
187 ExceptionResponse response = new ExceptionResponse(
188 new ResourceAllocationException(
189 "Usage Manager Memory Limit reached. Stopping producer ("
190 + timeout.message.getProducerId()
191 + ") to prevent flooding "
192 + getActiveMQDestination().getQualifiedName()
193 + "."
194 + " See http://activemq.apache.org/producer-flow-control.html for more info"));
195 response.setCorrelationId(timeout.message.getCommandId());
196 timeout.context.getConnection().dispatchAsync(response);
197 }
198 }
199 }
200 }
201 } catch (InterruptedException e) {
202 if (LOG.isDebugEnabled()) {
203 LOG.debug(getName() + "Producer Flow Control Timeout Task is stopping");
204 }
205 }
206 }
207 };
208
209 private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask();
210
211 private static final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
212
213 public int compare(Subscription s1, Subscription s2) {
214 // We want the list sorted in descending order
215 return s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority();
216 }
217 };
218
/**
 * Creates the queue; the task runner and message cursor are set up later by
 * {@link #initialize()}.
 *
 * @param brokerService owning broker service, passed to the base destination
 * @param destination   the destination this queue represents
 * @param store         message store passed to the base destination
 * @param parentStats   parent statistics passed to the base destination
 * @param taskFactory   factory later used to create the queue's task runner
 */
public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store,
        DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
    super(brokerService, store, destination, parentStats);
    this.dispatchSelector = new QueueDispatchSelector(destination);
    this.taskFactory = taskFactory;
}
225
226 public List<Subscription> getConsumers() {
227 consumersLock.readLock().lock();
228 try {
229 return new ArrayList<Subscription>(consumers);
230 }finally {
231 consumersLock.readLock().unlock();
232 }
233 }
234
/**
 * Thread subclass that keeps a reference to its queue, making the queue
 * easily visible in the debugger from its task runner threads.
 */
final class QueueThread extends Thread {
    // The queue this thread works for; kept only so it can be inspected.
    final Queue queue;

    /**
     * @param runnable work to run on this thread
     * @param name     thread name
     * @param queue    the queue to expose on the thread object
     */
    public QueueThread(Runnable runnable, String name, Queue queue) {
        super(runnable, name);
        this.queue = queue;
    }
}
245
246 class BatchMessageRecoveryListener implements MessageRecoveryListener {
247 final LinkedList<Message> toExpire = new LinkedList<Message>();
248 final double totalMessageCount;
249 int recoveredAccumulator = 0;
250 int currentBatchCount;
251
252 BatchMessageRecoveryListener(int totalMessageCount) {
253 this.totalMessageCount = totalMessageCount;
254 currentBatchCount = recoveredAccumulator;
255 }
256
257 public boolean recoverMessage(Message message) {
258 recoveredAccumulator++;
259 if (LOG.isInfoEnabled() && (recoveredAccumulator % 10000) == 0) {
260 LOG.info("cursor for " + getActiveMQDestination().getQualifiedName() + " has recovered "
261 + recoveredAccumulator + " messages. " +
262 (int) (recoveredAccumulator * 100 / totalMessageCount) + "% complete");
263 }
264 // Message could have expired while it was being
265 // loaded..
266 if (message.isExpired() && broker.isExpired(message)) {
267 toExpire.add(message);
268 return true;
269 }
270 if (hasSpace()) {
271 message.setRegionDestination(Queue.this);
272 messagesLock.writeLock().lock();
273 try {
274 try {
275 messages.addMessageLast(message);
276 } catch (Exception e) {
277 LOG.error("Failed to add message to cursor", e);
278 }
279 } finally {
280 messagesLock.writeLock().unlock();
281 }
282 destinationStatistics.getMessages().increment();
283 return true;
284 }
285 return false;
286 }
287
288 public boolean recoverMessageReference(MessageId messageReference) throws Exception {
289 throw new RuntimeException("Should not be called.");
290 }
291
292 public boolean hasSpace() {
293 return true;
294 }
295
296 public boolean isDuplicate(MessageId id) {
297 return false;
298 }
299
300 public void reset() {
301 currentBatchCount = recoveredAccumulator;
302 }
303
304 public void processExpired() {
305 for (Message message: toExpire) {
306 messageExpired(createConnectionContext(), createMessageReference(message));
307 // drop message will decrement so counter
308 // balance here
309 destinationStatistics.getMessages().increment();
310 }
311 toExpire.clear();
312 }
313
314 public boolean done() {
315 return currentBatchCount == recoveredAccumulator;
316 }
317 }
318
319 @Override
320 public void setPrioritizedMessages(boolean prioritizedMessages) {
321 super.setPrioritizedMessages(prioritizedMessages);
322
323 if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) {
324 pagedInPendingDispatch = new PrioritizedPendingList();
325 redeliveredWaitingDispatch = new PrioritizedPendingList();
326 } else if(pagedInPendingDispatch instanceof PrioritizedPendingList) {
327 pagedInPendingDispatch = new OrderedPendingList();
328 redeliveredWaitingDispatch = new OrderedPendingList();
329 }
330 }
331
332 @Override
333 public void initialize() throws Exception {
334
335 if (this.messages == null) {
336 if (destination.isTemporary() || broker == null || store == null) {
337 this.messages = new VMPendingMessageCursor(isPrioritizedMessages());
338 } else {
339 this.messages = new StoreQueueCursor(broker, this);
340 }
341 }
342
343 // If a VMPendingMessageCursor don't use the default Producer System
344 // Usage
345 // since it turns into a shared blocking queue which can lead to a
346 // network deadlock.
347 // If we are cursoring to disk..it's not and issue because it does not
348 // block due
349 // to large disk sizes.
350 if (messages instanceof VMPendingMessageCursor) {
351 this.systemUsage = brokerService.getSystemUsage();
352 memoryUsage.setParent(systemUsage.getMemoryUsage());
353 }
354
355 this.taskRunner = taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName());
356
357 super.initialize();
358 if (store != null) {
359 // Restore the persistent messages.
360 messages.setSystemUsage(systemUsage);
361 messages.setEnableAudit(isEnableAudit());
362 messages.setMaxAuditDepth(getMaxAuditDepth());
363 messages.setMaxProducersToAudit(getMaxProducersToAudit());
364 messages.setUseCache(isUseCache());
365 messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
366 final int messageCount = store.getMessageCount();
367 if (messageCount > 0 && messages.isRecoveryRequired()) {
368 BatchMessageRecoveryListener listener = new BatchMessageRecoveryListener(messageCount);
369 do {
370 listener.reset();
371 store.recoverNextMessages(getMaxPageSize(), listener);
372 listener.processExpired();
373 } while (!listener.done());
374 } else {
375 destinationStatistics.getMessages().setCount(messageCount);
376 }
377 }
378 }
379
/*
 * Holder for a browser subscription that needs attention on the next
 * iterate: the browser needs access to existing messages in the queue that
 * have already been dispatched.
 */
class BrowserDispatch {
    QueueBrowserSubscription browser;

    /**
     * Wraps the browser and takes a queue reference on it; the reference is
     * released again by {@link #done()}.
     */
    public BrowserDispatch(QueueBrowserSubscription browserSubscription) {
        browser = browserSubscription;
        browser.incrementQueueRef();
    }

    /** Releases the queue reference taken in the constructor; failures are only logged. */
    void done() {
        try {
            browser.decrementQueueRef();
        } catch (Exception e) {
            LOG.warn("decrement ref on browser: " + browser, e);
        }
    }

    public QueueBrowserSubscription getBrowser() {
        return browser;
    }
}

// Browsers queued by addSubscription() for handling on the next iterate.
ConcurrentLinkedQueue<BrowserDispatch> browserDispatches = new ConcurrentLinkedQueue<BrowserDispatch>();
407
408 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
409 if (LOG.isDebugEnabled()) {
410 LOG.debug(getActiveMQDestination().getQualifiedName() + " add sub: " + sub + ", dequeues: "
411 + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
412 + getDestinationStatistics().getDispatched().getCount() + ", inflight: "
413 + getDestinationStatistics().getInflight().getCount());
414 }
415
416 super.addSubscription(context, sub);
417 // synchronize with dispatch method so that no new messages are sent
418 // while setting up a subscription. avoid out of order messages,
419 // duplicates, etc.
420 pagedInPendingDispatchLock.writeLock().lock();
421 try {
422
423 sub.add(context, this);
424
425 // needs to be synchronized - so no contention with dispatching
426 // consumersLock.
427 consumersLock.writeLock().lock();
428 try {
429
430 // set a flag if this is a first consumer
431 if (consumers.size() == 0) {
432 firstConsumer = true;
433 if (consumersBeforeDispatchStarts != 0) {
434 consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1);
435 }
436 } else {
437 if (consumersBeforeStartsLatch != null) {
438 consumersBeforeStartsLatch.countDown();
439 }
440 }
441
442 addToConsumerList(sub);
443 if (sub.getConsumerInfo().isExclusive() || isAllConsumersExclusiveByDefault()) {
444 Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
445 if (exclusiveConsumer == null) {
446 exclusiveConsumer = sub;
447 } else if (sub.getConsumerInfo().getPriority() == Byte.MAX_VALUE ||
448 sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()) {
449 exclusiveConsumer = sub;
450 }
451 dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
452 }
453 }finally {
454 consumersLock.writeLock().unlock();
455 }
456
457 if (sub instanceof QueueBrowserSubscription) {
458 // tee up for dispatch in next iterate
459 QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
460 BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
461 browserDispatches.add(browserDispatch);
462 }
463
464 if (!(this.optimizedDispatch || isSlave())) {
465 wakeup();
466 }
467 }finally {
468 pagedInPendingDispatchLock.writeLock().unlock();
469 }
470 if (this.optimizedDispatch || isSlave()) {
471 // Outside of dispatchLock() to maintain the lock hierarchy of
472 // iteratingMutex -> dispatchLock. - see
473 // https://issues.apache.org/activemq/browse/AMQ-1878
474 wakeup();
475 }
476 }
477
478 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId)
479 throws Exception {
480 super.removeSubscription(context, sub, lastDeiveredSequenceId);
481 // synchronize with dispatch method so that no new messages are sent
482 // while removing up a subscription.
483 pagedInPendingDispatchLock.writeLock().lock();
484 try {
485 if (LOG.isDebugEnabled()) {
486 LOG.debug(getActiveMQDestination().getQualifiedName() + " remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: "
487 + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
488 + getDestinationStatistics().getDispatched().getCount() + ", inflight: "
489 + getDestinationStatistics().getInflight().getCount());
490 }
491 consumersLock.writeLock().lock();
492 try {
493 removeFromConsumerList(sub);
494 if (sub.getConsumerInfo().isExclusive()) {
495 Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
496 if (exclusiveConsumer == sub) {
497 exclusiveConsumer = null;
498 for (Subscription s : consumers) {
499 if (s.getConsumerInfo().isExclusive()
500 && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer
501 .getConsumerInfo().getPriority())) {
502 exclusiveConsumer = s;
503
504 }
505 }
506 dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
507 }
508 } else if (isAllConsumersExclusiveByDefault()) {
509 Subscription exclusiveConsumer = null;
510 for (Subscription s : consumers) {
511 if (exclusiveConsumer == null
512 || s.getConsumerInfo().getPriority() > exclusiveConsumer
513 .getConsumerInfo().getPriority()) {
514 exclusiveConsumer = s;
515 }
516 }
517 dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
518 }
519 ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
520 getMessageGroupOwners().removeConsumer(consumerId);
521
522 // redeliver inflight messages
523
524 boolean markAsRedelivered = false;
525 MessageReference lastDeliveredRef = null;
526 List<MessageReference> unAckedMessages = sub.remove(context, this);
527
528 // locate last redelivered in unconsumed list (list in delivery rather than seq order)
529 if (lastDeiveredSequenceId != 0) {
530 for (MessageReference ref : unAckedMessages) {
531 if (ref.getMessageId().getBrokerSequenceId() == lastDeiveredSequenceId) {
532 lastDeliveredRef = ref;
533 markAsRedelivered = true;
534 if (LOG.isDebugEnabled()) {
535 LOG.debug("found lastDeliveredSeqID: " + lastDeiveredSequenceId + ", message reference: " + ref.getMessageId());
536 }
537 break;
538 }
539 }
540 }
541 for (MessageReference ref : unAckedMessages) {
542 QueueMessageReference qmr = (QueueMessageReference) ref;
543 if (qmr.getLockOwner() == sub) {
544 qmr.unlock();
545
546 // have no delivery information
547 if (lastDeiveredSequenceId == 0) {
548 qmr.incrementRedeliveryCounter();
549 } else {
550 if (markAsRedelivered) {
551 qmr.incrementRedeliveryCounter();
552 }
553 if (ref == lastDeliveredRef) {
554 // all that follow were not redelivered
555 markAsRedelivered = false;
556 }
557 }
558 }
559 redeliveredWaitingDispatch.addMessageLast(qmr);
560 }
561 if (!redeliveredWaitingDispatch.isEmpty()) {
562 doDispatch(new OrderedPendingList());
563 }
564 }finally {
565 consumersLock.writeLock().unlock();
566 }
567 if (!(this.optimizedDispatch || isSlave())) {
568 wakeup();
569 }
570 }finally {
571 pagedInPendingDispatchLock.writeLock().unlock();
572 }
573 if (this.optimizedDispatch || isSlave()) {
574 // Outside of dispatchLock() to maintain the lock hierarchy of
575 // iteratingMutex -> dispatchLock. - see
576 // https://issues.apache.org/activemq/browse/AMQ-1878
577 wakeup();
578 }
579 }
580
581 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
582 final ConnectionContext context = producerExchange.getConnectionContext();
583 // There is delay between the client sending it and it arriving at the
584 // destination.. it may have expired.
585 message.setRegionDestination(this);
586 ProducerState state = producerExchange.getProducerState();
587 if (state == null) {
588 LOG.warn("Send failed for: " + message + ", missing producer state for: " + producerExchange);
589 throw new JMSException("Cannot send message to " + getActiveMQDestination() + " with invalid (null) producer state");
590 }
591 final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
592 final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
593 && !context.isInRecoveryMode();
594 if (message.isExpired()) {
595 // message not stored - or added to stats yet - so chuck here
596 broker.getRoot().messageExpired(context, message, null);
597 if (sendProducerAck) {
598 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
599 context.getConnection().dispatchAsync(ack);
600 }
601 return;
602 }
603 if (memoryUsage.isFull()) {
604 isFull(context, memoryUsage);
605 fastProducer(context, producerInfo);
606 if (isProducerFlowControl() && context.isProducerFlowControl()) {
607 if (warnOnProducerFlowControl) {
608 warnOnProducerFlowControl = false;
609 LOG
610 .info("Usage Manager Memory Limit ("
611 + memoryUsage.getLimit()
612 + ") reached on "
613 + getActiveMQDestination().getQualifiedName()
614 + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
615 + " See http://activemq.apache.org/producer-flow-control.html for more info");
616 }
617
618 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
619 throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer ("
620 + message.getProducerId() + ") to prevent flooding "
621 + getActiveMQDestination().getQualifiedName() + "."
622 + " See http://activemq.apache.org/producer-flow-control.html for more info");
623 }
624
625 // We can avoid blocking due to low usage if the producer is
626 // sending
627 // a sync message or if it is using a producer window
628 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
629 // copy the exchange state since the context will be
630 // modified while we are waiting
631 // for space.
632 final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
633 synchronized (messagesWaitingForSpace) {
634 // Start flow control timeout task
635 // Prevent trying to start it multiple times
636 if (!flowControlTimeoutTask.isAlive()) {
637 flowControlTimeoutTask.setName(getName()+" Producer Flow Control Timeout Task");
638 flowControlTimeoutTask.start();
639 }
640 messagesWaitingForSpace.put(message.getMessageId(), new Runnable() {
641 public void run() {
642
643 try {
644 // While waiting for space to free up... the
645 // message may have expired.
646 if (message.isExpired()) {
647 LOG.error("expired waiting for space..");
648 broker.messageExpired(context, message, null);
649 destinationStatistics.getExpired().increment();
650 } else {
651 doMessageSend(producerExchangeCopy, message);
652 }
653
654 if (sendProducerAck) {
655 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
656 .getSize());
657 context.getConnection().dispatchAsync(ack);
658 } else {
659 Response response = new Response();
660 response.setCorrelationId(message.getCommandId());
661 context.getConnection().dispatchAsync(response);
662 }
663
664 } catch (Exception e) {
665 if (!sendProducerAck && !context.isInRecoveryMode()) {
666 ExceptionResponse response = new ExceptionResponse(e);
667 response.setCorrelationId(message.getCommandId());
668 context.getConnection().dispatchAsync(response);
669 } else {
670 LOG.debug("unexpected exception on deferred send of :" + message, e);
671 }
672 }
673 }
674 });
675
676 if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
677 flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage
678 .getSendFailIfNoSpaceAfterTimeout()));
679 }
680
681 registerCallbackForNotFullNotification();
682 context.setDontSendReponse(true);
683 return;
684 }
685
686 } else {
687
688 if (memoryUsage.isFull()) {
689 waitForSpace(context, memoryUsage, "Usage Manager Memory Limit reached. Producer ("
690 + message.getProducerId() + ") stopped to prevent flooding "
691 + getActiveMQDestination().getQualifiedName() + "."
692 + " See http://activemq.apache.org/producer-flow-control.html for more info");
693 }
694
695 // The usage manager could have delayed us by the time
696 // we unblock the message could have expired..
697 if (message.isExpired()) {
698 if (LOG.isDebugEnabled()) {
699 LOG.debug("Expired message: " + message);
700 }
701 broker.getRoot().messageExpired(context, message, null);
702 return;
703 }
704 }
705 }
706 }
707 doMessageSend(producerExchange, message);
708 if (sendProducerAck) {
709 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
710 context.getConnection().dispatchAsync(ack);
711 }
712 }
713
714 private void registerCallbackForNotFullNotification() {
715 // If the usage manager is not full, then the task will not
716 // get called..
717 if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
718 // so call it directly here.
719 sendMessagesWaitingForSpaceTask.run();
720 }
721 }
722
723 void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException,
724 Exception {
725 final ConnectionContext context = producerExchange.getConnectionContext();
726 Future<Object> result = null;
727
728 checkUsage(context, message);
729 sendLock.lockInterruptibly();
730 try {
731 if (store != null && message.isPersistent()) {
732 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
733 if (messages.isCacheEnabled()) {
734 result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
735 } else {
736 store.addMessage(context, message);
737 }
738 if (isReduceMemoryFootprint()) {
739 message.clearMarshalledState();
740 }
741 }
742 if (context.isInTransaction()) {
743 // If this is a transacted message.. increase the usage now so that
744 // a big TX does not blow up
745 // our memory. This increment is decremented once the tx finishes..
746 message.incrementReferenceCount();
747
748 context.getTransaction().addSynchronization(new Synchronization() {
749 @Override
750 public void afterCommit() throws Exception {
751 sendLock.lockInterruptibly();
752 try {
753 // It could take while before we receive the commit
754 // op, by that time the message could have expired..
755 if (broker.isExpired(message)) {
756 broker.messageExpired(context, message, null);
757 destinationStatistics.getExpired().increment();
758 return;
759 }
760 sendMessage(message);
761 } finally {
762 sendLock.unlock();
763 message.decrementReferenceCount();
764 }
765 messageSent(context, message);
766 }
767 @Override
768 public void afterRollback() throws Exception {
769 message.decrementReferenceCount();
770 }
771 });
772 } else {
773 // Add to the pending list, this takes care of incrementing the
774 // usage manager.
775 sendMessage(message);
776 }
777 } finally {
778 sendLock.unlock();
779 }
780 if (!context.isInTransaction()) {
781 messageSent(context, message);
782 }
783 if (result != null && !result.isCancelled()) {
784 try {
785 result.get();
786 } catch (CancellationException e) {
787 // ignore - the task has been cancelled if the message
788 // has already been deleted
789 }
790 }
791 }
792
793 private void checkUsage(ConnectionContext context, Message message) throws ResourceAllocationException, IOException, InterruptedException {
794 if (message.isPersistent()) {
795 if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
796 final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
797 + systemUsage.getStoreUsage().getLimit() + ". Stopping producer ("
798 + message.getProducerId() + ") to prevent flooding "
799 + getActiveMQDestination().getQualifiedName() + "."
800 + " See http://activemq.apache.org/producer-flow-control.html for more info";
801
802 waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
803 }
804 } else if (messages.getSystemUsage() != null && systemUsage.getTempUsage().isFull()) {
805 final String logMessage = "Temp Store is Full ("
806 + systemUsage.getTempUsage().getPercentUsage() + "% of " + systemUsage.getTempUsage().getLimit()
807 +"). Stopping producer (" + message.getProducerId()
808 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
809 + " See http://activemq.apache.org/producer-flow-control.html for more info";
810
811 waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
812 }
813 }
814
815 private void expireMessages() {
816 if (LOG.isDebugEnabled()) {
817 LOG.debug(getActiveMQDestination().getQualifiedName() + " expiring messages ..");
818 }
819
820 // just track the insertion count
821 List<Message> browsedMessages = new InsertionCountList<Message>();
822 doBrowse(browsedMessages, this.getMaxExpirePageSize());
823 asyncWakeup();
824 if (LOG.isDebugEnabled()) {
825 LOG.debug(getActiveMQDestination().getQualifiedName() + " expiring messages done.");
826 }
827 }
828
/** Intentionally a no-op for queues. */
public void gc() {
}
831
832 public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node)
833 throws IOException {
834 messageConsumed(context, node);
835 if (store != null && node.isPersistent()) {
836 // the original ack may be a ranged ack, but we are trying to delete
837 // a specific
838 // message store here so we need to convert to a non ranged ack.
839 if (ack.getMessageCount() > 0) {
840 // Dup the ack
841 MessageAck a = new MessageAck();
842 ack.copy(a);
843 ack = a;
844 // Convert to non-ranged.
845 ack.setFirstMessageId(node.getMessageId());
846 ack.setLastMessageId(node.getMessageId());
847 ack.setMessageCount(1);
848 }
849
850 store.removeAsyncMessage(context, ack);
851 }
852 }
853
854 Message loadMessage(MessageId messageId) throws IOException {
855 Message msg = null;
856 if (store != null) { // can be null for a temp q
857 msg = store.getMessage(messageId);
858 if (msg != null) {
859 msg.setRegionDestination(this);
860 }
861 }
862 return msg;
863 }
864
865 @Override
866 public String toString() {
867 int size = 0;
868 messagesLock.readLock().lock();
869 try{
870 size = messages.size();
871 }finally {
872 messagesLock.readLock().unlock();
873 }
874 return destination.getQualifiedName() + ", subscriptions=" + consumers.size()
875 + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups="
876 + messageGroupOwners;
877 }
878
879 public void start() throws Exception {
880 if (memoryUsage != null) {
881 memoryUsage.start();
882 }
883 if (systemUsage.getStoreUsage() != null) {
884 systemUsage.getStoreUsage().start();
885 }
886 systemUsage.getMemoryUsage().addUsageListener(this);
887 messages.start();
888 if (getExpireMessagesPeriod() > 0) {
889 scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
890 }
891 doPageIn(false);
892 }
893
894 public void stop() throws Exception {
895 if (taskRunner != null) {
896 taskRunner.shutdown();
897 }
898 if (this.executor != null) {
899 this.executor.shutdownNow();
900 }
901
902 scheduler.cancel(expireMessagesTask);
903
904 if (flowControlTimeoutTask.isAlive()) {
905 flowControlTimeoutTask.interrupt();
906 }
907
908 if (messages != null) {
909 messages.stop();
910 }
911
912 systemUsage.getMemoryUsage().removeUsageListener(this);
913 if (memoryUsage != null) {
914 memoryUsage.stop();
915 }
916 if (store != null) {
917 store.stop();
918 }
919 }
920
921 // Properties
922 // -------------------------------------------------------------------------
923 @Override
924 public ActiveMQDestination getActiveMQDestination() {
925 return destination;
926 }
927
928 public MessageGroupMap getMessageGroupOwners() {
929 if (messageGroupOwners == null) {
930 messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap();
931 }
932 return messageGroupOwners;
933 }
934
935 public DispatchPolicy getDispatchPolicy() {
936 return dispatchPolicy;
937 }
938
939 public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
940 this.dispatchPolicy = dispatchPolicy;
941 }
942
943 public MessageGroupMapFactory getMessageGroupMapFactory() {
944 return messageGroupMapFactory;
945 }
946
947 public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
948 this.messageGroupMapFactory = messageGroupMapFactory;
949 }
950
951 public PendingMessageCursor getMessages() {
952 return this.messages;
953 }
954
955 public void setMessages(PendingMessageCursor messages) {
956 this.messages = messages;
957 }
958
959 public boolean isUseConsumerPriority() {
960 return useConsumerPriority;
961 }
962
963 public void setUseConsumerPriority(boolean useConsumerPriority) {
964 this.useConsumerPriority = useConsumerPriority;
965 }
966
967 public boolean isStrictOrderDispatch() {
968 return strictOrderDispatch;
969 }
970
971 public void setStrictOrderDispatch(boolean strictOrderDispatch) {
972 this.strictOrderDispatch = strictOrderDispatch;
973 }
974
975 public boolean isOptimizedDispatch() {
976 return optimizedDispatch;
977 }
978
979 public void setOptimizedDispatch(boolean optimizedDispatch) {
980 this.optimizedDispatch = optimizedDispatch;
981 }
982
983 public int getTimeBeforeDispatchStarts() {
984 return timeBeforeDispatchStarts;
985 }
986
987 public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
988 this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
989 }
990
991 public int getConsumersBeforeDispatchStarts() {
992 return consumersBeforeDispatchStarts;
993 }
994
995 public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) {
996 this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
997 }
998
999 public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) {
1000 this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault;
1001 }
1002
1003 public boolean isAllConsumersExclusiveByDefault() {
1004 return allConsumersExclusiveByDefault;
1005 }
1006
1007
1008 // Implementation methods
1009 // -------------------------------------------------------------------------
1010 private QueueMessageReference createMessageReference(Message message) {
1011 QueueMessageReference result = new IndirectMessageReference(message);
1012 return result;
1013 }
1014
1015 public Message[] browse() {
1016 List<Message> browseList = new ArrayList<Message>();
1017 doBrowse(browseList, getMaxBrowsePageSize());
1018 return browseList.toArray(new Message[browseList.size()]);
1019 }
1020
1021 public void doBrowse(List<Message> browseList, int max) {
1022 final ConnectionContext connectionContext = createConnectionContext();
1023 try {
1024 pageInMessages(false);
1025 List<MessageReference> toExpire = new ArrayList<MessageReference>();
1026
1027 pagedInPendingDispatchLock.writeLock().lock();
1028 try {
1029 addAll(pagedInPendingDispatch.values(), browseList, max, toExpire);
1030 for (MessageReference ref : toExpire) {
1031 pagedInPendingDispatch.remove(ref);
1032 if (broker.isExpired(ref)) {
1033 if (LOG.isDebugEnabled()) {
1034 LOG.debug("expiring from pagedInPending: " + ref);
1035 }
1036 messageExpired(connectionContext, ref);
1037 }
1038 }
1039 } finally {
1040 pagedInPendingDispatchLock.writeLock().unlock();
1041 }
1042 toExpire.clear();
1043 pagedInMessagesLock.readLock().lock();
1044 try {
1045 addAll(pagedInMessages.values(), browseList, max, toExpire);
1046 } finally {
1047 pagedInMessagesLock.readLock().unlock();
1048 }
1049 for (MessageReference ref : toExpire) {
1050 if (broker.isExpired(ref)) {
1051 if (LOG.isDebugEnabled()) {
1052 LOG.debug("expiring from pagedInMessages: " + ref);
1053 }
1054 messageExpired(connectionContext, ref);
1055 } else {
1056 pagedInMessagesLock.writeLock().lock();
1057 try {
1058 pagedInMessages.remove(ref.getMessageId());
1059 } finally {
1060 pagedInMessagesLock.writeLock().unlock();
1061 }
1062 }
1063 }
1064
1065 if (browseList.size() < getMaxBrowsePageSize()) {
1066 messagesLock.writeLock().lock();
1067 try {
1068 try {
1069 messages.reset();
1070 while (messages.hasNext() && browseList.size() < max) {
1071 MessageReference node = messages.next();
1072 if (node.isExpired()) {
1073 if (broker.isExpired(node)) {
1074 if (LOG.isDebugEnabled()) {
1075 LOG.debug("expiring from messages: " + node);
1076 }
1077 messageExpired(connectionContext, createMessageReference(node.getMessage()));
1078 }
1079 messages.remove();
1080 } else {
1081 messages.rollback(node.getMessageId());
1082 if (browseList.contains(node.getMessage()) == false) {
1083 browseList.add(node.getMessage());
1084 }
1085 }
1086 node.decrementReferenceCount();
1087 }
1088 } finally {
1089 messages.release();
1090 }
1091 } finally {
1092 messagesLock.writeLock().unlock();
1093 }
1094 }
1095
1096 } catch (Exception e) {
1097 LOG.error("Problem retrieving message for browse", e);
1098 }
1099 }
1100
1101 private void addAll(Collection<? extends MessageReference> refs, List<Message> l, int maxBrowsePageSize,
1102 List<MessageReference> toExpire) throws Exception {
1103 for (Iterator<? extends MessageReference> i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) {
1104 QueueMessageReference ref = (QueueMessageReference) i.next();
1105 if (ref.isExpired()) {
1106 toExpire.add(ref);
1107 } else if (l.contains(ref.getMessage()) == false) {
1108 l.add(ref.getMessage());
1109 }
1110 }
1111 }
1112
1113 public QueueMessageReference getMessage(String id) {
1114 MessageId msgId = new MessageId(id);
1115 pagedInMessagesLock.readLock().lock();
1116 try{
1117 QueueMessageReference ref = this.pagedInMessages.get(msgId);
1118 if (ref != null) {
1119 return ref;
1120 }
1121 }finally {
1122 pagedInMessagesLock.readLock().unlock();
1123 }
1124 messagesLock.readLock().lock();
1125 try{
1126 try {
1127 messages.reset();
1128 while (messages.hasNext()) {
1129 MessageReference mr = messages.next();
1130 QueueMessageReference qmr = createMessageReference(mr.getMessage());
1131 qmr.decrementReferenceCount();
1132 messages.rollback(qmr.getMessageId());
1133 if (msgId.equals(qmr.getMessageId())) {
1134 return qmr;
1135 }
1136 }
1137 } finally {
1138 messages.release();
1139 }
1140 }finally {
1141 messagesLock.readLock().unlock();
1142 }
1143 return null;
1144 }
1145
1146 public void purge() throws Exception {
1147 ConnectionContext c = createConnectionContext();
1148 List<MessageReference> list = null;
1149 do {
1150 doPageIn(true);
1151 pagedInMessagesLock.readLock().lock();
1152 try {
1153 list = new ArrayList<MessageReference>(pagedInMessages.values());
1154 }finally {
1155 pagedInMessagesLock.readLock().unlock();
1156 }
1157
1158 for (MessageReference ref : list) {
1159 try {
1160 QueueMessageReference r = (QueueMessageReference) ref;
1161 removeMessage(c, r);
1162 } catch (IOException e) {
1163 }
1164 }
1165 // don't spin/hang if stats are out and there is nothing left in the
1166 // store
1167 } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
1168 if (this.destinationStatistics.getMessages().getCount() > 0) {
1169 LOG.warn(getActiveMQDestination().getQualifiedName()
1170 + " after purge complete, message count stats report: "
1171 + this.destinationStatistics.getMessages().getCount());
1172 }
1173 gc();
1174 this.destinationStatistics.getMessages().setCount(0);
1175 getMessages().clear();
1176 }
1177
1178 public void clearPendingMessages() {
1179 messagesLock.writeLock().lock();
1180 try {
1181 if (store != null) {
1182 store.resetBatching();
1183 }
1184 messages.gc();
1185 asyncWakeup();
1186 } finally {
1187 messagesLock.writeLock().unlock();
1188 }
1189 }
1190
1191 /**
1192 * Removes the message matching the given messageId
1193 */
1194 public boolean removeMessage(String messageId) throws Exception {
1195 return removeMatchingMessages(createMessageIdFilter(messageId), 1) > 0;
1196 }
1197
1198 /**
1199 * Removes the messages matching the given selector
1200 *
1201 * @return the number of messages removed
1202 */
1203 public int removeMatchingMessages(String selector) throws Exception {
1204 return removeMatchingMessages(selector, -1);
1205 }
1206
1207 /**
1208 * Removes the messages matching the given selector up to the maximum number
1209 * of matched messages
1210 *
1211 * @return the number of messages removed
1212 */
1213 public int removeMatchingMessages(String selector, int maximumMessages) throws Exception {
1214 return removeMatchingMessages(createSelectorFilter(selector), maximumMessages);
1215 }
1216
1217 /**
1218 * Removes the messages matching the given filter up to the maximum number
1219 * of matched messages
1220 *
1221 * @return the number of messages removed
1222 */
1223 public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception {
1224 int movedCounter = 0;
1225 Set<MessageReference> set = new HashSet<MessageReference>();
1226 ConnectionContext context = createConnectionContext();
1227 do {
1228 doPageIn(true);
1229 pagedInMessagesLock.readLock().lock();
1230 try{
1231 set.addAll(pagedInMessages.values());
1232 }finally {
1233 pagedInMessagesLock.readLock().unlock();
1234 }
1235 List<MessageReference> list = new ArrayList<MessageReference>(set);
1236 for (MessageReference ref : list) {
1237 IndirectMessageReference r = (IndirectMessageReference) ref;
1238 if (filter.evaluate(context, r)) {
1239
1240 removeMessage(context, r);
1241 set.remove(r);
1242 if (++movedCounter >= maximumMessages && maximumMessages > 0) {
1243 return movedCounter;
1244 }
1245 }
1246 }
1247 } while (set.size() < this.destinationStatistics.getMessages().getCount());
1248 return movedCounter;
1249 }
1250
1251 /**
1252 * Copies the message matching the given messageId
1253 */
1254 public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
1255 throws Exception {
1256 return copyMatchingMessages(context, createMessageIdFilter(messageId), dest, 1) > 0;
1257 }
1258
1259 /**
1260 * Copies the messages matching the given selector
1261 *
1262 * @return the number of messages copied
1263 */
1264 public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
1265 throws Exception {
1266 return copyMatchingMessagesTo(context, selector, dest, -1);
1267 }
1268
1269 /**
1270 * Copies the messages matching the given selector up to the maximum number
1271 * of matched messages
1272 *
1273 * @return the number of messages copied
1274 */
1275 public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
1276 int maximumMessages) throws Exception {
1277 return copyMatchingMessages(context, createSelectorFilter(selector), dest, maximumMessages);
1278 }
1279
1280 /**
1281 * Copies the messages matching the given filter up to the maximum number of
1282 * matched messages
1283 *
1284 * @return the number of messages copied
1285 */
1286 public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest,
1287 int maximumMessages) throws Exception {
1288 int movedCounter = 0;
1289 int count = 0;
1290 Set<MessageReference> set = new HashSet<MessageReference>();
1291 do {
1292 int oldMaxSize = getMaxPageSize();
1293 setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
1294 doPageIn(true);
1295 setMaxPageSize(oldMaxSize);
1296 pagedInMessagesLock.readLock().lock();
1297 try {
1298 set.addAll(pagedInMessages.values());
1299 }finally {
1300 pagedInMessagesLock.readLock().unlock();
1301 }
1302 List<MessageReference> list = new ArrayList<MessageReference>(set);
1303 for (MessageReference ref : list) {
1304 IndirectMessageReference r = (IndirectMessageReference) ref;
1305 if (filter.evaluate(context, r)) {
1306
1307 r.incrementReferenceCount();
1308 try {
1309 Message m = r.getMessage();
1310 BrokerSupport.resend(context, m, dest);
1311 if (++movedCounter >= maximumMessages && maximumMessages > 0) {
1312 return movedCounter;
1313 }
1314 } finally {
1315 r.decrementReferenceCount();
1316 }
1317 }
1318 count++;
1319 }
1320 } while (count < this.destinationStatistics.getMessages().getCount());
1321 return movedCounter;
1322 }
1323
1324 /**
1325 * Move a message
1326 *
1327 * @param context
1328 * connection context
1329 * @param m
1330 * QueueMessageReference
1331 * @param dest
1332 * ActiveMQDestination
1333 * @throws Exception
1334 */
1335 public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception {
1336 BrokerSupport.resend(context, m.getMessage(), dest);
1337 removeMessage(context, m);
1338 messagesLock.writeLock().lock();
1339 try{
1340 messages.rollback(m.getMessageId());
1341 }finally {
1342 messagesLock.writeLock().unlock();
1343 }
1344 return true;
1345 }
1346
1347 /**
1348 * Moves the message matching the given messageId
1349 */
1350 public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
1351 throws Exception {
1352 return moveMatchingMessagesTo(context, createMessageIdFilter(messageId), dest, 1) > 0;
1353 }
1354
1355 /**
1356 * Moves the messages matching the given selector
1357 *
1358 * @return the number of messages removed
1359 */
1360 public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
1361 throws Exception {
1362 return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE);
1363 }
1364
1365 /**
1366 * Moves the messages matching the given selector up to the maximum number
1367 * of matched messages
1368 */
1369 public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
1370 int maximumMessages) throws Exception {
1371 return moveMatchingMessagesTo(context, createSelectorFilter(selector), dest, maximumMessages);
1372 }
1373
1374 /**
1375 * Moves the messages matching the given filter up to the maximum number of
1376 * matched messages
1377 */
1378 public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
1379 ActiveMQDestination dest, int maximumMessages) throws Exception {
1380 int movedCounter = 0;
1381 Set<QueueMessageReference> set = new HashSet<QueueMessageReference>();
1382 do {
1383 doPageIn(true);
1384 pagedInMessagesLock.readLock().lock();
1385 try{
1386 set.addAll(pagedInMessages.values());
1387 }finally {
1388 pagedInMessagesLock.readLock().unlock();
1389 }
1390 List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set);
1391 for (QueueMessageReference ref : list) {
1392 if (filter.evaluate(context, ref)) {
1393 // We should only move messages that can be locked.
1394 moveMessageTo(context, ref, dest);
1395 set.remove(ref);
1396 if (++movedCounter >= maximumMessages && maximumMessages > 0) {
1397 return movedCounter;
1398 }
1399 }
1400 }
1401 } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages);
1402 return movedCounter;
1403 }
1404
1405 /**
1406 * @return true if we would like to iterate again
1407 * @see org.apache.activemq.thread.Task#iterate()
1408 */
1409 public boolean iterate() {
1410 MDC.put("activemq.destination", getName());
1411 boolean pageInMoreMessages = false;
1412 synchronized (iteratingMutex) {
1413
1414 // If optimize dispatch is on or this is a slave this method could be called recursively
1415 // we set this state value to short-circuit wakeup in those cases to avoid that as it
1416 // could lead to errors.
1417 iterationRunning = true;
1418
1419 // do early to allow dispatch of these waiting messages
1420 synchronized (messagesWaitingForSpace) {
1421 Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
1422 while (it.hasNext()) {
1423 if (!memoryUsage.isFull()) {
1424 Runnable op = it.next();
1425 it.remove();
1426 op.run();
1427 } else {
1428 registerCallbackForNotFullNotification();
1429 break;
1430 }
1431 }
1432 }
1433
1434 if (firstConsumer) {
1435 firstConsumer = false;
1436 try {
1437 if (consumersBeforeDispatchStarts > 0) {
1438 int timeout = 1000; // wait one second by default if
1439 // consumer count isn't reached
1440 if (timeBeforeDispatchStarts > 0) {
1441 timeout = timeBeforeDispatchStarts;
1442 }
1443 if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) {
1444 if (LOG.isDebugEnabled()) {
1445 LOG.debug(consumers.size() + " consumers subscribed. Starting dispatch.");
1446 }
1447 } else {
1448 if (LOG.isDebugEnabled()) {
1449 LOG.debug(timeout + " ms elapsed and " + consumers.size()
1450 + " consumers subscribed. Starting dispatch.");
1451 }
1452 }
1453 }
1454 if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) {
1455 iteratingMutex.wait(timeBeforeDispatchStarts);
1456 if (LOG.isDebugEnabled()) {
1457 LOG.debug(timeBeforeDispatchStarts + " ms elapsed. Starting dispatch.");
1458 }
1459 }
1460 } catch (Exception e) {
1461 LOG.error(e.toString());
1462 }
1463 }
1464
1465 BrowserDispatch pendingBrowserDispatch = browserDispatches.poll();
1466
1467 messagesLock.readLock().lock();
1468 try{
1469 pageInMoreMessages |= !messages.isEmpty();
1470 } finally {
1471 messagesLock.readLock().unlock();
1472 }
1473
1474 pagedInPendingDispatchLock.readLock().lock();
1475 try {
1476 pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
1477 } finally {
1478 pagedInPendingDispatchLock.readLock().unlock();
1479 }
1480
1481 // Perhaps we should page always into the pagedInPendingDispatch
1482 // list if
1483 // !messages.isEmpty(), and then if
1484 // !pagedInPendingDispatch.isEmpty()
1485 // then we do a dispatch.
1486 if (pageInMoreMessages || pendingBrowserDispatch != null || !redeliveredWaitingDispatch.isEmpty()) {
1487 try {
1488 pageInMessages(pendingBrowserDispatch != null);
1489
1490 } catch (Throwable e) {
1491 LOG.error("Failed to page in more queue messages ", e);
1492 }
1493 }
1494
1495 if (pendingBrowserDispatch != null) {
1496 ArrayList<QueueMessageReference> alreadyDispatchedMessages = null;
1497 pagedInMessagesLock.readLock().lock();
1498 try{
1499 alreadyDispatchedMessages = new ArrayList<QueueMessageReference>(pagedInMessages.values());
1500 }finally {
1501 pagedInMessagesLock.readLock().unlock();
1502 }
1503 if (LOG.isDebugEnabled()) {
1504 LOG.debug("dispatch to browser: " + pendingBrowserDispatch.getBrowser()
1505 + ", already dispatched/paged count: " + alreadyDispatchedMessages.size());
1506 }
1507 do {
1508 try {
1509 MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
1510 msgContext.setDestination(destination);
1511
1512 QueueBrowserSubscription browser = pendingBrowserDispatch.getBrowser();
1513 for (QueueMessageReference node : alreadyDispatchedMessages) {
1514 if (!node.isAcked()) {
1515 msgContext.setMessageReference(node);
1516 if (browser.matches(node, msgContext)) {
1517 browser.add(node);
1518 }
1519 }
1520 }
1521 pendingBrowserDispatch.done();
1522 } catch (Exception e) {
1523 LOG.warn("exception on dispatch to browser: " + pendingBrowserDispatch.getBrowser(), e);
1524 }
1525
1526 } while ((pendingBrowserDispatch = browserDispatches.poll()) != null);
1527 }
1528
1529 if (pendingWakeups.get() > 0) {
1530 pendingWakeups.decrementAndGet();
1531 }
1532 MDC.remove("activemq.destination");
1533 iterationRunning = false;
1534
1535 return pendingWakeups.get() > 0;
1536 }
1537 }
1538
1539 protected MessageReferenceFilter createMessageIdFilter(final String messageId) {
1540 return new MessageReferenceFilter() {
1541 public boolean evaluate(ConnectionContext context, MessageReference r) {
1542 return messageId.equals(r.getMessageId().toString());
1543 }
1544
1545 @Override
1546 public String toString() {
1547 return "MessageIdFilter: " + messageId;
1548 }
1549 };
1550 }
1551
1552 protected MessageReferenceFilter createSelectorFilter(String selector) throws InvalidSelectorException {
1553
1554 if (selector == null || selector.isEmpty()) {
1555 return new MessageReferenceFilter() {
1556
1557 @Override
1558 public boolean evaluate(ConnectionContext context, MessageReference messageReference) throws JMSException {
1559 return true;
1560 }
1561 };
1562 }
1563
1564 final BooleanExpression selectorExpression = SelectorParser.parse(selector);
1565
1566 return new MessageReferenceFilter() {
1567 public boolean evaluate(ConnectionContext context, MessageReference r) throws JMSException {
1568 MessageEvaluationContext messageEvaluationContext = context.getMessageEvaluationContext();
1569
1570 messageEvaluationContext.setMessageReference(r);
1571 if (messageEvaluationContext.getDestination() == null) {
1572 messageEvaluationContext.setDestination(getActiveMQDestination());
1573 }
1574
1575 return selectorExpression.matches(messageEvaluationContext);
1576 }
1577 };
1578 }
1579
1580 protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException {
1581 removeMessage(c, null, r);
1582 pagedInPendingDispatchLock.writeLock().lock();
1583 try {
1584 pagedInPendingDispatch.remove(r);
1585 } finally {
1586 pagedInPendingDispatchLock.writeLock().unlock();
1587 }
1588
1589 }
1590
1591 protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException {
1592 MessageAck ack = new MessageAck();
1593 ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
1594 ack.setDestination(destination);
1595 ack.setMessageID(r.getMessageId());
1596 removeMessage(c, subs, r, ack);
1597 }
1598
1599 protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference,
1600 MessageAck ack) throws IOException {
1601 reference.setAcked(true);
1602 // This sends the ack the the journal..
1603 if (!ack.isInTransaction()) {
1604 acknowledge(context, sub, ack, reference);
1605 getDestinationStatistics().getDequeues().increment();
1606 dropMessage(reference);
1607 } else {
1608 try {
1609 acknowledge(context, sub, ack, reference);
1610 } finally {
1611 context.getTransaction().addSynchronization(new Synchronization() {
1612
1613 @Override
1614 public void afterCommit() throws Exception {
1615 getDestinationStatistics().getDequeues().increment();
1616 dropMessage(reference);
1617 wakeup();
1618 }
1619
1620 @Override
1621 public void afterRollback() throws Exception {
1622 reference.setAcked(false);
1623 }
1624 });
1625 }
1626 }
1627 if (ack.isPoisonAck()) {
1628 // message gone to DLQ, is ok to allow redelivery
1629 messagesLock.writeLock().lock();
1630 try{
1631 messages.rollback(reference.getMessageId());
1632 }finally {
1633 messagesLock.writeLock().unlock();
1634 }
1635 }
1636
1637 }
1638
1639 private void dropMessage(QueueMessageReference reference) {
1640 reference.drop();
1641 destinationStatistics.getMessages().decrement();
1642 pagedInMessagesLock.writeLock().lock();
1643 try{
1644 pagedInMessages.remove(reference.getMessageId());
1645 }finally {
1646 pagedInMessagesLock.writeLock().unlock();
1647 }
1648 }
1649
1650 public void messageExpired(ConnectionContext context, MessageReference reference) {
1651 messageExpired(context, null, reference);
1652 }
1653
1654 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
1655 if (LOG.isDebugEnabled()) {
1656 LOG.debug("message expired: " + reference);
1657 }
1658 broker.messageExpired(context, reference, subs);
1659 destinationStatistics.getExpired().increment();
1660 try {
1661 removeMessage(context, subs, (QueueMessageReference) reference);
1662 } catch (IOException e) {
1663 LOG.error("Failed to remove expired Message from the store ", e);
1664 }
1665 }
1666
1667 final void sendMessage(final Message msg) throws Exception {
1668 messagesLock.writeLock().lock();
1669 try{
1670 messages.addMessageLast(msg);
1671 }finally {
1672 messagesLock.writeLock().unlock();
1673 }
1674 }
1675
1676 final void messageSent(final ConnectionContext context, final Message msg) throws Exception {
1677 destinationStatistics.getEnqueues().increment();
1678 destinationStatistics.getMessages().increment();
1679 messageDelivered(context, msg);
1680 consumersLock.readLock().lock();
1681 try {
1682 if (consumers.isEmpty()) {
1683 onMessageWithNoConsumers(context, msg);
1684 }
1685 }finally {
1686 consumersLock.readLock().unlock();
1687 }
1688 if (LOG.isDebugEnabled()) {
1689 LOG.debug(broker.getBrokerName() + " Message " + msg.getMessageId() + " sent to " + this.destination);
1690 }
1691 wakeup();
1692 }
1693
1694 public void wakeup() {
1695 if ((optimizedDispatch || isSlave()) && !iterationRunning) {
1696 iterate();
1697 pendingWakeups.incrementAndGet();
1698 } else {
1699 asyncWakeup();
1700 }
1701 }
1702
1703 private void asyncWakeup() {
1704 try {
1705 pendingWakeups.incrementAndGet();
1706 this.taskRunner.wakeup();
1707 } catch (InterruptedException e) {
1708 LOG.warn("Async task tunner failed to wakeup ", e);
1709 }
1710 }
1711
1712 private boolean isSlave() {
1713 return broker.getBrokerService().isSlave();
1714 }
1715
1716 private void doPageIn(boolean force) throws Exception {
1717 PendingList newlyPaged = doPageInForDispatch(force);
1718 pagedInPendingDispatchLock.writeLock().lock();
1719 try {
1720 if (pagedInPendingDispatch.isEmpty()) {
1721 pagedInPendingDispatch.addAll(newlyPaged);
1722
1723 } else {
1724 for (MessageReference qmr : newlyPaged) {
1725 if (!pagedInPendingDispatch.contains(qmr)) {
1726 pagedInPendingDispatch.addMessageLast(qmr);
1727 }
1728 }
1729 }
1730 } finally {
1731 pagedInPendingDispatchLock.writeLock().unlock();
1732 }
1733 }
1734
/**
 * Moves up to one page of messages from the pending cursor into the
 * paged-in messages map and returns the ones that were newly added.
 * <p>
 * Paging happens only when there is something to page in and either
 * {@code force} is set, or there are consumers and the pending-dispatch
 * list is below the max page size. Expired messages met on the way are
 * handed to {@code messageExpired} instead of being paged in.
 *
 * @param force page in regardless of consumers and pending-dispatch size;
 *            also bypasses the lazy-dispatch limit
 * @return the newly paged-in references; an empty list (never {@code null})
 *         when nothing was paged in
 * @throws Exception if reading from the cursor fails
 */
private PendingList doPageInForDispatch(boolean force) throws Exception {
    List<QueueMessageReference> result = null;
    PendingList resultList = null;

    // Never more than one page, and never more than the cursor holds.
    int toPageIn = Math.min(getMaxPageSize(), messages.size());
    if (LOG.isDebugEnabled()) {
        LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: "
                + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
                + pagedInMessages.size() + ", enqueueCount: " + destinationStatistics.getEnqueues().getCount()
                + ", dequeueCount: " + destinationStatistics.getDequeues().getCount());
    }

    if (isLazyDispatch() && !force) {
        // Only page in the minimum number of messages which can be
        // dispatched immediately.
        toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
    }
    // Size of the pending-dispatch list, read under its read lock.
    int pagedInPendingSize = 0;
    pagedInPendingDispatchLock.readLock().lock();
    try {
        pagedInPendingSize = pagedInPendingDispatch.size();
    } finally {
        pagedInPendingDispatchLock.readLock().unlock();
    }
    if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingSize < getMaxPageSize()))) {
        int count = 0;
        result = new ArrayList<QueueMessageReference>(toPageIn);
        messagesLock.writeLock().lock();
        try {
            try {
                messages.setMaxBatchSize(toPageIn);
                messages.reset();
                while (messages.hasNext() && count < toPageIn) {
                    // Each visited message is taken off the cursor.
                    MessageReference node = messages.next();
                    messages.remove();

                    QueueMessageReference ref = createMessageReference(node.getMessage());
                    if (ref.isExpired()) {
                        // Expired messages do not count towards the page.
                        if (broker.isExpired(ref)) {
                            messageExpired(createConnectionContext(), ref);
                        } else {
                            ref.decrementReferenceCount();
                        }
                    } else {
                        result.add(ref);
                        count++;
                    }
                }
            } finally {
                messages.release();
            }
        } finally {
            messagesLock.writeLock().unlock();
        }
        // Only add new messages, not already pagedIn to avoid multiple
        // dispatch attempts
        pagedInMessagesLock.writeLock().lock();
        try {
            // The list implementation depends on whether this queue prioritizes messages.
            if(isPrioritizedMessages()) {
                resultList = new PrioritizedPendingList();
            } else {
                resultList = new OrderedPendingList();
            }
            for (QueueMessageReference ref : result) {
                if (!pagedInMessages.containsKey(ref.getMessageId())) {
                    pagedInMessages.put(ref.getMessageId(), ref);
                    resultList.addMessageLast(ref);
                } else {
                    // Already paged in: release the reference taken for this duplicate.
                    ref.decrementReferenceCount();
                }
            }
        } finally {
            pagedInMessagesLock.writeLock().unlock();
        }
    } else {
        // Avoid return null list, if condition is not validated
        resultList = new OrderedPendingList();
    }

    return resultList;
}
1816
1817 private void doDispatch(PendingList list) throws Exception {
1818 boolean doWakeUp = false;
1819
1820 pagedInPendingDispatchLock.writeLock().lock();
1821 try {
1822 if (!redeliveredWaitingDispatch.isEmpty()) {
1823 // Try first to dispatch redelivered messages to keep an
1824 // proper order
1825 redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
1826 }
1827 if (!pagedInPendingDispatch.isEmpty()) {
1828 // Next dispatch anything that had not been
1829 // dispatched before.
1830 pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
1831 }
1832 // and now see if we can dispatch the new stuff.. and append to
1833 // the pending
1834 // list anything that does not actually get dispatched.
1835 if (list != null && !list.isEmpty()) {
1836 if (pagedInPendingDispatch.isEmpty()) {
1837 pagedInPendingDispatch.addAll(doActualDispatch(list));
1838 } else {
1839 for (MessageReference qmr : list) {
1840 if (!pagedInPendingDispatch.contains(qmr)) {
1841 pagedInPendingDispatch.addMessageLast(qmr);
1842 }
1843 }
1844 doWakeUp = true;
1845 }
1846 }
1847 } finally {
1848 pagedInPendingDispatchLock.writeLock().unlock();
1849 }
1850
1851 if (doWakeUp) {
1852 // avoid lock order contention
1853 asyncWakeup();
1854 }
1855 }
1856
1857 /**
1858 * @return list of messages that could get dispatched to consumers if they
1859 * were not full.
1860 */
1861 private PendingList doActualDispatch(PendingList list) throws Exception {
1862 List<Subscription> consumers;
1863 consumersLock.writeLock().lock();
1864
1865 try {
1866 if (this.consumers.isEmpty() || isSlave()) {
1867 // slave dispatch happens in processDispatchNotification
1868 return list;
1869 }
1870 consumers = new ArrayList<Subscription>(this.consumers);
1871 }finally {
1872 consumersLock.writeLock().unlock();
1873 }
1874
1875 PendingList rc;
1876 if(isPrioritizedMessages()) {
1877 rc = new PrioritizedPendingList();
1878 } else {
1879 rc = new OrderedPendingList();
1880 }
1881
1882 Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
1883
1884 for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
1885
1886 MessageReference node = (MessageReference) iterator.next();
1887 Subscription target = null;
1888 int interestCount = 0;
1889 for (Subscription s : consumers) {
1890 if (s instanceof QueueBrowserSubscription) {
1891 interestCount++;
1892 continue;
1893 }
1894 if (!fullConsumers.contains(s) && !s.isFull()) {
1895 if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node)) {
1896 // Dispatch it.
1897 s.add(node);
1898 target = s;
1899 break;
1900 }
1901 } else {
1902 // no further dispatch of list to a full consumer to
1903 // avoid out of order message receipt
1904 fullConsumers.add(s);
1905 if (LOG.isTraceEnabled()) {
1906 LOG.trace("Sub full " + s);
1907 }
1908 }
1909 // make sure it gets dispatched again
1910 if (!node.isDropped() && !((QueueMessageReference) node).isAcked() &&
1911 (!node.isDropped() || s.getConsumerInfo().isBrowser())) {
1912 interestCount++;
1913 }
1914 }
1915
1916 if ((target == null && interestCount > 0) || consumers.size() == 0) {
1917 // This means all subs were full or that there are no
1918 // consumers...
1919 rc.addMessageLast((QueueMessageReference) node);
1920 }
1921
1922 // If it got dispatched, rotate the consumer list to get round robin
1923 // distribution.
1924 if (target != null && !strictOrderDispatch && consumers.size() > 1
1925 && !dispatchSelector.isExclusiveConsumer(target)) {
1926 consumersLock.writeLock().lock();
1927 try {
1928 if (removeFromConsumerList(target)) {
1929 addToConsumerList(target);
1930 consumers = new ArrayList<Subscription>(this.consumers);
1931 }
1932 }finally {
1933 consumersLock.writeLock().unlock();
1934 }
1935 }
1936 }
1937
1938 return rc;
1939 }
1940
1941 protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception {
1942 boolean result = true;
1943 // Keep message groups together.
1944 String groupId = node.getGroupID();
1945 int sequence = node.getGroupSequence();
1946 if (groupId != null) {
1947 //MessageGroupMap messageGroupOwners = ((Queue) node
1948 // .getRegionDestination()).getMessageGroupOwners();
1949
1950 MessageGroupMap messageGroupOwners = getMessageGroupOwners();
1951 // If we can own the first, then no-one else should own the
1952 // rest.
1953 if (sequence == 1) {
1954 assignGroup(subscription, messageGroupOwners, node, groupId);
1955 } else {
1956
1957 // Make sure that the previous owner is still valid, we may
1958 // need to become the new owner.
1959 ConsumerId groupOwner;
1960
1961 groupOwner = messageGroupOwners.get(groupId);
1962 if (groupOwner == null) {
1963 assignGroup(subscription, messageGroupOwners, node, groupId);
1964 } else {
1965 if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
1966 // A group sequence < 1 is an end of group signal.
1967 if (sequence < 0) {
1968 messageGroupOwners.removeGroup(groupId);
1969 }
1970 } else {
1971 result = false;
1972 }
1973 }
1974 }
1975 }
1976
1977 return result;
1978
1979 }
1980
1981 protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
1982 messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
1983 Message message = n.getMessage();
1984 if (message instanceof ActiveMQMessage) {
1985 ActiveMQMessage activeMessage = (ActiveMQMessage) message;
1986 try {
1987 activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false);
1988 } catch (JMSException e) {
1989 LOG.warn("Failed to set boolean header: " + e, e);
1990 }
1991 }
1992 }
1993
1994 protected void pageInMessages(boolean force) throws Exception {
1995 doDispatch(doPageInForDispatch(force));
1996 }
1997
1998 private void addToConsumerList(Subscription sub) {
1999 if (useConsumerPriority) {
2000 consumers.add(sub);
2001 Collections.sort(consumers, orderedCompare);
2002 } else {
2003 consumers.add(sub);
2004 }
2005 }
2006
2007 private boolean removeFromConsumerList(Subscription sub) {
2008 return consumers.remove(sub);
2009 }
2010
2011 private int getConsumerMessageCountBeforeFull() throws Exception {
2012 int total = 0;
2013 boolean zeroPrefetch = false;
2014 consumersLock.readLock().lock();
2015 try{
2016 for (Subscription s : consumers) {
2017 zeroPrefetch |= s.getPrefetchSize() == 0;
2018 int countBeforeFull = s.countBeforeFull();
2019 total += countBeforeFull;
2020 }
2021 }finally {
2022 consumersLock.readLock().unlock();
2023 }
2024 if (total == 0 && zeroPrefetch) {
2025 total = 1;
2026 }
2027 return total;
2028 }
2029
2030 /*
2031 * In slave mode, dispatch is ignored till we get this notification as the
2032 * dispatch process is non deterministic between master and slave. On a
2033 * notification, the actual dispatch to the subscription (as chosen by the
2034 * master) is completed. (non-Javadoc)
2035 * @see
2036 * org.apache.activemq.broker.region.BaseDestination#processDispatchNotification
2037 * (org.apache.activemq.command.MessageDispatchNotification)
2038 */
2039 @Override
2040 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
2041 // do dispatch
2042 Subscription sub = getMatchingSubscription(messageDispatchNotification);
2043 if (sub != null) {
2044 MessageReference message = getMatchingMessage(messageDispatchNotification);
2045 sub.add(message);
2046 sub.processMessageDispatchNotification(messageDispatchNotification);
2047 }
2048 }
2049
2050 private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification)
2051 throws Exception {
2052 QueueMessageReference message = null;
2053 MessageId messageId = messageDispatchNotification.getMessageId();
2054
2055 pagedInPendingDispatchLock.writeLock().lock();
2056 try {
2057 for (MessageReference ref : pagedInPendingDispatch) {
2058 if (messageId.equals(ref.getMessageId())) {
2059 message = (QueueMessageReference)ref;
2060 pagedInPendingDispatch.remove(ref);
2061 break;
2062 }
2063 }
2064 } finally {
2065 pagedInPendingDispatchLock.writeLock().unlock();
2066 }
2067
2068 if (message == null) {
2069 pagedInMessagesLock.readLock().lock();
2070 try {
2071 message = pagedInMessages.get(messageId);
2072 } finally {
2073 pagedInMessagesLock.readLock().unlock();
2074 }
2075 }
2076
2077 if (message == null) {
2078 messagesLock.writeLock().lock();
2079 try {
2080 try {
2081 messages.setMaxBatchSize(getMaxPageSize());
2082 messages.reset();
2083 while (messages.hasNext()) {
2084 MessageReference node = messages.next();
2085 messages.remove();
2086 if (messageId.equals(node.getMessageId())) {
2087 message = this.createMessageReference(node.getMessage());
2088 break;
2089 }
2090 }
2091 } finally {
2092 messages.release();
2093 }
2094 } finally {
2095 messagesLock.writeLock().unlock();
2096 }
2097 }
2098
2099 if (message == null) {
2100 Message msg = loadMessage(messageId);
2101 if (msg != null) {
2102 message = this.createMessageReference(msg);
2103 }
2104 }
2105
2106 if (message == null) {
2107 throw new JMSException("Slave broker out of sync with master - Message: "
2108 + messageDispatchNotification.getMessageId() + " on "
2109 + messageDispatchNotification.getDestination() + " does not exist among pending("
2110 + pagedInPendingDispatch.size() + ") for subscription: "
2111 + messageDispatchNotification.getConsumerId());
2112 }
2113 return message;
2114 }
2115
2116 /**
2117 * Find a consumer that matches the id in the message dispatch notification
2118 *
2119 * @param messageDispatchNotification
2120 * @return sub or null if the subscription has been removed before dispatch
2121 * @throws JMSException
2122 */
2123 private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification)
2124 throws JMSException {
2125 Subscription sub = null;
2126 consumersLock.readLock().lock();
2127 try {
2128 for (Subscription s : consumers) {
2129 if (messageDispatchNotification.getConsumerId().equals(s.getConsumerInfo().getConsumerId())) {
2130 sub = s;
2131 break;
2132 }
2133 }
2134 }finally {
2135 consumersLock.readLock().unlock();
2136 }
2137 return sub;
2138 }
2139
2140 public void onUsageChanged(@SuppressWarnings("rawtypes") Usage usage, int oldPercentUsage, int newPercentUsage) {
2141 if (oldPercentUsage > newPercentUsage) {
2142 asyncWakeup();
2143 }
2144 }
2145
2146 @Override
2147 protected Logger getLog() {
2148 return LOG;
2149 }
2150
2151 protected boolean isOptimizeStorage(){
2152 boolean result = false;
2153 if (isDoOptimzeMessageStorage()){
2154 consumersLock.readLock().lock();
2155 try{
2156 if (consumers.isEmpty()==false){
2157 result = true;
2158 for (Subscription s : consumers) {
2159 if (s.getPrefetchSize()==0){
2160 result = false;
2161 break;
2162 }
2163 if (s.isSlowConsumer()){
2164 result = false;
2165 break;
2166 }
2167 if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
2168 result = false;
2169 break;
2170 }
2171 }
2172 }
2173 }finally {
2174 consumersLock.readLock().unlock();
2175 }
2176 }
2177 return result;
2178 }
2179 }