001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.HashMap;
022 import java.util.Iterator;
023 import java.util.LinkedList;
024 import java.util.List;
025 import java.util.Map;
026 import java.util.Map.Entry;
027 import java.util.concurrent.ExecutorService;
028 import java.util.concurrent.Executors;
029 import java.util.concurrent.TimeUnit;
030 import java.util.concurrent.atomic.AtomicBoolean;
031 import java.util.concurrent.atomic.AtomicReference;
032
033 import javax.jms.IllegalStateException;
034 import javax.jms.InvalidDestinationException;
035 import javax.jms.JMSException;
036 import javax.jms.Message;
037 import javax.jms.MessageConsumer;
038 import javax.jms.MessageListener;
039 import javax.jms.TransactionRolledBackException;
040
041 import org.apache.activemq.blob.BlobDownloader;
042 import org.apache.activemq.command.ActiveMQBlobMessage;
043 import org.apache.activemq.command.ActiveMQDestination;
044 import org.apache.activemq.command.ActiveMQMessage;
045 import org.apache.activemq.command.ActiveMQTempDestination;
046 import org.apache.activemq.command.CommandTypes;
047 import org.apache.activemq.command.ConsumerId;
048 import org.apache.activemq.command.ConsumerInfo;
049 import org.apache.activemq.command.MessageAck;
050 import org.apache.activemq.command.MessageDispatch;
051 import org.apache.activemq.command.MessageId;
052 import org.apache.activemq.command.MessagePull;
053 import org.apache.activemq.command.RemoveInfo;
054 import org.apache.activemq.command.TransactionId;
055 import org.apache.activemq.management.JMSConsumerStatsImpl;
056 import org.apache.activemq.management.StatsCapable;
057 import org.apache.activemq.management.StatsImpl;
058 import org.apache.activemq.selector.SelectorParser;
059 import org.apache.activemq.transaction.Synchronization;
060 import org.apache.activemq.util.Callback;
061 import org.apache.activemq.util.IntrospectionSupport;
062 import org.apache.activemq.util.JMSExceptionSupport;
063 import org.slf4j.Logger;
064 import org.slf4j.LoggerFactory;
065
066 /**
067 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
068 * from a destination. A <CODE> MessageConsumer</CODE> object is created by
069 * passing a <CODE>Destination</CODE> object to a message-consumer creation
070 * method supplied by a session.
071 * <P>
072 * <CODE>MessageConsumer</CODE> is the parent interface for all message
073 * consumers.
074 * <P>
075 * A message consumer can be created with a message selector. A message selector
076 * allows the client to restrict the messages delivered to the message consumer
077 * to those that match the selector.
078 * <P>
079 * A client may either synchronously receive a message consumer's messages or
080 * have the consumer asynchronously deliver them as they arrive.
081 * <P>
082 * For synchronous receipt, a client can request the next message from a message
083 * consumer using one of its <CODE> receive</CODE> methods. There are several
084 * variations of <CODE>receive</CODE> that allow a client to poll or wait for
085 * the next message.
086 * <P>
087 * For asynchronous delivery, a client can register a
088 * <CODE>MessageListener</CODE> object with a message consumer. As messages
089 * arrive at the message consumer, it delivers them by calling the
090 * <CODE>MessageListener</CODE>'s<CODE>
091 * onMessage</CODE> method.
092 * <P>
093 * It is a client programming error for a <CODE>MessageListener</CODE> to
094 * throw an exception.
095 *
096 *
097 * @see javax.jms.MessageConsumer
098 * @see javax.jms.QueueReceiver
099 * @see javax.jms.TopicSubscriber
100 * @see javax.jms.Session
101 */
102 public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
103
104 @SuppressWarnings("serial")
105 class PreviouslyDeliveredMap<K, V> extends HashMap<K, V> {
106 final TransactionId transactionId;
107 public PreviouslyDeliveredMap(TransactionId transactionId) {
108 this.transactionId = transactionId;
109 }
110 }
111
112 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
113 protected final ActiveMQSession session;
114 protected final ConsumerInfo info;
115
116 // These are the messages waiting to be delivered to the client
117 protected final MessageDispatchChannel unconsumedMessages;
118
119 // The are the messages that were delivered to the consumer but that have
120 // not been acknowledged. It's kept in reverse order since we
121 // Always walk list in reverse order.
122 private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
123 // track duplicate deliveries in a transaction such that the tx integrity can be validated
124 private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages;
125 private int deliveredCounter;
126 private int additionalWindowSize;
127 private long redeliveryDelay;
128 private int ackCounter;
129 private int dispatchedCount;
130 private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
131 private final JMSConsumerStatsImpl stats;
132
133 private final String selector;
134 private boolean synchronizationRegistered;
135 private final AtomicBoolean started = new AtomicBoolean(false);
136
137 private MessageAvailableListener availableListener;
138
139 private RedeliveryPolicy redeliveryPolicy;
140 private boolean optimizeAcknowledge;
141 private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
142 private ExecutorService executorService;
143 private MessageTransformer transformer;
144 private boolean clearDispatchList;
145 boolean inProgressClearRequiredFlag;
146
147 private MessageAck pendingAck;
148 private long lastDeliveredSequenceId;
149
150 private IOException failureError;
151
152 private long optimizeAckTimestamp = System.currentTimeMillis();
153 private long optimizeAcknowledgeTimeOut = 0;
154 private long failoverRedeliveryWaitPeriod = 0;
155 private boolean transactedIndividualAck = false;
156 private boolean nonBlockingRedelivery = false;
157
158 /**
159 * Create a MessageConsumer
160 *
161 * @param session
162 * @param dest
163 * @param name
164 * @param selector
165 * @param prefetch
166 * @param maximumPendingMessageCount
167 * @param noLocal
168 * @param browser
169 * @param dispatchAsync
170 * @param messageListener
171 * @throws JMSException
172 */
173 public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
174 String name, String selector, int prefetch,
175 int maximumPendingMessageCount, boolean noLocal, boolean browser,
176 boolean dispatchAsync, MessageListener messageListener) throws JMSException {
177 if (dest == null) {
178 throw new InvalidDestinationException("Don't understand null destinations");
179 } else if (dest.getPhysicalName() == null) {
180 throw new InvalidDestinationException("The destination object was not given a physical name.");
181 } else if (dest.isTemporary()) {
182 String physicalName = dest.getPhysicalName();
183
184 if (physicalName == null) {
185 throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
186 }
187
188 String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
189
190 if (physicalName.indexOf(connectionID) < 0) {
191 throw new InvalidDestinationException(
192 "Cannot use a Temporary destination from another Connection");
193 }
194
195 if (session.connection.isDeleted(dest)) {
196 throw new InvalidDestinationException(
197 "Cannot use a Temporary destination that has been deleted");
198 }
199 if (prefetch < 0) {
200 throw new JMSException("Cannot have a prefetch size less than zero");
201 }
202 }
203 if (session.connection.isMessagePrioritySupported()) {
204 this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
205 }else {
206 this.unconsumedMessages = new FifoMessageDispatchChannel();
207 }
208
209 this.session = session;
210 this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
211 setTransformer(session.getTransformer());
212
213 this.info = new ConsumerInfo(consumerId);
214 this.info.setExclusive(this.session.connection.isExclusiveConsumer());
215 this.info.setSubscriptionName(name);
216 this.info.setPrefetchSize(prefetch);
217 this.info.setCurrentPrefetchSize(prefetch);
218 this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
219 this.info.setNoLocal(noLocal);
220 this.info.setDispatchAsync(dispatchAsync);
221 this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
222 this.info.setSelector(null);
223
224 // Allows the options on the destination to configure the consumerInfo
225 if (dest.getOptions() != null) {
226 Map<String, Object> options = IntrospectionSupport.extractProperties(
227 new HashMap<String, Object>(dest.getOptions()), "consumer.");
228 IntrospectionSupport.setProperties(this.info, options);
229 if (options.size() > 0) {
230 String msg = "There are " + options.size()
231 + " consumer options that couldn't be set on the consumer."
232 + " Check the options are spelled correctly."
233 + " Unknown parameters=[" + options + "]."
234 + " This consumer cannot be started.";
235 LOG.warn(msg);
236 throw new ConfigurationException(msg);
237 }
238 }
239
240 this.info.setDestination(dest);
241 this.info.setBrowser(browser);
242 if (selector != null && selector.trim().length() != 0) {
243 // Validate the selector
244 SelectorParser.parse(selector);
245 this.info.setSelector(selector);
246 this.selector = selector;
247 } else if (info.getSelector() != null) {
248 // Validate the selector
249 SelectorParser.parse(this.info.getSelector());
250 this.selector = this.info.getSelector();
251 } else {
252 this.selector = null;
253 }
254
255 this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
256 this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
257 && !info.isBrowser();
258 if (this.optimizeAcknowledge) {
259 this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut();
260 }
261 this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
262 this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
263 this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery();
264 this.transactedIndividualAck = session.connection.isTransactedIndividualAck() || this.nonBlockingRedelivery;
265 if (messageListener != null) {
266 setMessageListener(messageListener);
267 }
268 try {
269 this.session.addConsumer(this);
270 this.session.syncSendPacket(info);
271 } catch (JMSException e) {
272 this.session.removeConsumer(this);
273 throw e;
274 }
275
276 if (session.connection.isStarted()) {
277 start();
278 }
279 }
280
281 private boolean isAutoAcknowledgeEach() {
282 return session.isAutoAcknowledge() || ( session.isDupsOkAcknowledge() && getDestination().isQueue() );
283 }
284
285 private boolean isAutoAcknowledgeBatch() {
286 return session.isDupsOkAcknowledge() && !getDestination().isQueue() ;
287 }
288
289 public StatsImpl getStats() {
290 return stats;
291 }
292
293 public JMSConsumerStatsImpl getConsumerStats() {
294 return stats;
295 }
296
297 public RedeliveryPolicy getRedeliveryPolicy() {
298 return redeliveryPolicy;
299 }
300
301 /**
302 * Sets the redelivery policy used when messages are redelivered
303 */
304 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
305 this.redeliveryPolicy = redeliveryPolicy;
306 }
307
308 public MessageTransformer getTransformer() {
309 return transformer;
310 }
311
312 /**
313 * Sets the transformer used to transform messages before they are sent on
314 * to the JMS bus
315 */
316 public void setTransformer(MessageTransformer transformer) {
317 this.transformer = transformer;
318 }
319
320 /**
321 * @return Returns the value.
322 */
323 public ConsumerId getConsumerId() {
324 return info.getConsumerId();
325 }
326
327 /**
328 * @return the consumer name - used for durable consumers
329 */
330 public String getConsumerName() {
331 return this.info.getSubscriptionName();
332 }
333
334 /**
335 * @return true if this consumer does not accept locally produced messages
336 */
337 protected boolean isNoLocal() {
338 return info.isNoLocal();
339 }
340
341 /**
342 * Retrieve is a browser
343 *
344 * @return true if a browser
345 */
346 protected boolean isBrowser() {
347 return info.isBrowser();
348 }
349
350 /**
351 * @return ActiveMQDestination
352 */
353 protected ActiveMQDestination getDestination() {
354 return info.getDestination();
355 }
356
357 /**
358 * @return Returns the prefetchNumber.
359 */
360 public int getPrefetchNumber() {
361 return info.getPrefetchSize();
362 }
363
364 /**
365 * @return true if this is a durable topic subscriber
366 */
367 public boolean isDurableSubscriber() {
368 return info.getSubscriptionName() != null && info.getDestination().isTopic();
369 }
370
371 /**
372 * Gets this message consumer's message selector expression.
373 *
374 * @return this message consumer's message selector, or null if no message
375 * selector exists for the message consumer (that is, if the message
376 * selector was not set or was set to null or the empty string)
377 * @throws JMSException if the JMS provider fails to receive the next
378 * message due to some internal error.
379 */
380 public String getMessageSelector() throws JMSException {
381 checkClosed();
382 return selector;
383 }
384
385 /**
386 * Gets the message consumer's <CODE>MessageListener</CODE>.
387 *
388 * @return the listener for the message consumer, or null if no listener is
389 * set
390 * @throws JMSException if the JMS provider fails to get the message
391 * listener due to some internal error.
392 * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
393 */
394 public MessageListener getMessageListener() throws JMSException {
395 checkClosed();
396 return this.messageListener.get();
397 }
398
399 /**
400 * Sets the message consumer's <CODE>MessageListener</CODE>.
401 * <P>
402 * Setting the message listener to null is the equivalent of unsetting the
403 * message listener for the message consumer.
404 * <P>
405 * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
406 * while messages are being consumed by an existing listener or the consumer
407 * is being used to consume messages synchronously is undefined.
408 *
409 * @param listener the listener to which the messages are to be delivered
410 * @throws JMSException if the JMS provider fails to receive the next
411 * message due to some internal error.
412 * @see javax.jms.MessageConsumer#getMessageListener
413 */
414 public void setMessageListener(MessageListener listener) throws JMSException {
415 checkClosed();
416 if (info.getPrefetchSize() == 0) {
417 throw new JMSException(
418 "Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
419 }
420 if (listener != null) {
421 boolean wasRunning = session.isRunning();
422 if (wasRunning) {
423 session.stop();
424 }
425
426 this.messageListener.set(listener);
427 session.redispatch(this, unconsumedMessages);
428
429 if (wasRunning) {
430 session.start();
431 }
432 } else {
433 this.messageListener.set(null);
434 }
435 }
436
437 public MessageAvailableListener getAvailableListener() {
438 return availableListener;
439 }
440
441 /**
442 * Sets the listener used to notify synchronous consumers that there is a
443 * message available so that the {@link MessageConsumer#receiveNoWait()} can
444 * be called.
445 */
446 public void setAvailableListener(MessageAvailableListener availableListener) {
447 this.availableListener = availableListener;
448 }
449
450 /**
451 * Used to get an enqueued message from the unconsumedMessages list. The
452 * amount of time this method blocks is based on the timeout value. - if
453 * timeout==-1 then it blocks until a message is received. - if timeout==0
454 * then it it tries to not block at all, it returns a message if it is
455 * available - if timeout>0 then it blocks up to timeout amount of time.
456 * Expired messages will consumed by this method.
457 *
458 * @throws JMSException
459 * @return null if we timeout or if the consumer is closed.
460 */
461 private MessageDispatch dequeue(long timeout) throws JMSException {
462 try {
463 long deadline = 0;
464 if (timeout > 0) {
465 deadline = System.currentTimeMillis() + timeout;
466 }
467 while (true) {
468 MessageDispatch md = unconsumedMessages.dequeue(timeout);
469 if (md == null) {
470 if (timeout > 0 && !unconsumedMessages.isClosed()) {
471 timeout = Math.max(deadline - System.currentTimeMillis(), 0);
472 } else {
473 if (failureError != null) {
474 throw JMSExceptionSupport.create(failureError);
475 } else {
476 return null;
477 }
478 }
479 } else if (md.getMessage() == null) {
480 return null;
481 } else if (md.getMessage().isExpired()) {
482 if (LOG.isDebugEnabled()) {
483 LOG.debug(getConsumerId() + " received expired message: " + md);
484 }
485 beforeMessageIsConsumed(md);
486 afterMessageIsConsumed(md, true);
487 if (timeout > 0) {
488 timeout = Math.max(deadline - System.currentTimeMillis(), 0);
489 }
490 } else {
491 if (LOG.isTraceEnabled()) {
492 LOG.trace(getConsumerId() + " received message: " + md);
493 }
494 return md;
495 }
496 }
497 } catch (InterruptedException e) {
498 Thread.currentThread().interrupt();
499 throw JMSExceptionSupport.create(e);
500 }
501 }
502
503 /**
504 * Receives the next message produced for this message consumer.
505 * <P>
506 * This call blocks indefinitely until a message is produced or until this
507 * message consumer is closed.
508 * <P>
509 * If this <CODE>receive</CODE> is done within a transaction, the consumer
510 * retains the message until the transaction commits.
511 *
512 * @return the next message produced for this message consumer, or null if
513 * this message consumer is concurrently closed
514 */
515 public Message receive() throws JMSException {
516 checkClosed();
517 checkMessageListener();
518
519 sendPullCommand(0);
520 MessageDispatch md = dequeue(-1);
521 if (md == null) {
522 return null;
523 }
524
525 beforeMessageIsConsumed(md);
526 afterMessageIsConsumed(md, false);
527
528 return createActiveMQMessage(md);
529 }
530
531 /**
532 * @param md
533 * @return
534 */
535 private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
536 ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
537 if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
538 ((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy()));
539 }
540 if (transformer != null) {
541 Message transformedMessage = transformer.consumerTransform(session, this, m);
542 if (transformedMessage != null) {
543 m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
544 }
545 }
546 if (session.isClientAcknowledge()) {
547 m.setAcknowledgeCallback(new Callback() {
548 public void execute() throws Exception {
549 session.checkClosed();
550 session.acknowledge();
551 }
552 });
553 }else if (session.isIndividualAcknowledge()) {
554 m.setAcknowledgeCallback(new Callback() {
555 public void execute() throws Exception {
556 session.checkClosed();
557 acknowledge(md);
558 }
559 });
560 }
561 return m;
562 }
563
564 /**
565 * Receives the next message that arrives within the specified timeout
566 * interval.
567 * <P>
568 * This call blocks until a message arrives, the timeout expires, or this
569 * message consumer is closed. A <CODE>timeout</CODE> of zero never
570 * expires, and the call blocks indefinitely.
571 *
572 * @param timeout the timeout value (in milliseconds), a time out of zero
573 * never expires.
574 * @return the next message produced for this message consumer, or null if
575 * the timeout expires or this message consumer is concurrently
576 * closed
577 */
578 public Message receive(long timeout) throws JMSException {
579 checkClosed();
580 checkMessageListener();
581 if (timeout == 0) {
582 return this.receive();
583 }
584
585 sendPullCommand(timeout);
586 while (timeout > 0) {
587
588 MessageDispatch md;
589 if (info.getPrefetchSize() == 0) {
590 md = dequeue(-1); // We let the broker let us know when we timeout.
591 } else {
592 md = dequeue(timeout);
593 }
594
595 if (md == null) {
596 return null;
597 }
598
599 beforeMessageIsConsumed(md);
600 afterMessageIsConsumed(md, false);
601 return createActiveMQMessage(md);
602 }
603 return null;
604 }
605
606 /**
607 * Receives the next message if one is immediately available.
608 *
609 * @return the next message produced for this message consumer, or null if
610 * one is not available
611 * @throws JMSException if the JMS provider fails to receive the next
612 * message due to some internal error.
613 */
614 public Message receiveNoWait() throws JMSException {
615 checkClosed();
616 checkMessageListener();
617 sendPullCommand(-1);
618
619 MessageDispatch md;
620 if (info.getPrefetchSize() == 0) {
621 md = dequeue(-1); // We let the broker let us know when we
622 // timeout.
623 } else {
624 md = dequeue(0);
625 }
626
627 if (md == null) {
628 return null;
629 }
630
631 beforeMessageIsConsumed(md);
632 afterMessageIsConsumed(md, false);
633 return createActiveMQMessage(md);
634 }
635
636 /**
637 * Closes the message consumer.
638 * <P>
639 * Since a provider may allocate some resources on behalf of a <CODE>
640 * MessageConsumer</CODE>
641 * outside the Java virtual machine, clients should close them when they are
642 * not needed. Relying on garbage collection to eventually reclaim these
643 * resources may not be timely enough.
644 * <P>
645 * This call blocks until a <CODE>receive</CODE> or message listener in
646 * progress has completed. A blocked message consumer <CODE>receive </CODE>
647 * call returns null when this message consumer is closed.
648 *
649 * @throws JMSException if the JMS provider fails to close the consumer due
650 * to some internal error.
651 */
652 public void close() throws JMSException {
653 if (!unconsumedMessages.isClosed()) {
654 if (session.getTransactionContext().isInTransaction()) {
655 session.getTransactionContext().addSynchronization(new Synchronization() {
656 @Override
657 public void afterCommit() throws Exception {
658 doClose();
659 }
660
661 @Override
662 public void afterRollback() throws Exception {
663 doClose();
664 }
665 });
666 } else {
667 doClose();
668 }
669 }
670 }
671
672 void doClose() throws JMSException {
673 // Store interrupted state and clear so that Transport operations don't
674 // throw InterruptedException and we ensure that resources are clened up.
675 boolean interrupted = Thread.interrupted();
676 dispose();
677 RemoveInfo removeCommand = info.createRemoveCommand();
678 if (LOG.isDebugEnabled()) {
679 LOG.debug("remove: " + this.getConsumerId() + ", lastDeliveredSequenceId:" + lastDeliveredSequenceId);
680 }
681 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
682 this.session.asyncSendPacket(removeCommand);
683 if (interrupted) {
684 Thread.currentThread().interrupt();
685 } }
686
687 void inProgressClearRequired() {
688 inProgressClearRequiredFlag = true;
689 // deal with delivered messages async to avoid lock contention with in progress acks
690 clearDispatchList = true;
691 }
692
693 void clearMessagesInProgress() {
694 if (inProgressClearRequiredFlag) {
695 synchronized (unconsumedMessages.getMutex()) {
696 if (inProgressClearRequiredFlag) {
697 if (LOG.isDebugEnabled()) {
698 LOG.debug(getConsumerId() + " clearing unconsumed list (" + unconsumedMessages.size() + ") on transport interrupt");
699 }
700 // ensure unconsumed are rolledback up front as they may get redelivered to another consumer
701 List<MessageDispatch> list = unconsumedMessages.removeAll();
702 if (!this.info.isBrowser()) {
703 for (MessageDispatch old : list) {
704 session.connection.rollbackDuplicate(this, old.getMessage());
705 }
706 }
707 // allow dispatch on this connection to resume
708 session.connection.transportInterruptionProcessingComplete();
709 inProgressClearRequiredFlag = false;
710 }
711 }
712 }
713 }
714
715 void deliverAcks() {
716 MessageAck ack = null;
717 if (deliveryingAcknowledgements.compareAndSet(false, true)) {
718 if (isAutoAcknowledgeEach()) {
719 synchronized(deliveredMessages) {
720 ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
721 if (ack != null) {
722 deliveredMessages.clear();
723 ackCounter = 0;
724 } else {
725 ack = pendingAck;
726 pendingAck = null;
727 }
728 }
729 } else if (pendingAck != null && pendingAck.isStandardAck()) {
730 ack = pendingAck;
731 pendingAck = null;
732 }
733 if (ack != null) {
734 final MessageAck ackToSend = ack;
735
736 if (executorService == null) {
737 executorService = Executors.newSingleThreadExecutor();
738 }
739 executorService.submit(new Runnable() {
740 public void run() {
741 try {
742 session.sendAck(ackToSend,true);
743 } catch (JMSException e) {
744 LOG.error(getConsumerId() + " failed to delivered acknowledgements", e);
745 } finally {
746 deliveryingAcknowledgements.set(false);
747 }
748 }
749 });
750 } else {
751 deliveryingAcknowledgements.set(false);
752 }
753 }
754 }
755
756 public void dispose() throws JMSException {
757 if (!unconsumedMessages.isClosed()) {
758
759 // Do we have any acks we need to send out before closing?
760 // Ack any delivered messages now.
761 if (!session.getTransacted()) {
762 deliverAcks();
763 if (isAutoAcknowledgeBatch()) {
764 acknowledge();
765 }
766 }
767 if (executorService != null) {
768 executorService.shutdown();
769 try {
770 executorService.awaitTermination(60, TimeUnit.SECONDS);
771 } catch (InterruptedException e) {
772 Thread.currentThread().interrupt();
773 }
774 }
775
776 if (session.isClientAcknowledge()) {
777 if (!this.info.isBrowser()) {
778 // rollback duplicates that aren't acknowledged
779 List<MessageDispatch> tmp = null;
780 synchronized (this.deliveredMessages) {
781 tmp = new ArrayList<MessageDispatch>(this.deliveredMessages);
782 }
783 for (MessageDispatch old : tmp) {
784 this.session.connection.rollbackDuplicate(this, old.getMessage());
785 }
786 tmp.clear();
787 }
788 }
789 if (!session.isTransacted()) {
790 synchronized(deliveredMessages) {
791 deliveredMessages.clear();
792 }
793 }
794 unconsumedMessages.close();
795 this.session.removeConsumer(this);
796 List<MessageDispatch> list = unconsumedMessages.removeAll();
797 if (!this.info.isBrowser()) {
798 for (MessageDispatch old : list) {
799 // ensure we don't filter this as a duplicate
800 session.connection.rollbackDuplicate(this, old.getMessage());
801 }
802 }
803 }
804 }
805
806 /**
807 * @throws IllegalStateException
808 */
809 protected void checkClosed() throws IllegalStateException {
810 if (unconsumedMessages.isClosed()) {
811 throw new IllegalStateException("The Consumer is closed");
812 }
813 }
814
815 /**
816 * If we have a zero prefetch specified then send a pull command to the
817 * broker to pull a message we are about to receive
818 */
819 protected void sendPullCommand(long timeout) throws JMSException {
820 clearDispatchList();
821 if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
822 MessagePull messagePull = new MessagePull();
823 messagePull.configure(info);
824 messagePull.setTimeout(timeout);
825 session.asyncSendPacket(messagePull);
826 }
827 }
828
829 protected void checkMessageListener() throws JMSException {
830 session.checkMessageListener();
831 }
832
833 protected void setOptimizeAcknowledge(boolean value) {
834 if (optimizeAcknowledge && !value) {
835 deliverAcks();
836 }
837 optimizeAcknowledge = value;
838 }
839
840 protected void setPrefetchSize(int prefetch) {
841 deliverAcks();
842 this.info.setCurrentPrefetchSize(prefetch);
843 }
844
845 private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
846 md.setDeliverySequenceId(session.getNextDeliveryId());
847 lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
848 if (!isAutoAcknowledgeBatch()) {
849 synchronized(deliveredMessages) {
850 deliveredMessages.addFirst(md);
851 }
852 if (session.getTransacted()) {
853 if (transactedIndividualAck) {
854 immediateIndividualTransactedAck(md);
855 } else {
856 ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
857 }
858 }
859 }
860 }
861
862 private void immediateIndividualTransactedAck(MessageDispatch md) throws JMSException {
863 // acks accumulate on the broker pending transaction completion to indicate
864 // delivery status
865 registerSync();
866 MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
867 ack.setTransactionId(session.getTransactionContext().getTransactionId());
868 session.syncSendPacket(ack);
869 }
870
871 private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
872 if (unconsumedMessages.isClosed()) {
873 return;
874 }
875 if (messageExpired) {
876 synchronized (deliveredMessages) {
877 deliveredMessages.remove(md);
878 }
879 stats.getExpiredMessageCount().increment();
880 ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
881 } else {
882 stats.onMessage();
883 if (session.getTransacted()) {
884 // Do nothing.
885 } else if (isAutoAcknowledgeEach()) {
886 if (deliveryingAcknowledgements.compareAndSet(false, true)) {
887 synchronized (deliveredMessages) {
888 if (!deliveredMessages.isEmpty()) {
889 if (optimizeAcknowledge) {
890 ackCounter++;
891 if (ackCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
892 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
893 if (ack != null) {
894 deliveredMessages.clear();
895 ackCounter = 0;
896 session.sendAck(ack);
897 optimizeAckTimestamp = System.currentTimeMillis();
898 }
899 }
900 } else {
901 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
902 if (ack!=null) {
903 deliveredMessages.clear();
904 session.sendAck(ack);
905 }
906 }
907 }
908 }
909 deliveryingAcknowledgements.set(false);
910 }
911 } else if (isAutoAcknowledgeBatch()) {
912 ackLater(md, MessageAck.STANDARD_ACK_TYPE);
913 } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
914 boolean messageUnackedByConsumer = false;
915 synchronized (deliveredMessages) {
916 messageUnackedByConsumer = deliveredMessages.contains(md);
917 }
918 if (messageUnackedByConsumer) {
919 ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
920 }
921 }
922 else {
923 throw new IllegalStateException("Invalid session state.");
924 }
925 }
926 }
927
928 /**
929 * Creates a MessageAck for all messages contained in deliveredMessages.
930 * Caller should hold the lock for deliveredMessages.
931 *
932 * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE)
933 * @return <code>null</code> if nothing to ack.
934 */
935 private MessageAck makeAckForAllDeliveredMessages(byte type) {
936 synchronized (deliveredMessages) {
937 if (deliveredMessages.isEmpty())
938 return null;
939
940 MessageDispatch md = deliveredMessages.getFirst();
941 MessageAck ack = new MessageAck(md, type, deliveredMessages.size());
942 ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
943 return ack;
944 }
945 }
946
947 private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
948
949 // Don't acknowledge now, but we may need to let the broker know the
950 // consumer got the message to expand the pre-fetch window
951 if (session.getTransacted()) {
952 registerSync();
953 }
954
955 deliveredCounter++;
956
957 MessageAck oldPendingAck = pendingAck;
958 pendingAck = new MessageAck(md, ackType, deliveredCounter);
959 pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
960 if( oldPendingAck==null ) {
961 pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
962 } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) {
963 pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
964 } else {
965 // old pending ack being superseded by ack of another type, if is is not a delivered
966 // ack and hence important, send it now so it is not lost.
967 if ( !oldPendingAck.isDeliveredAck()) {
968 if (LOG.isDebugEnabled()) {
969 LOG.debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
970 }
971 session.sendAck(oldPendingAck);
972 } else {
973 if (LOG.isDebugEnabled()) {
974 LOG.debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
975 }
976 }
977 }
978
979 if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
980 session.sendAck(pendingAck);
981 pendingAck=null;
982 deliveredCounter = 0;
983 additionalWindowSize = 0;
984 }
985 }
986
987 private void registerSync() throws JMSException {
988 session.doStartTransaction();
989 if (!synchronizationRegistered) {
990 synchronizationRegistered = true;
991 session.getTransactionContext().addSynchronization(new Synchronization() {
992 @Override
993 public void beforeEnd() throws Exception {
994 if (transactedIndividualAck) {
995 clearDispatchList();
996 waitForRedeliveries();
997 synchronized(deliveredMessages) {
998 rollbackOnFailedRecoveryRedelivery();
999 }
1000 } else {
1001 acknowledge();
1002 }
1003 synchronizationRegistered = false;
1004 }
1005
1006 @Override
1007 public void afterCommit() throws Exception {
1008 commit();
1009 synchronizationRegistered = false;
1010 }
1011
1012 @Override
1013 public void afterRollback() throws Exception {
1014 rollback();
1015 synchronizationRegistered = false;
1016 }
1017 });
1018 }
1019 }
1020
1021 /**
1022 * Acknowledge all the messages that have been delivered to the client up to
1023 * this point.
1024 *
1025 * @throws JMSException
1026 */
1027 public void acknowledge() throws JMSException {
1028 clearDispatchList();
1029 waitForRedeliveries();
1030 synchronized(deliveredMessages) {
1031 // Acknowledge all messages so far.
1032 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
1033 if (ack == null)
1034 return; // no msgs
1035
1036 if (session.getTransacted()) {
1037 rollbackOnFailedRecoveryRedelivery();
1038 session.doStartTransaction();
1039 ack.setTransactionId(session.getTransactionContext().getTransactionId());
1040 }
1041 session.sendAck(ack);
1042 pendingAck = null;
1043
1044 // Adjust the counters
1045 deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
1046 additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
1047
1048 if (!session.getTransacted()) {
1049 deliveredMessages.clear();
1050 }
1051 }
1052 }
1053
1054 private void waitForRedeliveries() {
1055 if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null) {
1056 long expiry = System.currentTimeMillis() + failoverRedeliveryWaitPeriod;
1057 int numberNotReplayed;
1058 do {
1059 numberNotReplayed = 0;
1060 synchronized(deliveredMessages) {
1061 if (previouslyDeliveredMessages != null) {
1062 for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1063 if (!entry.getValue()) {
1064 numberNotReplayed++;
1065 }
1066 }
1067 }
1068 }
1069 if (numberNotReplayed > 0) {
1070 LOG.info("waiting for redelivery of " + numberNotReplayed + " in transaction: "
1071 + previouslyDeliveredMessages.transactionId + ", to consumer :" + this.getConsumerId());
1072 try {
1073 Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4));
1074 } catch (InterruptedException outOfhere) {
1075 break;
1076 }
1077 }
1078 } while (numberNotReplayed > 0 && expiry < System.currentTimeMillis());
1079 }
1080 }
1081
1082 /*
1083 * called with deliveredMessages locked
1084 */
1085 private void rollbackOnFailedRecoveryRedelivery() throws JMSException {
1086 if (previouslyDeliveredMessages != null) {
1087 // if any previously delivered messages was not re-delivered, transaction is invalid and must rollback
1088 // as messages have been dispatched else where.
1089 int numberNotReplayed = 0;
1090 for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1091 if (!entry.getValue()) {
1092 numberNotReplayed++;
1093 if (LOG.isDebugEnabled()) {
1094 LOG.debug("previously delivered message has not been replayed in transaction: "
1095 + previouslyDeliveredMessages.transactionId
1096 + " , messageId: " + entry.getKey());
1097 }
1098 }
1099 }
1100 if (numberNotReplayed > 0) {
1101 String message = "rolling back transaction ("
1102 + previouslyDeliveredMessages.transactionId + ") post failover recovery. " + numberNotReplayed
1103 + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId();
1104 LOG.warn(message);
1105 throw new TransactionRolledBackException(message);
1106 }
1107 }
1108 }
1109
1110 void acknowledge(MessageDispatch md) throws JMSException {
1111 MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1);
1112 session.sendAck(ack);
1113 synchronized(deliveredMessages){
1114 deliveredMessages.remove(md);
1115 }
1116 }
1117
1118 public void commit() throws JMSException {
1119 synchronized (deliveredMessages) {
1120 deliveredMessages.clear();
1121 clearPreviouslyDelivered();
1122 }
1123 redeliveryDelay = 0;
1124 }
1125
1126 public void rollback() throws JMSException {
1127 synchronized (unconsumedMessages.getMutex()) {
1128 if (optimizeAcknowledge) {
1129 // remove messages read but not acked at the broker yet through
1130 // optimizeAcknowledge
1131 if (!this.info.isBrowser()) {
1132 synchronized(deliveredMessages) {
1133 for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) {
1134 // ensure we don't filter this as a duplicate
1135 MessageDispatch md = deliveredMessages.removeLast();
1136 session.connection.rollbackDuplicate(this, md.getMessage());
1137 }
1138 }
1139 }
1140 }
1141 synchronized(deliveredMessages) {
1142 rollbackPreviouslyDeliveredAndNotRedelivered();
1143 if (deliveredMessages.isEmpty()) {
1144 return;
1145 }
1146
1147 // use initial delay for first redelivery
1148 MessageDispatch lastMd = deliveredMessages.getFirst();
1149 final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
1150 if (currentRedeliveryCount > 0) {
1151 redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
1152 } else {
1153 redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
1154 }
1155 MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
1156
1157 for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
1158 MessageDispatch md = iter.next();
1159 md.getMessage().onMessageRolledBack();
1160 // ensure we don't filter this as a duplicate
1161 session.connection.rollbackDuplicate(this, md.getMessage());
1162 }
1163
1164 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
1165 && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
1166 // We need to NACK the messages so that they get sent to the
1167 // DLQ.
1168 // Acknowledge the last message.
1169
1170 MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
1171 ack.setPoisonCause(lastMd.getRollbackCause());
1172 ack.setFirstMessageId(firstMsgId);
1173 session.sendAck(ack,true);
1174 // Adjust the window size.
1175 additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
1176 redeliveryDelay = 0;
1177 } else {
1178
1179 // only redelivery_ack after first delivery
1180 if (currentRedeliveryCount > 0) {
1181 MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
1182 ack.setFirstMessageId(firstMsgId);
1183 session.sendAck(ack,true);
1184 }
1185
1186 // stop the delivery of messages.
1187 if (nonBlockingRedelivery) {
1188 if (!unconsumedMessages.isClosed()) {
1189
1190 final LinkedList<MessageDispatch> pendingRedeliveries =
1191 new LinkedList<MessageDispatch>(deliveredMessages);
1192
1193 // Start up the delivery again a little later.
1194 session.getScheduler().executeAfterDelay(new Runnable() {
1195 public void run() {
1196 try {
1197 if (!unconsumedMessages.isClosed()) {
1198 for(MessageDispatch dispatch : pendingRedeliveries) {
1199 session.dispatch(dispatch);
1200 }
1201 }
1202 } catch (Exception e) {
1203 session.connection.onAsyncException(e);
1204 }
1205 }
1206 }, redeliveryDelay);
1207 }
1208
1209 } else {
1210 unconsumedMessages.stop();
1211
1212 for (MessageDispatch md : deliveredMessages) {
1213 unconsumedMessages.enqueueFirst(md);
1214 }
1215
1216 if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
1217 // Start up the delivery again a little later.
1218 session.getScheduler().executeAfterDelay(new Runnable() {
1219 public void run() {
1220 try {
1221 if (started.get()) {
1222 start();
1223 }
1224 } catch (JMSException e) {
1225 session.connection.onAsyncException(e);
1226 }
1227 }
1228 }, redeliveryDelay);
1229 } else {
1230 start();
1231 }
1232 }
1233 }
1234 deliveredCounter -= deliveredMessages.size();
1235 deliveredMessages.clear();
1236 }
1237 }
1238 if (messageListener.get() != null) {
1239 session.redispatch(this, unconsumedMessages);
1240 }
1241 }
1242
1243 /*
1244 * called with unconsumedMessages && deliveredMessages locked
1245 * remove any message not re-delivered as they can't be replayed to this
1246 * consumer on rollback
1247 */
1248 private void rollbackPreviouslyDeliveredAndNotRedelivered() {
1249 if (previouslyDeliveredMessages != null) {
1250 for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1251 if (!entry.getValue()) {
1252 removeFromDeliveredMessages(entry.getKey());
1253 }
1254 }
1255 clearPreviouslyDelivered();
1256 }
1257 }
1258
1259 /*
1260 * called with deliveredMessages locked
1261 */
1262 private void removeFromDeliveredMessages(MessageId key) {
1263 Iterator<MessageDispatch> iterator = deliveredMessages.iterator();
1264 while (iterator.hasNext()) {
1265 MessageDispatch candidate = iterator.next();
1266 if (key.equals(candidate.getMessage().getMessageId())) {
1267 session.connection.rollbackDuplicate(this, candidate.getMessage());
1268 iterator.remove();
1269 break;
1270 }
1271 }
1272 }
1273
1274 /*
1275 * called with deliveredMessages locked
1276 */
1277 private void clearPreviouslyDelivered() {
1278 if (previouslyDeliveredMessages != null) {
1279 previouslyDeliveredMessages.clear();
1280 previouslyDeliveredMessages = null;
1281 }
1282 }
1283
1284 public void dispatch(MessageDispatch md) {
1285 MessageListener listener = this.messageListener.get();
1286 try {
1287 clearMessagesInProgress();
1288 clearDispatchList();
1289 synchronized (unconsumedMessages.getMutex()) {
1290 if (!unconsumedMessages.isClosed()) {
1291 if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
1292 if (listener != null && unconsumedMessages.isRunning()) {
1293 ActiveMQMessage message = createActiveMQMessage(md);
1294 beforeMessageIsConsumed(md);
1295 try {
1296 boolean expired = message.isExpired();
1297 if (!expired) {
1298 listener.onMessage(message);
1299 }
1300 afterMessageIsConsumed(md, expired);
1301 } catch (RuntimeException e) {
1302 LOG.error(getConsumerId() + " Exception while processing message: " + md.getMessage().getMessageId(), e);
1303 if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
1304 // schedual redelivery and possible dlq processing
1305 md.setRollbackCause(e);
1306 rollback();
1307 } else {
1308 // Transacted or Client ack: Deliver the
1309 // next message.
1310 afterMessageIsConsumed(md, false);
1311 }
1312 }
1313 } else {
1314 if (!unconsumedMessages.isRunning()) {
1315 // delayed redelivery, ensure it can be re delivered
1316 session.connection.rollbackDuplicate(this, md.getMessage());
1317 }
1318 unconsumedMessages.enqueue(md);
1319 if (availableListener != null) {
1320 availableListener.onMessageAvailable(this);
1321 }
1322 }
1323 } else {
1324 if (!session.isTransacted()) {
1325 LOG.warn("Duplicate dispatch on connection: " + session.getConnection().getConnectionInfo().getConnectionId()
1326 + " to consumer: " + getConsumerId() + ", ignoring (auto acking) duplicate: " + md);
1327 MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
1328 session.sendAck(ack);
1329 } else {
1330 if (LOG.isDebugEnabled()) {
1331 LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage());
1332 }
1333 boolean needsPoisonAck = false;
1334 synchronized (deliveredMessages) {
1335 if (previouslyDeliveredMessages != null) {
1336 previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
1337 } else {
1338 // delivery while pending redelivery to another consumer on the same connection
1339 // not waiting for redelivery will help here
1340 needsPoisonAck = true;
1341 }
1342 }
1343 if (needsPoisonAck) {
1344 MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
1345 poisonAck.setFirstMessageId(md.getMessage().getMessageId());
1346 poisonAck.setPoisonCause(new JMSException("Duplicate dispatch with transacted redeliver pending on another consumer, connection: "
1347 + session.getConnection().getConnectionInfo().getConnectionId()));
1348 LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another"
1349 + " consumer on this connection, failoverRedeliveryWaitPeriod="
1350 + failoverRedeliveryWaitPeriod + ". Message: " + md + ", poisonAck: " + poisonAck);
1351 session.sendAck(poisonAck);
1352 } else {
1353 if (transactedIndividualAck) {
1354 immediateIndividualTransactedAck(md);
1355 } else {
1356 ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
1357 }
1358 }
1359 }
1360 }
1361 }
1362 }
1363 if (++dispatchedCount % 1000 == 0) {
1364 dispatchedCount = 0;
1365 Thread.yield();
1366 }
1367 } catch (Exception e) {
1368 session.connection.onClientInternalException(e);
1369 }
1370 }
1371
1372 // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
1373 private void clearDispatchList() {
1374 if (clearDispatchList) {
1375 synchronized (deliveredMessages) {
1376 if (clearDispatchList) {
1377 if (!deliveredMessages.isEmpty()) {
1378 if (session.isTransacted()) {
1379 if (LOG.isDebugEnabled()) {
1380 LOG.debug(getConsumerId() + " tracking existing transacted delivered list (" + deliveredMessages.size() + ") on transport interrupt");
1381 }
1382 if (previouslyDeliveredMessages == null) {
1383 previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId());
1384 }
1385 for (MessageDispatch delivered : deliveredMessages) {
1386 previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
1387 }
1388 } else {
1389 if (LOG.isDebugEnabled()) {
1390 LOG.debug(getConsumerId() + " clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt");
1391 }
1392 deliveredMessages.clear();
1393 pendingAck = null;
1394 }
1395 }
1396 clearDispatchList = false;
1397 }
1398 }
1399 }
1400 }
1401
1402 public int getMessageSize() {
1403 return unconsumedMessages.size();
1404 }
1405
1406 public void start() throws JMSException {
1407 if (unconsumedMessages.isClosed()) {
1408 return;
1409 }
1410 started.set(true);
1411 unconsumedMessages.start();
1412 session.executor.wakeup();
1413 }
1414
1415 public void stop() {
1416 started.set(false);
1417 unconsumedMessages.stop();
1418 }
1419
1420 @Override
1421 public String toString() {
1422 return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get()
1423 + " }";
1424 }
1425
1426 /**
1427 * Delivers a message to the message listener.
1428 *
1429 * @return
1430 * @throws JMSException
1431 */
1432 public boolean iterate() {
1433 MessageListener listener = this.messageListener.get();
1434 if (listener != null) {
1435 MessageDispatch md = unconsumedMessages.dequeueNoWait();
1436 if (md != null) {
1437 dispatch(md);
1438 return true;
1439 }
1440 }
1441 return false;
1442 }
1443
1444 public boolean isInUse(ActiveMQTempDestination destination) {
1445 return info.getDestination().equals(destination);
1446 }
1447
1448 public long getLastDeliveredSequenceId() {
1449 return lastDeliveredSequenceId;
1450 }
1451
1452 public IOException getFailureError() {
1453 return failureError;
1454 }
1455
1456 public void setFailureError(IOException failureError) {
1457 this.failureError = failureError;
1458 }
1459 }