001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.io.File;
020 import java.io.InputStream;
021 import java.io.Serializable;
022 import java.net.URL;
023 import java.util.Collections;
024 import java.util.Iterator;
025 import java.util.List;
026 import java.util.concurrent.CopyOnWriteArrayList;
027 import java.util.concurrent.ThreadPoolExecutor;
028 import java.util.concurrent.atomic.AtomicBoolean;
029
030 import javax.jms.BytesMessage;
031 import javax.jms.Destination;
032 import javax.jms.IllegalStateException;
033 import javax.jms.InvalidDestinationException;
034 import javax.jms.InvalidSelectorException;
035 import javax.jms.JMSException;
036 import javax.jms.MapMessage;
037 import javax.jms.Message;
038 import javax.jms.MessageConsumer;
039 import javax.jms.MessageListener;
040 import javax.jms.MessageProducer;
041 import javax.jms.ObjectMessage;
042 import javax.jms.Queue;
043 import javax.jms.QueueBrowser;
044 import javax.jms.QueueReceiver;
045 import javax.jms.QueueSender;
046 import javax.jms.QueueSession;
047 import javax.jms.Session;
048 import javax.jms.StreamMessage;
049 import javax.jms.TemporaryQueue;
050 import javax.jms.TemporaryTopic;
051 import javax.jms.TextMessage;
052 import javax.jms.Topic;
053 import javax.jms.TopicPublisher;
054 import javax.jms.TopicSession;
055 import javax.jms.TopicSubscriber;
056 import javax.jms.TransactionRolledBackException;
057
058 import org.apache.activemq.blob.BlobDownloader;
059 import org.apache.activemq.blob.BlobTransferPolicy;
060 import org.apache.activemq.blob.BlobUploader;
061 import org.apache.activemq.command.ActiveMQBlobMessage;
062 import org.apache.activemq.command.ActiveMQBytesMessage;
063 import org.apache.activemq.command.ActiveMQDestination;
064 import org.apache.activemq.command.ActiveMQMapMessage;
065 import org.apache.activemq.command.ActiveMQMessage;
066 import org.apache.activemq.command.ActiveMQObjectMessage;
067 import org.apache.activemq.command.ActiveMQQueue;
068 import org.apache.activemq.command.ActiveMQStreamMessage;
069 import org.apache.activemq.command.ActiveMQTempDestination;
070 import org.apache.activemq.command.ActiveMQTempQueue;
071 import org.apache.activemq.command.ActiveMQTempTopic;
072 import org.apache.activemq.command.ActiveMQTextMessage;
073 import org.apache.activemq.command.ActiveMQTopic;
074 import org.apache.activemq.command.Command;
075 import org.apache.activemq.command.ConsumerId;
076 import org.apache.activemq.command.MessageAck;
077 import org.apache.activemq.command.MessageDispatch;
078 import org.apache.activemq.command.MessageId;
079 import org.apache.activemq.command.ProducerId;
080 import org.apache.activemq.command.RemoveInfo;
081 import org.apache.activemq.command.Response;
082 import org.apache.activemq.command.SessionId;
083 import org.apache.activemq.command.SessionInfo;
084 import org.apache.activemq.command.TransactionId;
085 import org.apache.activemq.management.JMSSessionStatsImpl;
086 import org.apache.activemq.management.StatsCapable;
087 import org.apache.activemq.management.StatsImpl;
088 import org.apache.activemq.thread.Scheduler;
089 import org.apache.activemq.transaction.Synchronization;
090 import org.apache.activemq.usage.MemoryUsage;
091 import org.apache.activemq.util.Callback;
092 import org.apache.activemq.util.JMSExceptionSupport;
093 import org.apache.activemq.util.LongSequenceGenerator;
094 import org.slf4j.Logger;
095 import org.slf4j.LoggerFactory;
096
097 /**
098 * <P>
099 * A <CODE>Session</CODE> object is a single-threaded context for producing
100 * and consuming messages. Although it may allocate provider resources outside
101 * the Java virtual machine (JVM), it is considered a lightweight JMS object.
102 * <P>
103 * A session serves several purposes:
104 * <UL>
105 * <LI>It is a factory for its message producers and consumers.
106 * <LI>It supplies provider-optimized message factories.
107 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and
108 * <CODE>TemporaryQueues</CODE>.
109 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE>
110 * objects for those clients that need to dynamically manipulate
111 * provider-specific destination names.
112 * <LI>It supports a single series of transactions that combine work spanning
113 * its producers and consumers into atomic units.
114 * <LI>It defines a serial order for the messages it consumes and the messages
115 * it produces.
116 * <LI>It retains messages it consumes until they have been acknowledged.
117 * <LI>It serializes execution of message listeners registered with its message
118 * consumers.
119 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
120 * </UL>
121 * <P>
122 * A session can create and service multiple message producers and consumers.
123 * <P>
124 * One typical use is to have a thread block on a synchronous
125 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then
126 * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s.
127 * <P>
128 * If a client desires to have one thread produce messages while others consume
129 * them, the client should use a separate session for its producing thread.
130 * <P>
131 * Once a connection has been started, any session with one or more registered
132 * message listeners is dedicated to the thread of control that delivers
133 * messages to it. It is erroneous for client code to use this session or any of
134 * its constituent objects from another thread of control. The only exception to
135 * this rule is the use of the session or connection <CODE>close</CODE>
136 * method.
137 * <P>
138 * It should be easy for most clients to partition their work naturally into
139 * sessions. This model allows clients to start simply and incrementally add
140 * message processing complexity as their need for concurrency grows.
141 * <P>
142 * The <CODE>close</CODE> method is the only session method that can be called
143 * while some other session method is being executed in another thread.
144 * <P>
145 * A session may be specified as transacted. Each transacted session supports a
146 * single series of transactions. Each transaction groups a set of message sends
147 * and a set of message receives into an atomic unit of work. In effect,
148 * transactions organize a session's input message stream and output message
149 * stream into series of atomic units. When a transaction commits, its atomic
150 * unit of input is acknowledged and its associated atomic unit of output is
151 * sent. If a transaction rollback is done, the transaction's sent messages are
152 * destroyed and the session's input is automatically recovered.
153 * <P>
154 * The content of a transaction's input and output units is simply those
155 * messages that have been produced and consumed within the session's current
156 * transaction.
157 * <P>
158 * A transaction is completed using either its session's <CODE>commit</CODE>
159 * method or its session's <CODE>rollback </CODE> method. The completion of a
160 * session's current transaction automatically begins the next. The result is
161 * that a transacted session always has a current transaction within which its
162 * work is done.
163 * <P>
164 * The Java Transaction Service (JTS) or some other transaction monitor may be
165 * used to combine a session's transaction with transactions on other resources
166 * (databases, other JMS sessions, etc.). Since Java distributed transactions
167 * are controlled via the Java Transaction API (JTA), use of the session's
168 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is
169 * prohibited.
170 * <P>
171 * The JMS API does not require support for JTA; however, it does define how a
172 * provider supplies this support.
173 * <P>
174 * Although it is also possible for a JMS client to handle distributed
175 * transactions directly, it is unlikely that many JMS clients will do this.
176 * Support for JTA in the JMS API is targeted at systems vendors who will be
177 * integrating the JMS API into their application server products.
178 *
179 *
180 * @see javax.jms.Session
181 * @see javax.jms.QueueSession
182 * @see javax.jms.TopicSession
183 * @see javax.jms.XASession
184 */
185 public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
186
187 /**
188 * Only acknowledge an individual message - using message.acknowledge()
189 * as opposed to CLIENT_ACKNOWLEDGE which
190 * acknowledges all messages consumed by a session at when acknowledge()
191 * is called
192 */
193 public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
194 public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;
195
196 public static interface DeliveryListener {
197 void beforeDelivery(ActiveMQSession session, Message msg);
198
199 void afterDelivery(ActiveMQSession session, Message msg);
200 }
201
202 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
203 private final ThreadPoolExecutor connectionExecutor;
204
205 protected int acknowledgementMode;
206 protected final ActiveMQConnection connection;
207 protected final SessionInfo info;
208 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
209 protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
210 protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
211 protected final ActiveMQSessionExecutor executor;
212 protected final AtomicBoolean started = new AtomicBoolean(false);
213
214 protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>();
215 protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>();
216
217 protected boolean closed;
218 private volatile boolean synchronizationRegistered;
219 protected boolean asyncDispatch;
220 protected boolean sessionAsyncDispatch;
221 protected final boolean debug;
222 protected Object sendMutex = new Object();
223
224 private MessageListener messageListener;
225 private final JMSSessionStatsImpl stats;
226 private TransactionContext transactionContext;
227 private DeliveryListener deliveryListener;
228 private MessageTransformer transformer;
229 private BlobTransferPolicy blobTransferPolicy;
230 private long lastDeliveredSequenceId;
231
232 /**
233 * Construct the Session
234 *
235 * @param connection
236 * @param sessionId
237 * @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
238 * Session.SESSION_TRANSACTED
239 * @param asyncDispatch
240 * @param sessionAsyncDispatch
241 * @throws JMSException on internal error
242 */
243 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
244 this.debug = LOG.isDebugEnabled();
245 this.connection = connection;
246 this.acknowledgementMode = acknowledgeMode;
247 this.asyncDispatch = asyncDispatch;
248 this.sessionAsyncDispatch = sessionAsyncDispatch;
249 this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
250 setTransactionContext(new TransactionContext(connection));
251 stats = new JMSSessionStatsImpl(producers, consumers);
252 this.connection.asyncSendPacket(info);
253 setTransformer(connection.getTransformer());
254 setBlobTransferPolicy(connection.getBlobTransferPolicy());
255 this.connectionExecutor=connection.getExecutor();
256 this.executor = new ActiveMQSessionExecutor(this);
257 connection.addSession(this);
258 if (connection.isStarted()) {
259 start();
260 }
261
262 }
263
264 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException {
265 this(connection, sessionId, acknowledgeMode, asyncDispatch, true);
266 }
267
268 /**
269 * Sets the transaction context of the session.
270 *
271 * @param transactionContext - provides the means to control a JMS
272 * transaction.
273 */
274 public void setTransactionContext(TransactionContext transactionContext) {
275 this.transactionContext = transactionContext;
276 }
277
278 /**
279 * Returns the transaction context of the session.
280 *
281 * @return transactionContext - session's transaction context.
282 */
283 public TransactionContext getTransactionContext() {
284 return transactionContext;
285 }
286
287 /*
288 * (non-Javadoc)
289 *
290 * @see org.apache.activemq.management.StatsCapable#getStats()
291 */
292 public StatsImpl getStats() {
293 return stats;
294 }
295
296 /**
297 * Returns the session's statistics.
298 *
299 * @return stats - session's statistics.
300 */
301 public JMSSessionStatsImpl getSessionStats() {
302 return stats;
303 }
304
305 /**
306 * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
307 * object is used to send a message containing a stream of uninterpreted
308 * bytes.
309 *
310 * @return the an ActiveMQBytesMessage
311 * @throws JMSException if the JMS provider fails to create this message due
312 * to some internal error.
313 */
314 public BytesMessage createBytesMessage() throws JMSException {
315 ActiveMQBytesMessage message = new ActiveMQBytesMessage();
316 configureMessage(message);
317 return message;
318 }
319
320 /**
321 * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE>
322 * object is used to send a self-defining set of name-value pairs, where
323 * names are <CODE>String</CODE> objects and values are primitive values
324 * in the Java programming language.
325 *
326 * @return an ActiveMQMapMessage
327 * @throws JMSException if the JMS provider fails to create this message due
328 * to some internal error.
329 */
330 public MapMessage createMapMessage() throws JMSException {
331 ActiveMQMapMessage message = new ActiveMQMapMessage();
332 configureMessage(message);
333 return message;
334 }
335
336 /**
337 * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE>
338 * interface is the root interface of all JMS messages. A
339 * <CODE>Message</CODE> object holds all the standard message header
340 * information. It can be sent when a message containing only header
341 * information is sufficient.
342 *
343 * @return an ActiveMQMessage
344 * @throws JMSException if the JMS provider fails to create this message due
345 * to some internal error.
346 */
347 public Message createMessage() throws JMSException {
348 ActiveMQMessage message = new ActiveMQMessage();
349 configureMessage(message);
350 return message;
351 }
352
353 /**
354 * Creates an <CODE>ObjectMessage</CODE> object. An
355 * <CODE>ObjectMessage</CODE> object is used to send a message that
356 * contains a serializable Java object.
357 *
358 * @return an ActiveMQObjectMessage
359 * @throws JMSException if the JMS provider fails to create this message due
360 * to some internal error.
361 */
362 public ObjectMessage createObjectMessage() throws JMSException {
363 ActiveMQObjectMessage message = new ActiveMQObjectMessage();
364 configureMessage(message);
365 return message;
366 }
367
368 /**
369 * Creates an initialized <CODE>ObjectMessage</CODE> object. An
370 * <CODE>ObjectMessage</CODE> object is used to send a message that
371 * contains a serializable Java object.
372 *
373 * @param object the object to use to initialize this message
374 * @return an ActiveMQObjectMessage
375 * @throws JMSException if the JMS provider fails to create this message due
376 * to some internal error.
377 */
378 public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
379 ActiveMQObjectMessage message = new ActiveMQObjectMessage();
380 configureMessage(message);
381 message.setObject(object);
382 return message;
383 }
384
385 /**
386 * Creates a <CODE>StreamMessage</CODE> object. A
387 * <CODE>StreamMessage</CODE> object is used to send a self-defining
388 * stream of primitive values in the Java programming language.
389 *
390 * @return an ActiveMQStreamMessage
391 * @throws JMSException if the JMS provider fails to create this message due
392 * to some internal error.
393 */
394 public StreamMessage createStreamMessage() throws JMSException {
395 ActiveMQStreamMessage message = new ActiveMQStreamMessage();
396 configureMessage(message);
397 return message;
398 }
399
400 /**
401 * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
402 * object is used to send a message containing a <CODE>String</CODE>
403 * object.
404 *
405 * @return an ActiveMQTextMessage
406 * @throws JMSException if the JMS provider fails to create this message due
407 * to some internal error.
408 */
409 public TextMessage createTextMessage() throws JMSException {
410 ActiveMQTextMessage message = new ActiveMQTextMessage();
411 configureMessage(message);
412 return message;
413 }
414
415 /**
416 * Creates an initialized <CODE>TextMessage</CODE> object. A
417 * <CODE>TextMessage</CODE> object is used to send a message containing a
418 * <CODE>String</CODE>.
419 *
420 * @param text the string used to initialize this message
421 * @return an ActiveMQTextMessage
422 * @throws JMSException if the JMS provider fails to create this message due
423 * to some internal error.
424 */
425 public TextMessage createTextMessage(String text) throws JMSException {
426 ActiveMQTextMessage message = new ActiveMQTextMessage();
427 message.setText(text);
428 configureMessage(message);
429 return message;
430 }
431
432 /**
433 * Creates an initialized <CODE>BlobMessage</CODE> object. A
434 * <CODE>BlobMessage</CODE> object is used to send a message containing a
435 * <CODE>URL</CODE> which points to some network addressible BLOB.
436 *
437 * @param url the network addressable URL used to pass directly to the
438 * consumer
439 * @return a BlobMessage
440 * @throws JMSException if the JMS provider fails to create this message due
441 * to some internal error.
442 */
443 public BlobMessage createBlobMessage(URL url) throws JMSException {
444 return createBlobMessage(url, false);
445 }
446
447 /**
448 * Creates an initialized <CODE>BlobMessage</CODE> object. A
449 * <CODE>BlobMessage</CODE> object is used to send a message containing a
450 * <CODE>URL</CODE> which points to some network addressible BLOB.
451 *
452 * @param url the network addressable URL used to pass directly to the
453 * consumer
454 * @param deletedByBroker indicates whether or not the resource is deleted
455 * by the broker when the message is acknowledged
456 * @return a BlobMessage
457 * @throws JMSException if the JMS provider fails to create this message due
458 * to some internal error.
459 */
460 public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException {
461 ActiveMQBlobMessage message = new ActiveMQBlobMessage();
462 configureMessage(message);
463 message.setURL(url);
464 message.setDeletedByBroker(deletedByBroker);
465 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
466 return message;
467 }
468
469 /**
470 * Creates an initialized <CODE>BlobMessage</CODE> object. A
471 * <CODE>BlobMessage</CODE> object is used to send a message containing
472 * the <CODE>File</CODE> content. Before the message is sent the file
473 * conent will be uploaded to the broker or some other remote repository
474 * depending on the {@link #getBlobTransferPolicy()}.
475 *
476 * @param file the file to be uploaded to some remote repo (or the broker)
477 * depending on the strategy
478 * @return a BlobMessage
479 * @throws JMSException if the JMS provider fails to create this message due
480 * to some internal error.
481 */
482 public BlobMessage createBlobMessage(File file) throws JMSException {
483 ActiveMQBlobMessage message = new ActiveMQBlobMessage();
484 configureMessage(message);
485 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
486 message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy())));
487 message.setDeletedByBroker(true);
488 message.setName(file.getName());
489 return message;
490 }
491
492 /**
493 * Creates an initialized <CODE>BlobMessage</CODE> object. A
494 * <CODE>BlobMessage</CODE> object is used to send a message containing
495 * the <CODE>File</CODE> content. Before the message is sent the file
496 * conent will be uploaded to the broker or some other remote repository
497 * depending on the {@link #getBlobTransferPolicy()}.
498 *
499 * @param in the stream to be uploaded to some remote repo (or the broker)
500 * depending on the strategy
501 * @return a BlobMessage
502 * @throws JMSException if the JMS provider fails to create this message due
503 * to some internal error.
504 */
505 public BlobMessage createBlobMessage(InputStream in) throws JMSException {
506 ActiveMQBlobMessage message = new ActiveMQBlobMessage();
507 configureMessage(message);
508 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
509 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
510 message.setDeletedByBroker(true);
511 return message;
512 }
513
514 /**
515 * Indicates whether the session is in transacted mode.
516 *
517 * @return true if the session is in transacted mode
518 * @throws JMSException if there is some internal error.
519 */
520 public boolean getTransacted() throws JMSException {
521 checkClosed();
522 return isTransacted();
523 }
524
525 /**
526 * Returns the acknowledgement mode of the session. The acknowledgement mode
527 * is set at the time that the session is created. If the session is
528 * transacted, the acknowledgement mode is ignored.
529 *
530 * @return If the session is not transacted, returns the current
531 * acknowledgement mode for the session. If the session is
532 * transacted, returns SESSION_TRANSACTED.
533 * @throws JMSException
534 * @see javax.jms.Connection#createSession(boolean,int)
535 * @since 1.1 exception JMSException if there is some internal error.
536 */
537 public int getAcknowledgeMode() throws JMSException {
538 checkClosed();
539 return this.acknowledgementMode;
540 }
541
542 /**
543 * Commits all messages done in this transaction and releases any locks
544 * currently held.
545 *
546 * @throws JMSException if the JMS provider fails to commit the transaction
547 * due to some internal error.
548 * @throws TransactionRolledBackException if the transaction is rolled back
549 * due to some internal error during commit.
550 * @throws javax.jms.IllegalStateException if the method is not called by a
551 * transacted session.
552 */
553 public void commit() throws JMSException {
554 checkClosed();
555 if (!getTransacted()) {
556 throw new javax.jms.IllegalStateException("Not a transacted session");
557 }
558 if (LOG.isDebugEnabled()) {
559 LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId());
560 }
561 transactionContext.commit();
562 }
563
564 /**
565 * Rolls back any messages done in this transaction and releases any locks
566 * currently held.
567 *
568 * @throws JMSException if the JMS provider fails to roll back the
569 * transaction due to some internal error.
570 * @throws javax.jms.IllegalStateException if the method is not called by a
571 * transacted session.
572 */
573 public void rollback() throws JMSException {
574 checkClosed();
575 if (!getTransacted()) {
576 throw new javax.jms.IllegalStateException("Not a transacted session");
577 }
578 if (LOG.isDebugEnabled()) {
579 LOG.debug(getSessionId() + " Transaction Rollback, txid:" + transactionContext.getTransactionId());
580 }
581 transactionContext.rollback();
582 }
583
584 /**
585 * Closes the session.
586 * <P>
587 * Since a provider may allocate some resources on behalf of a session
588 * outside the JVM, clients should close the resources when they are not
589 * needed. Relying on garbage collection to eventually reclaim these
590 * resources may not be timely enough.
591 * <P>
592 * There is no need to close the producers and consumers of a closed
593 * session.
594 * <P>
595 * This call will block until a <CODE>receive</CODE> call or message
596 * listener in progress has completed. A blocked message consumer
597 * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session
598 * is closed.
599 * <P>
600 * Closing a transacted session must roll back the transaction in progress.
601 * <P>
602 * This method is the only <CODE>Session</CODE> method that can be called
603 * concurrently.
604 * <P>
605 * Invoking any other <CODE>Session</CODE> method on a closed session must
606 * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
607 * closed session must <I>not </I> throw an exception.
608 *
609 * @throws JMSException if the JMS provider fails to close the session due
610 * to some internal error.
611 */
612 public void close() throws JMSException {
613 if (!closed) {
614 if (getTransactionContext().isInXATransaction()) {
615 if (!synchronizationRegistered) {
616 synchronizationRegistered = true;
617 getTransactionContext().addSynchronization(new Synchronization() {
618
619 @Override
620 public void afterCommit() throws Exception {
621 doClose();
622 synchronizationRegistered = false;
623 }
624
625 @Override
626 public void afterRollback() throws Exception {
627 doClose();
628 synchronizationRegistered = false;
629 }
630 });
631 }
632
633 } else {
634 doClose();
635 }
636 }
637 }
638
639 private void doClose() throws JMSException {
640 boolean interrupted = Thread.interrupted();
641 dispose();
642 RemoveInfo removeCommand = info.createRemoveCommand();
643 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
644 connection.asyncSendPacket(removeCommand);
645 if (interrupted) {
646 Thread.currentThread().interrupt();
647 }
648 }
649
650 void clearMessagesInProgress() {
651 executor.clearMessagesInProgress();
652 // we are called from inside the transport reconnection logic
653 // which involves us clearing all the connections' consumers
654 // dispatch and delivered lists. So rather than trying to
655 // grab a mutex (which could be already owned by the message
656 // listener calling the send or an ack) we allow it to complete in
657 // a separate thread via the scheduler and notify us via
658 // connection.transportInterruptionProcessingComplete()
659 //
660 for (final ActiveMQMessageConsumer consumer : consumers) {
661 consumer.inProgressClearRequired();
662 try {
663 connection.getScheduler().executeAfterDelay(new Runnable() {
664 public void run() {
665 consumer.clearMessagesInProgress();
666 }}, 0l);
667 } catch (JMSException e) {
668 connection.onClientInternalException(e);
669 }
670 }
671 }
672
673 void deliverAcks() {
674 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
675 ActiveMQMessageConsumer consumer = iter.next();
676 consumer.deliverAcks();
677 }
678 }
679
680 public synchronized void dispose() throws JMSException {
681 if (!closed) {
682
683 try {
684 executor.stop();
685
686 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
687 ActiveMQMessageConsumer consumer = iter.next();
688 consumer.setFailureError(connection.getFirstFailureError());
689 consumer.dispose();
690 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId());
691 }
692 consumers.clear();
693
694 for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) {
695 ActiveMQMessageProducer producer = iter.next();
696 producer.dispose();
697 }
698 producers.clear();
699
700 try {
701 if (getTransactionContext().isInLocalTransaction()) {
702 rollback();
703 }
704 } catch (JMSException e) {
705 }
706
707 } finally {
708 connection.removeSession(this);
709 this.transactionContext = null;
710 closed = true;
711 }
712 }
713 }
714
715 /**
716 * Checks that the session is not closed then configures the message
717 */
718 protected void configureMessage(ActiveMQMessage message) throws IllegalStateException {
719 checkClosed();
720 message.setConnection(connection);
721 }
722
723 /**
724 * Check if the session is closed. It is used for ensuring that the session
725 * is open before performing various operations.
726 *
727 * @throws IllegalStateException if the Session is closed
728 */
729 protected void checkClosed() throws IllegalStateException {
730 if (closed) {
731 throw new IllegalStateException("The Session is closed");
732 }
733 }
734
735 /**
736 * Checks if the session is closed.
737 *
738 * @return true if the session is closed, false otherwise.
739 */
740 public boolean isClosed() {
741 return closed;
742 }
743
744 /**
745 * Stops message delivery in this session, and restarts message delivery
746 * with the oldest unacknowledged message.
747 * <P>
748 * All consumers deliver messages in a serial order. Acknowledging a
749 * received message automatically acknowledges all messages that have been
750 * delivered to the client.
751 * <P>
752 * Restarting a session causes it to take the following actions:
753 * <UL>
754 * <LI>Stop message delivery
755 * <LI>Mark all messages that might have been delivered but not
756 * acknowledged as "redelivered"
757 * <LI>Restart the delivery sequence including all unacknowledged messages
758 * that had been previously delivered. Redelivered messages do not have to
759 * be delivered in exactly their original delivery order.
760 * </UL>
761 *
762 * @throws JMSException if the JMS provider fails to stop and restart
763 * message delivery due to some internal error.
764 * @throws IllegalStateException if the method is called by a transacted
765 * session.
766 */
767 public void recover() throws JMSException {
768
769 checkClosed();
770 if (getTransacted()) {
771 throw new IllegalStateException("This session is transacted");
772 }
773
774 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
775 ActiveMQMessageConsumer c = iter.next();
776 c.rollback();
777 }
778
779 }
780
781 /**
782 * Returns the session's distinguished message listener (optional).
783 *
784 * @return the message listener associated with this session
785 * @throws JMSException if the JMS provider fails to get the message
786 * listener due to an internal error.
787 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
788 * @see javax.jms.ServerSessionPool
789 * @see javax.jms.ServerSession
790 */
791 public MessageListener getMessageListener() throws JMSException {
792 checkClosed();
793 return this.messageListener;
794 }
795
796 /**
797 * Sets the session's distinguished message listener (optional).
798 * <P>
799 * When the distinguished message listener is set, no other form of message
800 * receipt in the session can be used; however, all forms of sending
801 * messages are still supported.
802 * <P>
803 * This is an expert facility not used by regular JMS clients.
804 *
805 * @param listener the message listener to associate with this session
806 * @throws JMSException if the JMS provider fails to set the message
807 * listener due to an internal error.
808 * @see javax.jms.Session#getMessageListener()
809 * @see javax.jms.ServerSessionPool
810 * @see javax.jms.ServerSession
811 */
812 public void setMessageListener(MessageListener listener) throws JMSException {
813 checkClosed();
814 this.messageListener = listener;
815
816 if (listener != null) {
817 executor.setDispatchedBySessionPool(true);
818 }
819 }
820
821 /**
822 * Optional operation, intended to be used only by Application Servers, not
823 * by ordinary JMS clients.
824 *
825 * @see javax.jms.ServerSession
826 */
827 public void run() {
828 MessageDispatch messageDispatch;
829 while ((messageDispatch = executor.dequeueNoWait()) != null) {
830 final MessageDispatch md = messageDispatch;
831 ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
832 if (message.isExpired() || connection.isDuplicate(ActiveMQSession.this, message)) {
833 // TODO: Ack it without delivery to client
834 continue;
835 }
836
837 if (isClientAcknowledge()||isIndividualAcknowledge()) {
838 message.setAcknowledgeCallback(new Callback() {
839 public void execute() throws Exception {
840 }
841 });
842 }
843
844 if (deliveryListener != null) {
845 deliveryListener.beforeDelivery(this, message);
846 }
847
848 md.setDeliverySequenceId(getNextDeliveryId());
849
850 try {
851 messageListener.onMessage(message);
852 } catch (RuntimeException e) {
853 LOG.error("error dispatching message: ", e);
854 // A problem while invoking the MessageListener does not
855 // in general indicate a problem with the connection to the broker, i.e.
856 // it will usually be sufficient to let the afterDelivery() method either
857 // commit or roll back in order to deal with the exception.
858 // However, we notify any registered client internal exception listener
859 // of the problem.
860 connection.onClientInternalException(e);
861 }
862
863 try {
864 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
865 ack.setFirstMessageId(md.getMessage().getMessageId());
866 doStartTransaction();
867 ack.setTransactionId(getTransactionContext().getTransactionId());
868 if (ack.getTransactionId() != null) {
869 getTransactionContext().addSynchronization(new Synchronization() {
870
871 @Override
872 public void afterRollback() throws Exception {
873 md.getMessage().onMessageRolledBack();
874 // ensure we don't filter this as a duplicate
875 connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
876 RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
877 int redeliveryCounter = md.getMessage().getRedeliveryCounter();
878 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
879 && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
880 // We need to NACK the messages so that they get
881 // sent to the
882 // DLQ.
883 // Acknowledge the last message.
884 MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
885 ack.setFirstMessageId(md.getMessage().getMessageId());
886 asyncSendPacket(ack);
887 } else {
888
889 MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
890 ack.setFirstMessageId(md.getMessage().getMessageId());
891 asyncSendPacket(ack);
892
893 // Figure out how long we should wait to resend
894 // this message.
895 long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
896 for (int i = 0; i < redeliveryCounter; i++) {
897 redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
898 }
899 connection.getScheduler().executeAfterDelay(new Runnable() {
900
901 public void run() {
902 ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
903 }
904 }, redeliveryDelay);
905 }
906 }
907 });
908 }
909 asyncSendPacket(ack);
910 } catch (Throwable e) {
911 connection.onClientInternalException(e);
912 }
913
914 if (deliveryListener != null) {
915 deliveryListener.afterDelivery(this, message);
916 }
917 }
918 }
919
920 /**
921 * Creates a <CODE>MessageProducer</CODE> to send messages to the
922 * specified destination.
923 * <P>
924 * A client uses a <CODE>MessageProducer</CODE> object to send messages to
925 * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
926 * inherit from <CODE>Destination</CODE>, they can be used in the
927 * destination parameter to create a <CODE>MessageProducer</CODE> object.
928 *
929 * @param destination the <CODE>Destination</CODE> to send to, or null if
930 * this is a producer which does not have a specified
931 * destination.
932 * @return the MessageProducer
933 * @throws JMSException if the session fails to create a MessageProducer due
934 * to some internal error.
935 * @throws InvalidDestinationException if an invalid destination is
936 * specified.
937 * @since 1.1
938 */
939 public MessageProducer createProducer(Destination destination) throws JMSException {
940 checkClosed();
941 if (destination instanceof CustomDestination) {
942 CustomDestination customDestination = (CustomDestination)destination;
943 return customDestination.createProducer(this);
944 }
945 int timeSendOut = connection.getSendTimeout();
946 return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
947 }
948
949 /**
950 * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
951 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
952 * <CODE>Destination</CODE>, they can be used in the destination
953 * parameter to create a <CODE>MessageConsumer</CODE>.
954 *
955 * @param destination the <CODE>Destination</CODE> to access.
956 * @return the MessageConsumer
957 * @throws JMSException if the session fails to create a consumer due to
958 * some internal error.
959 * @throws InvalidDestinationException if an invalid destination is
960 * specified.
961 * @since 1.1
962 */
963 public MessageConsumer createConsumer(Destination destination) throws JMSException {
964 return createConsumer(destination, (String) null);
965 }
966
967 /**
968 * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
969 * using a message selector. Since <CODE> Queue</CODE> and
970 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
971 * can be used in the destination parameter to create a
972 * <CODE>MessageConsumer</CODE>.
973 * <P>
974 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
975 * that have been sent to a destination.
976 *
977 * @param destination the <CODE>Destination</CODE> to access
978 * @param messageSelector only messages with properties matching the message
979 * selector expression are delivered. A value of null or an
980 * empty string indicates that there is no message selector
981 * for the message consumer.
982 * @return the MessageConsumer
983 * @throws JMSException if the session fails to create a MessageConsumer due
984 * to some internal error.
985 * @throws InvalidDestinationException if an invalid destination is
986 * specified.
987 * @throws InvalidSelectorException if the message selector is invalid.
988 * @since 1.1
989 */
990 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
991 return createConsumer(destination, messageSelector, false);
992 }
993
994 /**
995 * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
996 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
997 * <CODE>Destination</CODE>, they can be used in the destination
998 * parameter to create a <CODE>MessageConsumer</CODE>.
999 *
1000 * @param destination the <CODE>Destination</CODE> to access.
1001 * @param messageListener the listener to use for async consumption of messages
1002 * @return the MessageConsumer
1003 * @throws JMSException if the session fails to create a consumer due to
1004 * some internal error.
1005 * @throws InvalidDestinationException if an invalid destination is
1006 * specified.
1007 * @since 1.1
1008 */
1009 public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
1010 return createConsumer(destination, null, messageListener);
1011 }
1012
1013 /**
1014 * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
1015 * using a message selector. Since <CODE> Queue</CODE> and
1016 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
1017 * can be used in the destination parameter to create a
1018 * <CODE>MessageConsumer</CODE>.
1019 * <P>
1020 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1021 * that have been sent to a destination.
1022 *
1023 * @param destination the <CODE>Destination</CODE> to access
1024 * @param messageSelector only messages with properties matching the message
1025 * selector expression are delivered. A value of null or an
1026 * empty string indicates that there is no message selector
1027 * for the message consumer.
1028 * @param messageListener the listener to use for async consumption of messages
1029 * @return the MessageConsumer
1030 * @throws JMSException if the session fails to create a MessageConsumer due
1031 * to some internal error.
1032 * @throws InvalidDestinationException if an invalid destination is
1033 * specified.
1034 * @throws InvalidSelectorException if the message selector is invalid.
1035 * @since 1.1
1036 */
1037 public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
1038 return createConsumer(destination, messageSelector, false, messageListener);
1039 }
1040
1041 /**
1042 * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1043 * using a message selector. This method can specify whether messages
1044 * published by its own connection should be delivered to it, if the
1045 * destination is a topic.
1046 * <P>
1047 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1048 * <CODE>Destination</CODE>, they can be used in the destination
1049 * parameter to create a <CODE>MessageConsumer</CODE>.
1050 * <P>
1051 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1052 * that have been published to a destination.
1053 * <P>
1054 * In some cases, a connection may both publish and subscribe to a topic.
1055 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1056 * inhibit the delivery of messages published by its own connection. The
1057 * default value for this attribute is False. The <CODE>noLocal</CODE>
1058 * value must be supported by destinations that are topics.
1059 *
1060 * @param destination the <CODE>Destination</CODE> to access
1061 * @param messageSelector only messages with properties matching the message
1062 * selector expression are delivered. A value of null or an
1063 * empty string indicates that there is no message selector
1064 * for the message consumer.
1065 * @param noLocal - if true, and the destination is a topic, inhibits the
1066 * delivery of messages published by its own connection. The
1067 * behavior for <CODE>NoLocal</CODE> is not specified if
1068 * the destination is a queue.
1069 * @return the MessageConsumer
1070 * @throws JMSException if the session fails to create a MessageConsumer due
1071 * to some internal error.
1072 * @throws InvalidDestinationException if an invalid destination is
1073 * specified.
1074 * @throws InvalidSelectorException if the message selector is invalid.
1075 * @since 1.1
1076 */
1077 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
1078 return createConsumer(destination, messageSelector, noLocal, null);
1079 }
1080
1081 /**
1082 * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1083 * using a message selector. This method can specify whether messages
1084 * published by its own connection should be delivered to it, if the
1085 * destination is a topic.
1086 * <P>
1087 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1088 * <CODE>Destination</CODE>, they can be used in the destination
1089 * parameter to create a <CODE>MessageConsumer</CODE>.
1090 * <P>
1091 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1092 * that have been published to a destination.
1093 * <P>
1094 * In some cases, a connection may both publish and subscribe to a topic.
1095 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1096 * inhibit the delivery of messages published by its own connection. The
1097 * default value for this attribute is False. The <CODE>noLocal</CODE>
1098 * value must be supported by destinations that are topics.
1099 *
1100 * @param destination the <CODE>Destination</CODE> to access
1101 * @param messageSelector only messages with properties matching the message
1102 * selector expression are delivered. A value of null or an
1103 * empty string indicates that there is no message selector
1104 * for the message consumer.
1105 * @param noLocal - if true, and the destination is a topic, inhibits the
1106 * delivery of messages published by its own connection. The
1107 * behavior for <CODE>NoLocal</CODE> is not specified if
1108 * the destination is a queue.
1109 * @param messageListener the listener to use for async consumption of messages
1110 * @return the MessageConsumer
1111 * @throws JMSException if the session fails to create a MessageConsumer due
1112 * to some internal error.
1113 * @throws InvalidDestinationException if an invalid destination is
1114 * specified.
1115 * @throws InvalidSelectorException if the message selector is invalid.
1116 * @since 1.1
1117 */
1118 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
1119 checkClosed();
1120
1121 if (destination instanceof CustomDestination) {
1122 CustomDestination customDestination = (CustomDestination)destination;
1123 return customDestination.createConsumer(this, messageSelector, noLocal);
1124 }
1125
1126 ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
1127 int prefetch = 0;
1128 if (destination instanceof Topic) {
1129 prefetch = prefetchPolicy.getTopicPrefetch();
1130 } else {
1131 prefetch = prefetchPolicy.getQueuePrefetch();
1132 }
1133 ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
1134 return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
1135 prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
1136 }
1137
1138 /**
1139 * Creates a queue identity given a <CODE>Queue</CODE> name.
1140 * <P>
1141 * This facility is provided for the rare cases where clients need to
1142 * dynamically manipulate queue identity. It allows the creation of a queue
1143 * identity with a provider-specific name. Clients that depend on this
1144 * ability are not portable.
1145 * <P>
1146 * Note that this method is not for creating the physical queue. The
1147 * physical creation of queues is an administrative task and is not to be
1148 * initiated by the JMS API. The one exception is the creation of temporary
1149 * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
1150 * method.
1151 *
1152 * @param queueName the name of this <CODE>Queue</CODE>
1153 * @return a <CODE>Queue</CODE> with the given name
1154 * @throws JMSException if the session fails to create a queue due to some
1155 * internal error.
1156 * @since 1.1
1157 */
1158 public Queue createQueue(String queueName) throws JMSException {
1159 checkClosed();
1160 if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1161 return new ActiveMQTempQueue(queueName);
1162 }
1163 return new ActiveMQQueue(queueName);
1164 }
1165
1166 /**
1167 * Creates a topic identity given a <CODE>Topic</CODE> name.
1168 * <P>
1169 * This facility is provided for the rare cases where clients need to
1170 * dynamically manipulate topic identity. This allows the creation of a
1171 * topic identity with a provider-specific name. Clients that depend on this
1172 * ability are not portable.
1173 * <P>
1174 * Note that this method is not for creating the physical topic. The
1175 * physical creation of topics is an administrative task and is not to be
1176 * initiated by the JMS API. The one exception is the creation of temporary
1177 * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
1178 * method.
1179 *
1180 * @param topicName the name of this <CODE>Topic</CODE>
1181 * @return a <CODE>Topic</CODE> with the given name
1182 * @throws JMSException if the session fails to create a topic due to some
1183 * internal error.
1184 * @since 1.1
1185 */
1186 public Topic createTopic(String topicName) throws JMSException {
1187 checkClosed();
1188 if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1189 return new ActiveMQTempTopic(topicName);
1190 }
1191 return new ActiveMQTopic(topicName);
1192 }
1193
1194 /**
1195 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1196 * the specified queue.
1197 *
1198 * @param queue the <CODE>queue</CODE> to access
1199 * @exception InvalidDestinationException if an invalid destination is
1200 * specified
1201 * @since 1.1
1202 */
1203 /**
1204 * Creates a durable subscriber to the specified topic.
1205 * <P>
1206 * If a client needs to receive all the messages published on a topic,
1207 * including the ones published while the subscriber is inactive, it uses a
1208 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1209 * record of this durable subscription and insures that all messages from
1210 * the topic's publishers are retained until they are acknowledged by this
1211 * durable subscriber or they have expired.
1212 * <P>
1213 * Sessions with durable subscribers must always provide the same client
1214 * identifier. In addition, each client must specify a name that uniquely
1215 * identifies (within client identifier) each durable subscription it
1216 * creates. Only one session at a time can have a
1217 * <CODE>TopicSubscriber</CODE> for a particular durable subscription.
1218 * <P>
1219 * A client can change an existing durable subscription by creating a
1220 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1221 * and/or message selector. Changing a durable subscriber is equivalent to
1222 * unsubscribing (deleting) the old one and creating a new one.
1223 * <P>
1224 * In some cases, a connection may both publish and subscribe to a topic.
1225 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1226 * inhibit the delivery of messages published by its own connection. The
1227 * default value for this attribute is false.
1228 *
1229 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1230 * @param name the name used to identify this subscription
1231 * @return the TopicSubscriber
1232 * @throws JMSException if the session fails to create a subscriber due to
1233 * some internal error.
1234 * @throws InvalidDestinationException if an invalid topic is specified.
1235 * @since 1.1
1236 */
1237 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
1238 checkClosed();
1239 return createDurableSubscriber(topic, name, null, false);
1240 }
1241
1242 /**
1243 * Creates a durable subscriber to the specified topic, using a message
1244 * selector and specifying whether messages published by its own connection
1245 * should be delivered to it.
1246 * <P>
1247 * If a client needs to receive all the messages published on a topic,
1248 * including the ones published while the subscriber is inactive, it uses a
1249 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1250 * record of this durable subscription and insures that all messages from
1251 * the topic's publishers are retained until they are acknowledged by this
1252 * durable subscriber or they have expired.
1253 * <P>
1254 * Sessions with durable subscribers must always provide the same client
1255 * identifier. In addition, each client must specify a name which uniquely
1256 * identifies (within client identifier) each durable subscription it
1257 * creates. Only one session at a time can have a
1258 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
1259 * inactive durable subscriber is one that exists but does not currently
1260 * have a message consumer associated with it.
1261 * <P>
1262 * A client can change an existing durable subscription by creating a
1263 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1264 * and/or message selector. Changing a durable subscriber is equivalent to
1265 * unsubscribing (deleting) the old one and creating a new one.
1266 *
1267 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1268 * @param name the name used to identify this subscription
1269 * @param messageSelector only messages with properties matching the message
1270 * selector expression are delivered. A value of null or an
1271 * empty string indicates that there is no message selector
1272 * for the message consumer.
1273 * @param noLocal if set, inhibits the delivery of messages published by its
1274 * own connection
1275 * @return the Queue Browser
1276 * @throws JMSException if the session fails to create a subscriber due to
1277 * some internal error.
1278 * @throws InvalidDestinationException if an invalid topic is specified.
1279 * @throws InvalidSelectorException if the message selector is invalid.
1280 * @since 1.1
1281 */
1282 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
1283 checkClosed();
1284
1285 if (isIndividualAcknowledge()) {
1286 throw JMSExceptionSupport.create("Cannot create a durable consumer for a Session in "+
1287 "INDIVIDUAL_ACKNOWLEDGE mode.", null);
1288 }
1289
1290 if (topic instanceof CustomDestination) {
1291 CustomDestination customDestination = (CustomDestination)topic;
1292 return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
1293 }
1294
1295 connection.checkClientIDWasManuallySpecified();
1296 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1297 int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch();
1298 int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit();
1299 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit,
1300 noLocal, false, asyncDispatch);
1301 }
1302
1303 /**
1304 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1305 * the specified queue.
1306 *
1307 * @param queue the <CODE>queue</CODE> to access
1308 * @return the Queue Browser
1309 * @throws JMSException if the session fails to create a browser due to some
1310 * internal error.
1311 * @throws InvalidDestinationException if an invalid destination is
1312 * specified
1313 * @since 1.1
1314 */
1315 public QueueBrowser createBrowser(Queue queue) throws JMSException {
1316 checkClosed();
1317 return createBrowser(queue, null);
1318 }
1319
1320 /**
1321 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1322 * the specified queue using a message selector.
1323 *
1324 * @param queue the <CODE>queue</CODE> to access
1325 * @param messageSelector only messages with properties matching the message
1326 * selector expression are delivered. A value of null or an
1327 * empty string indicates that there is no message selector
1328 * for the message consumer.
1329 * @return the Queue Browser
1330 * @throws JMSException if the session fails to create a browser due to some
1331 * internal error.
1332 * @throws InvalidDestinationException if an invalid destination is
1333 * specified
1334 * @throws InvalidSelectorException if the message selector is invalid.
1335 * @since 1.1
1336 */
1337 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
1338 checkClosed();
1339 return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch);
1340 }
1341
1342 /**
1343 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
1344 * of the <CODE>Connection</CODE> unless it is deleted earlier.
1345 *
1346 * @return a temporary queue identity
1347 * @throws JMSException if the session fails to create a temporary queue due
1348 * to some internal error.
1349 * @since 1.1
1350 */
1351 public TemporaryQueue createTemporaryQueue() throws JMSException {
1352 checkClosed();
1353 return (TemporaryQueue)connection.createTempDestination(false);
1354 }
1355
1356 /**
1357 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
1358 * of the <CODE>Connection</CODE> unless it is deleted earlier.
1359 *
1360 * @return a temporary topic identity
1361 * @throws JMSException if the session fails to create a temporary topic due
1362 * to some internal error.
1363 * @since 1.1
1364 */
1365 public TemporaryTopic createTemporaryTopic() throws JMSException {
1366 checkClosed();
1367 return (TemporaryTopic)connection.createTempDestination(true);
1368 }
1369
1370 /**
1371 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1372 * the specified queue.
1373 *
1374 * @param queue the <CODE>Queue</CODE> to access
1375 * @return
1376 * @throws JMSException if the session fails to create a receiver due to
1377 * some internal error.
1378 * @throws JMSException
1379 * @throws InvalidDestinationException if an invalid queue is specified.
1380 */
1381 public QueueReceiver createReceiver(Queue queue) throws JMSException {
1382 checkClosed();
1383 return createReceiver(queue, null);
1384 }
1385
1386 /**
1387 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1388 * the specified queue using a message selector.
1389 *
1390 * @param queue the <CODE>Queue</CODE> to access
1391 * @param messageSelector only messages with properties matching the message
1392 * selector expression are delivered. A value of null or an
1393 * empty string indicates that there is no message selector
1394 * for the message consumer.
1395 * @return QueueReceiver
1396 * @throws JMSException if the session fails to create a receiver due to
1397 * some internal error.
1398 * @throws InvalidDestinationException if an invalid queue is specified.
1399 * @throws InvalidSelectorException if the message selector is invalid.
1400 */
1401 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
1402 checkClosed();
1403
1404 if (queue instanceof CustomDestination) {
1405 CustomDestination customDestination = (CustomDestination)queue;
1406 return customDestination.createReceiver(this, messageSelector);
1407 }
1408
1409 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1410 return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
1411 prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
1412 }
1413
1414 /**
1415 * Creates a <CODE>QueueSender</CODE> object to send messages to the
1416 * specified queue.
1417 *
1418 * @param queue the <CODE>Queue</CODE> to access, or null if this is an
1419 * unidentified producer
1420 * @return QueueSender
1421 * @throws JMSException if the session fails to create a sender due to some
1422 * internal error.
1423 * @throws InvalidDestinationException if an invalid queue is specified.
1424 */
1425 public QueueSender createSender(Queue queue) throws JMSException {
1426 checkClosed();
1427 if (queue instanceof CustomDestination) {
1428 CustomDestination customDestination = (CustomDestination)queue;
1429 return customDestination.createSender(this);
1430 }
1431 int timeSendOut = connection.getSendTimeout();
1432 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
1433 }
1434
1435 /**
1436 * Creates a nondurable subscriber to the specified topic. <p/>
1437 * <P>
1438 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1439 * that have been published to a topic. <p/>
1440 * <P>
1441 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1442 * receive only messages that are published while they are active. <p/>
1443 * <P>
1444 * In some cases, a connection may both publish and subscribe to a topic.
1445 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1446 * inhibit the delivery of messages published by its own connection. The
1447 * default value for this attribute is false.
1448 *
1449 * @param topic the <CODE>Topic</CODE> to subscribe to
1450 * @return TopicSubscriber
1451 * @throws JMSException if the session fails to create a subscriber due to
1452 * some internal error.
1453 * @throws InvalidDestinationException if an invalid topic is specified.
1454 */
1455 public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
1456 checkClosed();
1457 return createSubscriber(topic, null, false);
1458 }
1459
1460 /**
1461 * Creates a nondurable subscriber to the specified topic, using a message
1462 * selector or specifying whether messages published by its own connection
1463 * should be delivered to it. <p/>
1464 * <P>
1465 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1466 * that have been published to a topic. <p/>
1467 * <P>
1468 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1469 * receive only messages that are published while they are active. <p/>
1470 * <P>
1471 * Messages filtered out by a subscriber's message selector will never be
1472 * delivered to the subscriber. From the subscriber's perspective, they do
1473 * not exist. <p/>
1474 * <P>
1475 * In some cases, a connection may both publish and subscribe to a topic.
1476 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1477 * inhibit the delivery of messages published by its own connection. The
1478 * default value for this attribute is false.
1479 *
1480 * @param topic the <CODE>Topic</CODE> to subscribe to
1481 * @param messageSelector only messages with properties matching the message
1482 * selector expression are delivered. A value of null or an
1483 * empty string indicates that there is no message selector
1484 * for the message consumer.
1485 * @param noLocal if set, inhibits the delivery of messages published by its
1486 * own connection
1487 * @return TopicSubscriber
1488 * @throws JMSException if the session fails to create a subscriber due to
1489 * some internal error.
1490 * @throws InvalidDestinationException if an invalid topic is specified.
1491 * @throws InvalidSelectorException if the message selector is invalid.
1492 */
1493 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
1494 checkClosed();
1495
1496 if (topic instanceof CustomDestination) {
1497 CustomDestination customDestination = (CustomDestination)topic;
1498 return customDestination.createSubscriber(this, messageSelector, noLocal);
1499 }
1500
1501 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1502 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy
1503 .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
1504 }
1505
1506 /**
1507 * Creates a publisher for the specified topic. <p/>
1508 * <P>
1509 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages
1510 * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
1511 * a topic, it defines a new sequence of messages that have no ordering
1512 * relationship with the messages it has previously sent.
1513 *
1514 * @param topic the <CODE>Topic</CODE> to publish to, or null if this is
1515 * an unidentified producer
1516 * @return TopicPublisher
1517 * @throws JMSException if the session fails to create a publisher due to
1518 * some internal error.
1519 * @throws InvalidDestinationException if an invalid topic is specified.
1520 */
1521 public TopicPublisher createPublisher(Topic topic) throws JMSException {
1522 checkClosed();
1523
1524 if (topic instanceof CustomDestination) {
1525 CustomDestination customDestination = (CustomDestination)topic;
1526 return customDestination.createPublisher(this);
1527 }
1528 int timeSendOut = connection.getSendTimeout();
1529 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
1530 }
1531
1532 /**
1533 * Unsubscribes a durable subscription that has been created by a client.
1534 * <P>
1535 * This method deletes the state being maintained on behalf of the
1536 * subscriber by its provider.
1537 * <P>
1538 * It is erroneous for a client to delete a durable subscription while there
1539 * is an active <CODE>MessageConsumer </CODE> or
1540 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
1541 * message is part of a pending transaction or has not been acknowledged in
1542 * the session.
1543 *
1544 * @param name the name used to identify this subscription
1545 * @throws JMSException if the session fails to unsubscribe to the durable
1546 * subscription due to some internal error.
1547 * @throws InvalidDestinationException if an invalid subscription name is
1548 * specified.
1549 * @since 1.1
1550 */
1551 public void unsubscribe(String name) throws JMSException {
1552 checkClosed();
1553 connection.unsubscribe(name);
1554 }
1555
1556 public void dispatch(MessageDispatch messageDispatch) {
1557 try {
1558 executor.execute(messageDispatch);
1559 } catch (InterruptedException e) {
1560 Thread.currentThread().interrupt();
1561 connection.onClientInternalException(e);
1562 }
1563 }
1564
1565 /**
1566 * Acknowledges all consumed messages of the session of this consumed
1567 * message.
1568 * <P>
1569 * All consumed JMS messages support the <CODE>acknowledge</CODE> method
1570 * for use when a client has specified that its JMS session's consumed
1571 * messages are to be explicitly acknowledged. By invoking
1572 * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges
1573 * all messages consumed by the session that the message was delivered to.
1574 * <P>
1575 * Calls to <CODE>acknowledge</CODE> are ignored for both transacted
1576 * sessions and sessions specified to use implicit acknowledgement modes.
1577 * <P>
1578 * A client may individually acknowledge each message as it is consumed, or
1579 * it may choose to acknowledge messages as an application-defined group
1580 * (which is done by calling acknowledge on the last received message of the
1581 * group, thereby acknowledging all messages consumed by the session.)
1582 * <P>
1583 * Messages that have been received but not acknowledged may be redelivered.
1584 *
1585 * @throws JMSException if the JMS provider fails to acknowledge the
1586 * messages due to some internal error.
1587 * @throws javax.jms.IllegalStateException if this method is called on a
1588 * closed session.
1589 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1590 */
1591 public void acknowledge() throws JMSException {
1592 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1593 ActiveMQMessageConsumer c = iter.next();
1594 c.acknowledge();
1595 }
1596 }
1597
1598 /**
1599 * Add a message consumer.
1600 *
1601 * @param consumer - message consumer.
1602 * @throws JMSException
1603 */
1604 protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1605 this.consumers.add(consumer);
1606 if (consumer.isDurableSubscriber()) {
1607 stats.onCreateDurableSubscriber();
1608 }
1609 this.connection.addDispatcher(consumer.getConsumerId(), this);
1610 }
1611
1612 /**
1613 * Remove the message consumer.
1614 *
1615 * @param consumer - consumer to be removed.
1616 * @throws JMSException
1617 */
1618 protected void removeConsumer(ActiveMQMessageConsumer consumer) {
1619 this.connection.removeDispatcher(consumer.getConsumerId());
1620 if (consumer.isDurableSubscriber()) {
1621 stats.onRemoveDurableSubscriber();
1622 }
1623 this.consumers.remove(consumer);
1624 this.connection.removeDispatcher(consumer);
1625 }
1626
1627 /**
1628 * Adds a message producer.
1629 *
1630 * @param producer - message producer to be added.
1631 * @throws JMSException
1632 */
1633 protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1634 this.producers.add(producer);
1635 this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
1636 }
1637
1638 /**
1639 * Removes a message producer.
1640 *
1641 * @param producer - message producer to be removed.
1642 * @throws JMSException
1643 */
1644 protected void removeProducer(ActiveMQMessageProducer producer) {
1645 this.connection.removeProducer(producer.getProducerInfo().getProducerId());
1646 this.producers.remove(producer);
1647 }
1648
1649 /**
1650 * Start this Session.
1651 *
1652 * @throws JMSException
1653 */
1654 protected void start() throws JMSException {
1655 started.set(true);
1656 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1657 ActiveMQMessageConsumer c = iter.next();
1658 c.start();
1659 }
1660 executor.start();
1661 }
1662
1663 /**
1664 * Stops this session.
1665 *
1666 * @throws JMSException
1667 */
1668 protected void stop() throws JMSException {
1669
1670 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1671 ActiveMQMessageConsumer c = iter.next();
1672 c.stop();
1673 }
1674
1675 started.set(false);
1676 executor.stop();
1677 }
1678
1679 /**
1680 * Returns the session id.
1681 *
1682 * @return value - session id.
1683 */
1684 protected SessionId getSessionId() {
1685 return info.getSessionId();
1686 }
1687
1688 /**
1689 * @return
1690 */
1691 protected ConsumerId getNextConsumerId() {
1692 return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
1693 }
1694
1695 /**
1696 * @return
1697 */
1698 protected ProducerId getNextProducerId() {
1699 return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
1700 }
1701
1702 /**
1703 * Sends the message for dispatch by the broker.
1704 *
1705 *
1706 * @param producer - message producer.
1707 * @param destination - message destination.
1708 * @param message - message to be sent.
1709 * @param deliveryMode - JMS messsage delivery mode.
1710 * @param priority - message priority.
1711 * @param timeToLive - message expiration.
1712 * @param producerWindow
1713 * @param onComplete
1714 * @throws JMSException
1715 */
1716 protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
1717 MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
1718
1719 checkClosed();
1720 if (destination.isTemporary() && connection.isDeleted(destination)) {
1721 throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
1722 }
1723 synchronized (sendMutex) {
1724 // tell the Broker we are about to start a new transaction
1725 doStartTransaction();
1726 TransactionId txid = transactionContext.getTransactionId();
1727 long sequenceNumber = producer.getMessageSequence();
1728
1729 //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
1730 message.setJMSDeliveryMode(deliveryMode);
1731 long expiration = 0L;
1732 if (!producer.getDisableMessageTimestamp()) {
1733 long timeStamp = System.currentTimeMillis();
1734 message.setJMSTimestamp(timeStamp);
1735 if (timeToLive > 0) {
1736 expiration = timeToLive + timeStamp;
1737 }
1738 }
1739 message.setJMSExpiration(expiration);
1740 message.setJMSPriority(priority);
1741 message.setJMSRedelivered(false);
1742
1743 // transform to our own message format here
1744 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
1745
1746 // Set the message id.
1747 if (msg == message) {
1748 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1749 } else {
1750 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1751 message.setJMSMessageID(msg.getMessageId().toString());
1752 }
1753 //clear the brokerPath in case we are re-sending this message
1754 msg.setBrokerPath(null);
1755 // destination format is provider specific so only set on transformed message
1756 msg.setJMSDestination(destination);
1757
1758 msg.setTransactionId(txid);
1759 if (connection.isCopyMessageOnSend()) {
1760 msg = (ActiveMQMessage)msg.copy();
1761 }
1762 msg.setConnection(connection);
1763 msg.onSend();
1764 msg.setProducerId(msg.getMessageId().getProducerId());
1765 if (LOG.isTraceEnabled()) {
1766 LOG.trace(getSessionId() + " sending message: " + msg);
1767 }
1768 if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
1769 this.connection.asyncSendPacket(msg);
1770 if (producerWindow != null) {
1771 // Since we defer lots of the marshaling till we hit the
1772 // wire, this might not
1773 // provide and accurate size. We may change over to doing
1774 // more aggressive marshaling,
1775 // to get more accurate sizes.. this is more important once
1776 // users start using producer window
1777 // flow control.
1778 int size = msg.getSize();
1779 producerWindow.increaseUsage(size);
1780 }
1781 } else {
1782 if (sendTimeout > 0 && onComplete==null) {
1783 this.connection.syncSendPacket(msg,sendTimeout);
1784 }else {
1785 this.connection.syncSendPacket(msg, onComplete);
1786 }
1787 }
1788
1789 }
1790 }
1791
1792 /**
1793 * Send TransactionInfo to indicate transaction has started
1794 *
1795 * @throws JMSException if some internal error occurs
1796 */
1797 protected void doStartTransaction() throws JMSException {
1798 if (getTransacted() && !transactionContext.isInXATransaction()) {
1799 transactionContext.begin();
1800 }
1801 }
1802
1803 /**
1804 * Checks whether the session has unconsumed messages.
1805 *
1806 * @return true - if there are unconsumed messages.
1807 */
1808 public boolean hasUncomsumedMessages() {
1809 return executor.hasUncomsumedMessages();
1810 }
1811
1812 /**
1813 * Checks whether the session uses transactions.
1814 *
1815 * @return true - if the session uses transactions.
1816 */
1817 public boolean isTransacted() {
1818 return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction());
1819 }
1820
1821 /**
1822 * Checks whether the session used client acknowledgment.
1823 *
1824 * @return true - if the session uses client acknowledgment.
1825 */
1826 protected boolean isClientAcknowledge() {
1827 return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
1828 }
1829
1830 /**
1831 * Checks whether the session used auto acknowledgment.
1832 *
1833 * @return true - if the session uses client acknowledgment.
1834 */
1835 public boolean isAutoAcknowledge() {
1836 return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
1837 }
1838
1839 /**
1840 * Checks whether the session used dup ok acknowledgment.
1841 *
1842 * @return true - if the session uses client acknowledgment.
1843 */
1844 public boolean isDupsOkAcknowledge() {
1845 return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
1846 }
1847
1848 public boolean isIndividualAcknowledge(){
1849 return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
1850 }
1851
1852 /**
1853 * Returns the message delivery listener.
1854 *
1855 * @return deliveryListener - message delivery listener.
1856 */
1857 public DeliveryListener getDeliveryListener() {
1858 return deliveryListener;
1859 }
1860
1861 /**
1862 * Sets the message delivery listener.
1863 *
1864 * @param deliveryListener - message delivery listener.
1865 */
1866 public void setDeliveryListener(DeliveryListener deliveryListener) {
1867 this.deliveryListener = deliveryListener;
1868 }
1869
1870 /**
1871 * Returns the SessionInfo bean.
1872 *
1873 * @return info - SessionInfo bean.
1874 * @throws JMSException
1875 */
1876 protected SessionInfo getSessionInfo() throws JMSException {
1877 SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
1878 return info;
1879 }
1880
1881 /**
1882 * Send the asynchronus command.
1883 *
1884 * @param command - command to be executed.
1885 * @throws JMSException
1886 */
1887 public void asyncSendPacket(Command command) throws JMSException {
1888 connection.asyncSendPacket(command);
1889 }
1890
1891 /**
1892 * Send the synchronus command.
1893 *
1894 * @param command - command to be executed.
1895 * @return Response
1896 * @throws JMSException
1897 */
1898 public Response syncSendPacket(Command command) throws JMSException {
1899 return connection.syncSendPacket(command);
1900 }
1901
1902 public long getNextDeliveryId() {
1903 return deliveryIdGenerator.getNextSequenceId();
1904 }
1905
1906 public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException {
1907
1908 List<MessageDispatch> c = unconsumedMessages.removeAll();
1909 for (MessageDispatch md : c) {
1910 this.connection.rollbackDuplicate(dispatcher, md.getMessage());
1911 }
1912 Collections.reverse(c);
1913
1914 for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) {
1915 MessageDispatch md = iter.next();
1916 executor.executeFirst(md);
1917 }
1918
1919 }
1920
1921 public boolean isRunning() {
1922 return started.get();
1923 }
1924
1925 public boolean isAsyncDispatch() {
1926 return asyncDispatch;
1927 }
1928
1929 public void setAsyncDispatch(boolean asyncDispatch) {
1930 this.asyncDispatch = asyncDispatch;
1931 }
1932
1933 /**
1934 * @return Returns the sessionAsyncDispatch.
1935 */
1936 public boolean isSessionAsyncDispatch() {
1937 return sessionAsyncDispatch;
1938 }
1939
1940 /**
1941 * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
1942 */
1943 public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) {
1944 this.sessionAsyncDispatch = sessionAsyncDispatch;
1945 }
1946
1947 public MessageTransformer getTransformer() {
1948 return transformer;
1949 }
1950
1951 public ActiveMQConnection getConnection() {
1952 return connection;
1953 }
1954
1955 /**
1956 * Sets the transformer used to transform messages before they are sent on
1957 * to the JMS bus or when they are received from the bus but before they are
1958 * delivered to the JMS client
1959 */
1960 public void setTransformer(MessageTransformer transformer) {
1961 this.transformer = transformer;
1962 }
1963
1964 public BlobTransferPolicy getBlobTransferPolicy() {
1965 return blobTransferPolicy;
1966 }
1967
1968 /**
1969 * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1970 * OBjects) are transferred from producers to brokers to consumers
1971 */
1972 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1973 this.blobTransferPolicy = blobTransferPolicy;
1974 }
1975
1976 public List<MessageDispatch> getUnconsumedMessages() {
1977 return executor.getUnconsumedMessages();
1978 }
1979
1980 @Override
1981 public String toString() {
1982 return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}";
1983 }
1984
1985 public void checkMessageListener() throws JMSException {
1986 if (messageListener != null) {
1987 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
1988 }
1989 for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) {
1990 ActiveMQMessageConsumer consumer = i.next();
1991 if (consumer.getMessageListener() != null) {
1992 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
1993 }
1994 }
1995 }
1996
1997 protected void setOptimizeAcknowledge(boolean value) {
1998 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1999 ActiveMQMessageConsumer c = iter.next();
2000 c.setOptimizeAcknowledge(value);
2001 }
2002 }
2003
2004 protected void setPrefetchSize(ConsumerId id, int prefetch) {
2005 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2006 ActiveMQMessageConsumer c = iter.next();
2007 if (c.getConsumerId().equals(id)) {
2008 c.setPrefetchSize(prefetch);
2009 break;
2010 }
2011 }
2012 }
2013
2014 protected void close(ConsumerId id) {
2015 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2016 ActiveMQMessageConsumer c = iter.next();
2017 if (c.getConsumerId().equals(id)) {
2018 try {
2019 c.close();
2020 } catch (JMSException e) {
2021 LOG.warn("Exception closing consumer", e);
2022 }
2023 LOG.warn("Closed consumer on Command");
2024 break;
2025 }
2026 }
2027 }
2028
2029 public boolean isInUse(ActiveMQTempDestination destination) {
2030 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2031 ActiveMQMessageConsumer c = iter.next();
2032 if (c.isInUse(destination)) {
2033 return true;
2034 }
2035 }
2036 return false;
2037 }
2038
2039 /**
2040 * highest sequence id of the last message delivered by this session.
2041 * Passed to the broker in the close command, maintained by dispose()
2042 * @return lastDeliveredSequenceId
2043 */
2044 public long getLastDeliveredSequenceId() {
2045 return lastDeliveredSequenceId;
2046 }
2047
2048 protected void sendAck(MessageAck ack) throws JMSException {
2049 sendAck(ack,false);
2050 }
2051
2052 protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
2053 if (lazy || connection.isSendAcksAsync() || getTransacted()) {
2054 asyncSendPacket(ack);
2055 } else {
2056 syncSendPacket(ack);
2057 }
2058 }
2059
2060 protected Scheduler getScheduler() throws JMSException {
2061 return this.connection.getScheduler();
2062 }
2063
2064 protected ThreadPoolExecutor getConnectionExecutor() {
2065 return this.connectionExecutor;
2066 }
2067 }