001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.Iterator;
022 import java.util.List;
023 import java.util.concurrent.CountDownLatch;
024 import java.util.concurrent.TimeUnit;
025 import java.util.concurrent.atomic.AtomicInteger;
026
027 import javax.jms.InvalidSelectorException;
028 import javax.jms.JMSException;
029
030 import org.apache.activemq.broker.Broker;
031 import org.apache.activemq.broker.ConnectionContext;
032 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
033 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
034 import org.apache.activemq.command.ActiveMQMessage;
035 import org.apache.activemq.command.ConsumerControl;
036 import org.apache.activemq.command.ConsumerInfo;
037 import org.apache.activemq.command.Message;
038 import org.apache.activemq.command.MessageAck;
039 import org.apache.activemq.command.MessageDispatch;
040 import org.apache.activemq.command.MessageDispatchNotification;
041 import org.apache.activemq.command.MessageId;
042 import org.apache.activemq.command.MessagePull;
043 import org.apache.activemq.command.Response;
044 import org.apache.activemq.thread.Scheduler;
045 import org.apache.activemq.transaction.Synchronization;
046 import org.apache.activemq.usage.SystemUsage;
047 import org.slf4j.Logger;
048 import org.slf4j.LoggerFactory;
049
050 /**
051 * A subscription that honors the pre-fetch option of the ConsumerInfo.
052 */
053 public abstract class PrefetchSubscription extends AbstractSubscription {
054
055 private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class);
056 protected final Scheduler scheduler;
057
058 protected PendingMessageCursor pending;
059 protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
060 protected final AtomicInteger prefetchExtension = new AtomicInteger();
061 protected boolean usePrefetchExtension = true;
062 protected long enqueueCounter;
063 protected long dispatchCounter;
064 protected long dequeueCounter;
065 private int maxProducersToAudit=32;
066 private int maxAuditDepth=2048;
067 protected final SystemUsage usageManager;
068 protected final Object pendingLock = new Object();
069 protected final Object dispatchLock = new Object();
070 private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
071
072 public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
073 super(broker,context, info);
074 this.usageManager=usageManager;
075 pending = cursor;
076 this.scheduler = broker.getScheduler();
077 }
078
079 public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
080 this(broker,usageManager,context, info, new VMPendingMessageCursor(false));
081 }
082
083 /**
084 * Allows a message to be pulled on demand by a client
085 */
086 public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
087 // The slave should not deliver pull messages.
088 // TODO: when the slave becomes a master, He should send a NULL message to all the
089 // consumers to 'wake them up' in case they were waiting for a message.
090 if (getPrefetchSize() == 0 && !isSlave()) {
091
092 prefetchExtension.incrementAndGet();
093 final long dispatchCounterBeforePull = dispatchCounter;
094
095 // Have the destination push us some messages.
096 for (Destination dest : destinations) {
097 dest.iterate();
098 }
099 dispatchPending();
100
101 synchronized(this) {
102 // If there was nothing dispatched.. we may need to setup a timeout.
103 if (dispatchCounterBeforePull == dispatchCounter) {
104 // immediate timeout used by receiveNoWait()
105 if (pull.getTimeout() == -1) {
106 // Send a NULL message.
107 add(QueueMessageReference.NULL_MESSAGE);
108 dispatchPending();
109 }
110 if (pull.getTimeout() > 0) {
111 scheduler.executeAfterDelay(new Runnable() {
112 @Override
113 public void run() {
114 pullTimeout(dispatchCounterBeforePull);
115 }
116 }, pull.getTimeout());
117 }
118 }
119 }
120 }
121 return null;
122 }
123
124 /**
125 * Occurs when a pull times out. If nothing has been dispatched since the
126 * timeout was setup, then send the NULL message.
127 */
128 final void pullTimeout(long dispatchCounterBeforePull) {
129 synchronized (pendingLock) {
130 if (dispatchCounterBeforePull == dispatchCounter) {
131 try {
132 add(QueueMessageReference.NULL_MESSAGE);
133 dispatchPending();
134 } catch (Exception e) {
135 context.getConnection().serviceException(e);
136 }
137 }
138 }
139 }
140
141 public void add(MessageReference node) throws Exception {
142 synchronized (pendingLock) {
143 // The destination may have just been removed...
144 if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) {
145 // perhaps we should inform the caller that we are no longer valid to dispatch to?
146 return;
147 }
148 enqueueCounter++;
149 pending.addMessageLast(node);
150 }
151 dispatchPending();
152 }
153
154 public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
155 synchronized(pendingLock) {
156 try {
157 pending.reset();
158 while (pending.hasNext()) {
159 MessageReference node = pending.next();
160 node.decrementReferenceCount();
161 if (node.getMessageId().equals(mdn.getMessageId())) {
162 // Synchronize between dispatched list and removal of messages from pending list
163 // related to remove subscription action
164 synchronized(dispatchLock) {
165 pending.remove();
166 createMessageDispatch(node, node.getMessage());
167 dispatched.add(node);
168 onDispatch(node, node.getMessage());
169 }
170 return;
171 }
172 }
173 } finally {
174 pending.release();
175 }
176 }
177 throw new JMSException(
178 "Slave broker out of sync with master: Dispatched message ("
179 + mdn.getMessageId() + ") was not in the pending list for "
180 + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName());
181 }
182
183 public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
184 // Handle the standard acknowledgment case.
185 boolean callDispatchMatched = false;
186 Destination destination = null;
187
188 if (!isSlave()) {
189 if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
190 // suppress unexpected ack exception in this expected case
191 LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: " + ack);
192 return;
193 }
194 }
195 if (LOG.isTraceEnabled()) {
196 LOG.trace("ack:" + ack);
197 }
198 synchronized(dispatchLock) {
199 if (ack.isStandardAck()) {
200 // First check if the ack matches the dispatched. When using failover this might
201 // not be the case. We don't ever want to ack the wrong messages.
202 assertAckMatchesDispatched(ack);
203
204 // Acknowledge all dispatched messages up till the message id of
205 // the acknowledgment.
206 int index = 0;
207 boolean inAckRange = false;
208 List<MessageReference> removeList = new ArrayList<MessageReference>();
209 for (final MessageReference node : dispatched) {
210 MessageId messageId = node.getMessageId();
211 if (ack.getFirstMessageId() == null
212 || ack.getFirstMessageId().equals(messageId)) {
213 inAckRange = true;
214 }
215 if (inAckRange) {
216 // Don't remove the nodes until we are committed.
217 if (!context.isInTransaction()) {
218 dequeueCounter++;
219 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
220 removeList.add(node);
221 } else {
222 registerRemoveSync(context, node);
223 }
224 index++;
225 acknowledge(context, ack, node);
226 if (ack.getLastMessageId().equals(messageId)) {
227 // contract prefetch if dispatch required a pull
228 if (getPrefetchSize() == 0) {
229 // Protect extension update against parallel updates.
230 while (true) {
231 int currentExtension = prefetchExtension.get();
232 int newExtension = Math.max(0, currentExtension - index);
233 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
234 break;
235 }
236 }
237 } else if (usePrefetchExtension && context.isInTransaction()) {
238 // extend prefetch window only if not a pulling consumer
239 while (true) {
240 int currentExtension = prefetchExtension.get();
241 int newExtension = Math.max(currentExtension, index);
242 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
243 break;
244 }
245 }
246 }
247 destination = node.getRegionDestination();
248 callDispatchMatched = true;
249 break;
250 }
251 }
252 }
253 for (final MessageReference node : removeList) {
254 dispatched.remove(node);
255 }
256 // this only happens after a reconnect - get an ack which is not
257 // valid
258 if (!callDispatchMatched) {
259 LOG.warn("Could not correlate acknowledgment with dispatched message: "
260 + ack);
261 }
262 } else if (ack.isIndividualAck()) {
263 // Message was delivered and acknowledge - but only delete the
264 // individual message
265 for (final MessageReference node : dispatched) {
266 MessageId messageId = node.getMessageId();
267 if (ack.getLastMessageId().equals(messageId)) {
268 // Don't remove the nodes until we are committed - immediateAck option
269 if (!context.isInTransaction()) {
270 dequeueCounter++;
271 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
272 dispatched.remove(node);
273 } else {
274 registerRemoveSync(context, node);
275 }
276
277 // Protect extension update against parallel updates.
278 while (true) {
279 int currentExtension = prefetchExtension.get();
280 int newExtension = Math.max(0, currentExtension - 1);
281 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
282 break;
283 }
284 }
285 acknowledge(context, ack, node);
286 destination = node.getRegionDestination();
287 callDispatchMatched = true;
288 break;
289 }
290 }
291 }else if (ack.isDeliveredAck()) {
292 // Message was delivered but not acknowledged: update pre-fetch
293 // counters.
294 int index = 0;
295 for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
296 final MessageReference node = iter.next();
297 if (node.isExpired()) {
298 if (broker.isExpired(node)) {
299 node.getRegionDestination().messageExpired(context, this, node);
300 }
301 iter.remove();
302 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
303 }
304 if (ack.getLastMessageId().equals(node.getMessageId())) {
305 if (usePrefetchExtension) {
306 while (true) {
307 int currentExtension = prefetchExtension.get();
308 int newExtension = Math.max(currentExtension, index + 1);
309 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
310 break;
311 }
312 }
313 }
314 destination = node.getRegionDestination();
315 callDispatchMatched = true;
316 break;
317 }
318 }
319 if (!callDispatchMatched) {
320 throw new JMSException(
321 "Could not correlate acknowledgment with dispatched message: "
322 + ack);
323 }
324 } else if (ack.isRedeliveredAck()) {
325 // Message was re-delivered but it was not yet considered to be
326 // a DLQ message.
327 boolean inAckRange = false;
328 for (final MessageReference node : dispatched) {
329 MessageId messageId = node.getMessageId();
330 if (ack.getFirstMessageId() == null
331 || ack.getFirstMessageId().equals(messageId)) {
332 inAckRange = true;
333 }
334 if (inAckRange) {
335 if (ack.getLastMessageId().equals(messageId)) {
336 destination = node.getRegionDestination();
337 callDispatchMatched = true;
338 break;
339 }
340 }
341 }
342 if (!callDispatchMatched) {
343 throw new JMSException(
344 "Could not correlate acknowledgment with dispatched message: "
345 + ack);
346 }
347 } else if (ack.isPoisonAck()) {
348 // TODO: what if the message is already in a DLQ???
349 // Handle the poison ACK case: we need to send the message to a
350 // DLQ
351 if (ack.isInTransaction()) {
352 throw new JMSException("Poison ack cannot be transacted: "
353 + ack);
354 }
355 int index = 0;
356 boolean inAckRange = false;
357 List<MessageReference> removeList = new ArrayList<MessageReference>();
358 for (final MessageReference node : dispatched) {
359 MessageId messageId = node.getMessageId();
360 if (ack.getFirstMessageId() == null
361 || ack.getFirstMessageId().equals(messageId)) {
362 inAckRange = true;
363 }
364 if (inAckRange) {
365 if (ack.getPoisonCause() != null) {
366 node.getMessage().setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
367 ack.getPoisonCause().toString());
368 }
369 sendToDLQ(context, node);
370 node.getRegionDestination().getDestinationStatistics()
371 .getInflight().decrement();
372 removeList.add(node);
373 dequeueCounter++;
374 index++;
375 acknowledge(context, ack, node);
376 if (ack.getLastMessageId().equals(messageId)) {
377 while (true) {
378 int currentExtension = prefetchExtension.get();
379 int newExtension = Math.max(0, currentExtension - (index + 1));
380 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
381 break;
382 }
383 }
384 destination = node.getRegionDestination();
385 callDispatchMatched = true;
386 break;
387 }
388 }
389 }
390 for (final MessageReference node : removeList) {
391 dispatched.remove(node);
392 }
393 if (!callDispatchMatched) {
394 throw new JMSException(
395 "Could not correlate acknowledgment with dispatched message: "
396 + ack);
397 }
398 }
399 }
400 if (callDispatchMatched && destination != null) {
401 destination.wakeup();
402 dispatchPending();
403 } else {
404 if (isSlave()) {
405 throw new JMSException(
406 "Slave broker out of sync with master: Acknowledgment ("
407 + ack + ") was not in the dispatch list: "
408 + dispatched);
409 } else {
410 LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
411 + ack);
412 }
413 }
414 }
415
416 private void registerRemoveSync(ConnectionContext context, final MessageReference node) {
417 // setup a Synchronization to remove nodes from the
418 // dispatched list.
419 context.getTransaction().addSynchronization(
420 new Synchronization() {
421
422 @Override
423 public void afterCommit()
424 throws Exception {
425 synchronized(dispatchLock) {
426 dequeueCounter++;
427 dispatched.remove(node);
428 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
429 }
430 }
431
432 @Override
433 public void afterRollback() throws Exception {
434 synchronized(dispatchLock) {
435 if (isSlave()) {
436 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
437 } else {
438 // poisionAck will decrement - otherwise still inflight on client
439 }
440 }
441 }
442 });
443 }
444
445 /**
446 * Checks an ack versus the contents of the dispatched list.
447 * called with dispatchLock held
448 * @param ack
449 * @throws JMSException if it does not match
450 */
451 protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
452 MessageId firstAckedMsg = ack.getFirstMessageId();
453 MessageId lastAckedMsg = ack.getLastMessageId();
454 int checkCount = 0;
455 boolean checkFoundStart = false;
456 boolean checkFoundEnd = false;
457 for (MessageReference node : dispatched) {
458
459 if (firstAckedMsg == null) {
460 checkFoundStart = true;
461 } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
462 checkFoundStart = true;
463 }
464
465 if (checkFoundStart) {
466 checkCount++;
467 }
468
469 if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
470 checkFoundEnd = true;
471 break;
472 }
473 }
474 if (!checkFoundStart && firstAckedMsg != null)
475 throw new JMSException("Unmatched acknowledge: " + ack
476 + "; Could not find Message-ID " + firstAckedMsg
477 + " in dispatched-list (start of ack)");
478 if (!checkFoundEnd && lastAckedMsg != null)
479 throw new JMSException("Unmatched acknowledge: " + ack
480 + "; Could not find Message-ID " + lastAckedMsg
481 + " in dispatched-list (end of ack)");
482 if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
483 throw new JMSException("Unmatched acknowledge: " + ack
484 + "; Expected message count (" + ack.getMessageCount()
485 + ") differs from count in dispatched-list (" + checkCount
486 + ")");
487 }
488 }
489
490 /**
491 * @param context
492 * @param node
493 * @throws IOException
494 * @throws Exception
495 */
496 protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
497 broker.getRoot().sendToDeadLetterQueue(context, node, this);
498 }
499
500 public int getInFlightSize() {
501 return dispatched.size();
502 }
503
504 /**
505 * Used to determine if the broker can dispatch to the consumer.
506 *
507 * @return
508 */
509 public boolean isFull() {
510 return dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
511 }
512
513 /**
514 * @return true when 60% or more room is left for dispatching messages
515 */
516 public boolean isLowWaterMark() {
517 return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
518 }
519
520 /**
521 * @return true when 10% or less room is left for dispatching messages
522 */
523 public boolean isHighWaterMark() {
524 return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
525 }
526
527 @Override
528 public int countBeforeFull() {
529 return info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
530 }
531
532 public int getPendingQueueSize() {
533 return pending.size();
534 }
535
536 public int getDispatchedQueueSize() {
537 return dispatched.size();
538 }
539
540 public long getDequeueCounter() {
541 return dequeueCounter;
542 }
543
544 public long getDispatchedCounter() {
545 return dispatchCounter;
546 }
547
548 public long getEnqueueCounter() {
549 return enqueueCounter;
550 }
551
552 @Override
553 public boolean isRecoveryRequired() {
554 return pending.isRecoveryRequired();
555 }
556
557 public PendingMessageCursor getPending() {
558 return this.pending;
559 }
560
561 public void setPending(PendingMessageCursor pending) {
562 this.pending = pending;
563 if (this.pending!=null) {
564 this.pending.setSystemUsage(usageManager);
565 this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
566 }
567 }
568
569 @Override
570 public void add(ConnectionContext context, Destination destination) throws Exception {
571 synchronized(pendingLock) {
572 super.add(context, destination);
573 pending.add(context, destination);
574 }
575 }
576
577 @Override
578 public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
579 List<MessageReference> rc = new ArrayList<MessageReference>();
580 synchronized(pendingLock) {
581 super.remove(context, destination);
582 // Here is a potential problem concerning Inflight stat:
583 // Messages not already committed or rolled back may not be removed from dispatched list at the moment
584 // Except if each commit or rollback callback action comes before remove of subscriber.
585 rc.addAll(pending.remove(context, destination));
586
587 // Synchronized to DispatchLock
588 synchronized(dispatchLock) {
589 ArrayList<MessageReference> references = new ArrayList<MessageReference>();
590 for (MessageReference r : dispatched) {
591 if( r.getRegionDestination() == destination) {
592 references.add(r);
593 }
594 }
595 rc.addAll(references);
596 destination.getDestinationStatistics().getDispatched().subtract(references.size());
597 destination.getDestinationStatistics().getInflight().subtract(references.size());
598 dispatched.removeAll(references);
599 }
600 }
601 return rc;
602 }
603
604 protected void dispatchPending() throws IOException {
605 if (!isSlave()) {
606 synchronized(pendingLock) {
607 try {
608 int numberToDispatch = countBeforeFull();
609 if (numberToDispatch > 0) {
610 setSlowConsumer(false);
611 setPendingBatchSize(pending, numberToDispatch);
612 int count = 0;
613 pending.reset();
614 while (pending.hasNext() && !isFull()
615 && count < numberToDispatch) {
616 MessageReference node = pending.next();
617 if (node == null) {
618 break;
619 }
620
621 // Synchronize between dispatched list and remove of message from pending list
622 // related to remove subscription action
623 synchronized(dispatchLock) {
624 pending.remove();
625 node.decrementReferenceCount();
626 if( !isDropped(node) && canDispatch(node)) {
627
628 // Message may have been sitting in the pending
629 // list a while waiting for the consumer to ak the message.
630 if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
631 //increment number to dispatch
632 numberToDispatch++;
633 if (broker.isExpired(node)) {
634 node.getRegionDestination().messageExpired(context, this, node);
635 }
636 continue;
637 }
638 dispatch(node);
639 count++;
640 }
641 }
642 }
643 } else if (!isSlowConsumer()) {
644 setSlowConsumer(true);
645 for (Destination dest :destinations) {
646 dest.slowConsumer(context, this);
647 }
648 }
649 } finally {
650 pending.release();
651 }
652 }
653 }
654 }
655
656 protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
657 pending.setMaxBatchSize(numberToDispatch);
658 }
659
660 // called with dispatchLock held
661 protected boolean dispatch(final MessageReference node) throws IOException {
662 final Message message = node.getMessage();
663 if (message == null) {
664 return false;
665 }
666
667 okForAckAsDispatchDone.countDown();
668
669 // No reentrant lock - Patch needed to IndirectMessageReference on method lock
670 if (!isSlave()) {
671
672 MessageDispatch md = createMessageDispatch(node, message);
673 // NULL messages don't count... they don't get Acked.
674 if (node != QueueMessageReference.NULL_MESSAGE) {
675 dispatchCounter++;
676 dispatched.add(node);
677 } else {
678 while (true) {
679 int currentExtension = prefetchExtension.get();
680 int newExtension = Math.max(0, currentExtension - 1);
681 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
682 break;
683 }
684 }
685 }
686 if (info.isDispatchAsync()) {
687 md.setTransmitCallback(new Runnable() {
688
689 public void run() {
690 // Since the message gets queued up in async dispatch,
691 // we don't want to
692 // decrease the reference count until it gets put on the
693 // wire.
694 onDispatch(node, message);
695 }
696 });
697 context.getConnection().dispatchAsync(md);
698 } else {
699 context.getConnection().dispatchSync(md);
700 onDispatch(node, message);
701 }
702 return true;
703 } else {
704 return false;
705 }
706 }
707
708 protected void onDispatch(final MessageReference node, final Message message) {
709 if (node.getRegionDestination() != null) {
710 if (node != QueueMessageReference.NULL_MESSAGE) {
711 node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
712 node.getRegionDestination().getDestinationStatistics().getInflight().increment();
713 if (LOG.isTraceEnabled()) {
714 LOG.trace(info.getConsumerId() + " dispatched: " + message.getMessageId() + " - "
715 + message.getDestination() + ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size());
716 }
717 }
718 }
719
720 if (info.isDispatchAsync()) {
721 try {
722 dispatchPending();
723 } catch (IOException e) {
724 context.getConnection().serviceExceptionAsync(e);
725 }
726 }
727 }
728
729 /**
730 * inform the MessageConsumer on the client to change it's prefetch
731 *
732 * @param newPrefetch
733 */
734 public void updateConsumerPrefetch(int newPrefetch) {
735 if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
736 ConsumerControl cc = new ConsumerControl();
737 cc.setConsumerId(info.getConsumerId());
738 cc.setPrefetch(newPrefetch);
739 context.getConnection().dispatchAsync(cc);
740 }
741 }
742
743 /**
744 * @param node
745 * @param message
746 * @return MessageDispatch
747 */
748 protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
749 MessageDispatch md = new MessageDispatch();
750 md.setConsumerId(info.getConsumerId());
751
752 if (node == QueueMessageReference.NULL_MESSAGE) {
753 md.setMessage(null);
754 md.setDestination(null);
755 } else {
756 md.setDestination(node.getRegionDestination().getActiveMQDestination());
757 md.setMessage(message);
758 md.setRedeliveryCounter(node.getRedeliveryCounter());
759 }
760
761 return md;
762 }
763
764 /**
765 * Use when a matched message is about to be dispatched to the client.
766 *
767 * @param node
768 * @return false if the message should not be dispatched to the client
769 * (another sub may have already dispatched it for example).
770 * @throws IOException
771 */
772 protected abstract boolean canDispatch(MessageReference node) throws IOException;
773
774 protected abstract boolean isDropped(MessageReference node);
775
776 /**
777 * Used during acknowledgment to remove the message.
778 *
779 * @throws IOException
780 */
781 protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException;
782
783
784 public int getMaxProducersToAudit() {
785 return maxProducersToAudit;
786 }
787
788 public void setMaxProducersToAudit(int maxProducersToAudit) {
789 this.maxProducersToAudit = maxProducersToAudit;
790 }
791
792 public int getMaxAuditDepth() {
793 return maxAuditDepth;
794 }
795
796 public void setMaxAuditDepth(int maxAuditDepth) {
797 this.maxAuditDepth = maxAuditDepth;
798 }
799
800 public boolean isUsePrefetchExtension() {
801 return usePrefetchExtension;
802 }
803
804 public void setUsePrefetchExtension(boolean usePrefetchExtension) {
805 this.usePrefetchExtension = usePrefetchExtension;
806 }
807
808 protected int getPrefetchExtension() {
809 return this.prefetchExtension.get();
810 }
811 }