001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.io.IOException;
020 import java.io.InputStream;
021 import java.io.OutputStream;
022 import java.net.URI;
023 import java.net.URISyntaxException;
024 import java.util.HashMap;
025 import java.util.Iterator;
026 import java.util.Map;
027 import java.util.concurrent.*;
028 import java.util.concurrent.atomic.AtomicBoolean;
029 import java.util.concurrent.atomic.AtomicInteger;
030
031 import javax.jms.Connection;
032 import javax.jms.ConnectionConsumer;
033 import javax.jms.ConnectionMetaData;
034 import javax.jms.DeliveryMode;
035 import javax.jms.Destination;
036 import javax.jms.ExceptionListener;
037 import javax.jms.IllegalStateException;
038 import javax.jms.InvalidDestinationException;
039 import javax.jms.JMSException;
040 import javax.jms.Queue;
041 import javax.jms.QueueConnection;
042 import javax.jms.QueueSession;
043 import javax.jms.ServerSessionPool;
044 import javax.jms.Session;
045 import javax.jms.Topic;
046 import javax.jms.TopicConnection;
047 import javax.jms.TopicSession;
048 import javax.jms.XAConnection;
049
050 import org.apache.activemq.advisory.DestinationSource;
051 import org.apache.activemq.blob.BlobTransferPolicy;
052 import org.apache.activemq.command.ActiveMQDestination;
053 import org.apache.activemq.command.ActiveMQMessage;
054 import org.apache.activemq.command.ActiveMQTempDestination;
055 import org.apache.activemq.command.ActiveMQTempQueue;
056 import org.apache.activemq.command.ActiveMQTempTopic;
057 import org.apache.activemq.command.BrokerInfo;
058 import org.apache.activemq.command.Command;
059 import org.apache.activemq.command.CommandTypes;
060 import org.apache.activemq.command.ConnectionControl;
061 import org.apache.activemq.command.ConnectionError;
062 import org.apache.activemq.command.ConnectionId;
063 import org.apache.activemq.command.ConnectionInfo;
064 import org.apache.activemq.command.ConsumerControl;
065 import org.apache.activemq.command.ConsumerId;
066 import org.apache.activemq.command.ConsumerInfo;
067 import org.apache.activemq.command.ControlCommand;
068 import org.apache.activemq.command.DestinationInfo;
069 import org.apache.activemq.command.ExceptionResponse;
070 import org.apache.activemq.command.Message;
071 import org.apache.activemq.command.MessageDispatch;
072 import org.apache.activemq.command.MessageId;
073 import org.apache.activemq.command.ProducerAck;
074 import org.apache.activemq.command.ProducerId;
075 import org.apache.activemq.command.RemoveInfo;
076 import org.apache.activemq.command.RemoveSubscriptionInfo;
077 import org.apache.activemq.command.Response;
078 import org.apache.activemq.command.SessionId;
079 import org.apache.activemq.command.ShutdownInfo;
080 import org.apache.activemq.command.WireFormatInfo;
081 import org.apache.activemq.management.JMSConnectionStatsImpl;
082 import org.apache.activemq.management.JMSStatsImpl;
083 import org.apache.activemq.management.StatsCapable;
084 import org.apache.activemq.management.StatsImpl;
085 import org.apache.activemq.state.CommandVisitorAdapter;
086 import org.apache.activemq.thread.Scheduler;
087 import org.apache.activemq.thread.TaskRunnerFactory;
088 import org.apache.activemq.transport.FutureResponse;
089 import org.apache.activemq.transport.ResponseCallback;
090 import org.apache.activemq.transport.Transport;
091 import org.apache.activemq.transport.TransportListener;
092 import org.apache.activemq.transport.failover.FailoverTransport;
093 import org.apache.activemq.util.IdGenerator;
094 import org.apache.activemq.util.IntrospectionSupport;
095 import org.apache.activemq.util.JMSExceptionSupport;
096 import org.apache.activemq.util.LongSequenceGenerator;
097 import org.apache.activemq.util.ServiceSupport;
098 import org.slf4j.Logger;
099 import org.slf4j.LoggerFactory;
100
101 public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
102
103 public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
104 public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
105 public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
106
107 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
108
109 public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
110
111 protected boolean dispatchAsync=true;
112 protected boolean alwaysSessionAsync = true;
113
114 private TaskRunnerFactory sessionTaskRunner;
115 private final ThreadPoolExecutor executor;
116
117 // Connection state variables
118 private final ConnectionInfo info;
119 private ExceptionListener exceptionListener;
120 private ClientInternalExceptionListener clientInternalExceptionListener;
121 private boolean clientIDSet;
122 private boolean isConnectionInfoSentToBroker;
123 private boolean userSpecifiedClientID;
124
125 // Configuration options variables
126 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
127 private BlobTransferPolicy blobTransferPolicy;
128 private RedeliveryPolicy redeliveryPolicy;
129 private MessageTransformer transformer;
130
131 private boolean disableTimeStampsByDefault;
132 private boolean optimizedMessageDispatch = true;
133 private boolean copyMessageOnSend = true;
134 private boolean useCompression;
135 private boolean objectMessageSerializationDefered;
136 private boolean useAsyncSend;
137 private boolean optimizeAcknowledge;
138 private long optimizeAcknowledgeTimeOut = 0;
139 private boolean nestedMapAndListEnabled = true;
140 private boolean useRetroactiveConsumer;
141 private boolean exclusiveConsumer;
142 private boolean alwaysSyncSend;
143 private int closeTimeout = 15000;
144 private boolean watchTopicAdvisories = true;
145 private long warnAboutUnstartedConnectionTimeout = 500L;
146 private int sendTimeout =0;
147 private boolean sendAcksAsync=true;
148 private boolean checkForDuplicates = true;
149
150 private final Transport transport;
151 private final IdGenerator clientIdGenerator;
152 private final JMSStatsImpl factoryStats;
153 private final JMSConnectionStatsImpl stats;
154
155 private final AtomicBoolean started = new AtomicBoolean(false);
156 private final AtomicBoolean closing = new AtomicBoolean(false);
157 private final AtomicBoolean closed = new AtomicBoolean(false);
158 private final AtomicBoolean transportFailed = new AtomicBoolean(false);
159 private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
160 private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
161 private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
162 private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
163 private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
164
165 // Maps ConsumerIds to ActiveMQConsumer objects
166 private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
167 private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
168 private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
169 private final SessionId connectionSessionId;
170 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
171 private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
172 private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
173 private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
174
175 private AdvisoryConsumer advisoryConsumer;
176 private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
177 private BrokerInfo brokerInfo;
178 private IOException firstFailureError;
179 private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
180
181 // Assume that protocol is the latest. Change to the actual protocol
182 // version when a WireFormatInfo is received.
183 private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
184 private final long timeCreated;
185 private final ConnectionAudit connectionAudit = new ConnectionAudit();
186 private DestinationSource destinationSource;
187 private final Object ensureConnectionInfoSentMutex = new Object();
188 private boolean useDedicatedTaskRunner;
189 protected volatile CountDownLatch transportInterruptionProcessingComplete;
190 private long consumerFailoverRedeliveryWaitPeriod;
191 private Scheduler scheduler;
192 private boolean messagePrioritySupported = true;
193 private boolean transactedIndividualAck = false;
194 private boolean nonBlockingRedelivery = false;
195
196 /**
197 * Construct an <code>ActiveMQConnection</code>
198 *
199 * @param transport
200 * @param factoryStats
201 * @throws Exception
202 */
203 protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
204
205 this.transport = transport;
206 this.clientIdGenerator = clientIdGenerator;
207 this.factoryStats = factoryStats;
208
209 // Configure a single threaded executor who's core thread can timeout if
210 // idle
211 executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
212 public Thread newThread(Runnable r) {
213 Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
214 //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
215 //thread.setDaemon(true);
216 return thread;
217 }
218 });
219 // asyncConnectionThread.allowCoreThreadTimeOut(true);
220 String uniqueId = connectionIdGenerator.generateId();
221 this.info = new ConnectionInfo(new ConnectionId(uniqueId));
222 this.info.setManageable(true);
223 this.info.setFaultTolerant(transport.isFaultTolerant());
224 this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
225
226 this.transport.setTransportListener(this);
227
228 this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
229 this.factoryStats.addConnection(this);
230 this.timeCreated = System.currentTimeMillis();
231 this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
232 }
233
234 protected void setUserName(String userName) {
235 this.info.setUserName(userName);
236 }
237
238 protected void setPassword(String password) {
239 this.info.setPassword(password);
240 }
241
242 /**
243 * A static helper method to create a new connection
244 *
245 * @return an ActiveMQConnection
246 * @throws JMSException
247 */
248 public static ActiveMQConnection makeConnection() throws JMSException {
249 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
250 return (ActiveMQConnection)factory.createConnection();
251 }
252
253 /**
254 * A static helper method to create a new connection
255 *
256 * @param uri
257 * @return and ActiveMQConnection
258 * @throws JMSException
259 */
260 public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
261 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
262 return (ActiveMQConnection)factory.createConnection();
263 }
264
265 /**
266 * A static helper method to create a new connection
267 *
268 * @param user
269 * @param password
270 * @param uri
271 * @return an ActiveMQConnection
272 * @throws JMSException
273 */
274 public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
275 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
276 return (ActiveMQConnection)factory.createConnection();
277 }
278
279 /**
280 * @return a number unique for this connection
281 */
282 public JMSConnectionStatsImpl getConnectionStats() {
283 return stats;
284 }
285
286 /**
287 * Creates a <CODE>Session</CODE> object.
288 *
289 * @param transacted indicates whether the session is transacted
290 * @param acknowledgeMode indicates whether the consumer or the client will
291 * acknowledge any messages it receives; ignored if the
292 * session is transacted. Legal values are
293 * <code>Session.AUTO_ACKNOWLEDGE</code>,
294 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
295 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
296 * @return a newly created session
297 * @throws JMSException if the <CODE>Connection</CODE> object fails to
298 * create a session due to some internal error or lack of
299 * support for the specific transaction and acknowledgement
300 * mode.
301 * @see Session#AUTO_ACKNOWLEDGE
302 * @see Session#CLIENT_ACKNOWLEDGE
303 * @see Session#DUPS_OK_ACKNOWLEDGE
304 * @since 1.1
305 */
306 public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
307 checkClosedOrFailed();
308 ensureConnectionInfoSent();
309 if(!transacted) {
310 if (acknowledgeMode==Session.SESSION_TRANSACTED) {
311 throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
312 } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
313 throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
314 "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
315 }
316 }
317 return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
318 ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
319 }
320
321 /**
322 * @return sessionId
323 */
324 protected SessionId getNextSessionId() {
325 return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
326 }
327
328 /**
329 * Gets the client identifier for this connection.
330 * <P>
331 * This value is specific to the JMS provider. It is either preconfigured by
332 * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
333 * dynamically by the application by calling the <code>setClientID</code>
334 * method.
335 *
336 * @return the unique client identifier
337 * @throws JMSException if the JMS provider fails to return the client ID
338 * for this connection due to some internal error.
339 */
340 public String getClientID() throws JMSException {
341 checkClosedOrFailed();
342 return this.info.getClientId();
343 }
344
345 /**
346 * Sets the client identifier for this connection.
347 * <P>
348 * The preferred way to assign a JMS client's client identifier is for it to
349 * be configured in a client-specific <CODE>ConnectionFactory</CODE>
350 * object and transparently assigned to the <CODE>Connection</CODE> object
351 * it creates.
352 * <P>
353 * Alternatively, a client can set a connection's client identifier using a
354 * provider-specific value. The facility to set a connection's client
355 * identifier explicitly is not a mechanism for overriding the identifier
356 * that has been administratively configured. It is provided for the case
357 * where no administratively specified identifier exists. If one does exist,
358 * an attempt to change it by setting it must throw an
359 * <CODE>IllegalStateException</CODE>. If a client sets the client
360 * identifier explicitly, it must do so immediately after it creates the
361 * connection and before any other action on the connection is taken. After
362 * this point, setting the client identifier is a programming error that
363 * should throw an <CODE>IllegalStateException</CODE>.
364 * <P>
365 * The purpose of the client identifier is to associate a connection and its
366 * objects with a state maintained on behalf of the client by a provider.
367 * The only such state identified by the JMS API is that required to support
368 * durable subscriptions.
369 * <P>
370 * If another connection with the same <code>clientID</code> is already
371 * running when this method is called, the JMS provider should detect the
372 * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
373 *
374 * @param newClientID the unique client identifier
375 * @throws JMSException if the JMS provider fails to set the client ID for
376 * this connection due to some internal error.
377 * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
378 * invalid or duplicate client ID.
379 * @throws javax.jms.IllegalStateException if the JMS client attempts to set
380 * a connection's client ID at the wrong time or when it has
381 * been administratively configured.
382 */
383 public void setClientID(String newClientID) throws JMSException {
384 checkClosedOrFailed();
385
386 if (this.clientIDSet) {
387 throw new IllegalStateException("The clientID has already been set");
388 }
389
390 if (this.isConnectionInfoSentToBroker) {
391 throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
392 }
393
394 this.info.setClientId(newClientID);
395 this.userSpecifiedClientID = true;
396 ensureConnectionInfoSent();
397 }
398
399 /**
400 * Sets the default client id that the connection will use if explicitly not
401 * set with the setClientId() call.
402 */
403 public void setDefaultClientID(String clientID) throws JMSException {
404 this.info.setClientId(clientID);
405 this.userSpecifiedClientID = true;
406 }
407
408 /**
409 * Gets the metadata for this connection.
410 *
411 * @return the connection metadata
412 * @throws JMSException if the JMS provider fails to get the connection
413 * metadata for this connection.
414 * @see javax.jms.ConnectionMetaData
415 */
416 public ConnectionMetaData getMetaData() throws JMSException {
417 checkClosedOrFailed();
418 return ActiveMQConnectionMetaData.INSTANCE;
419 }
420
421 /**
422 * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
423 * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
424 * associated with it.
425 *
426 * @return the <CODE>ExceptionListener</CODE> for this connection, or
427 * null, if no <CODE>ExceptionListener</CODE> is associated with
428 * this connection.
429 * @throws JMSException if the JMS provider fails to get the
430 * <CODE>ExceptionListener</CODE> for this connection.
431 * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
432 */
433 public ExceptionListener getExceptionListener() throws JMSException {
434 checkClosedOrFailed();
435 return this.exceptionListener;
436 }
437
438 /**
439 * Sets an exception listener for this connection.
440 * <P>
441 * If a JMS provider detects a serious problem with a connection, it informs
442 * the connection's <CODE> ExceptionListener</CODE>, if one has been
443 * registered. It does this by calling the listener's <CODE>onException
444 * </CODE>
445 * method, passing it a <CODE>JMSException</CODE> object describing the
446 * problem.
447 * <P>
448 * An exception listener allows a client to be notified of a problem
449 * asynchronously. Some connections only consume messages, so they would
450 * have no other way to learn their connection has failed.
451 * <P>
452 * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
453 * <P>
454 * A JMS provider should attempt to resolve connection problems itself
455 * before it notifies the client of them.
456 *
457 * @param listener the exception listener
458 * @throws JMSException if the JMS provider fails to set the exception
459 * listener for this connection.
460 */
461 public void setExceptionListener(ExceptionListener listener) throws JMSException {
462 checkClosedOrFailed();
463 this.exceptionListener = listener;
464 }
465
466 /**
467 * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
468 * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
469 * associated with it.
470 *
471 * @return the listener or <code>null</code> if no listener is registered with the connection.
472 */
473 public ClientInternalExceptionListener getClientInternalExceptionListener()
474 {
475 return clientInternalExceptionListener;
476 }
477
478 /**
479 * Sets a client internal exception listener for this connection.
480 * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
481 * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
482 * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
483 * describing the problem.
484 *
485 * @param listener the exception listener
486 */
487 public void setClientInternalExceptionListener(ClientInternalExceptionListener listener)
488 {
489 this.clientInternalExceptionListener = listener;
490 }
491
492 /**
493 * Starts (or restarts) a connection's delivery of incoming messages. A call
494 * to <CODE>start</CODE> on a connection that has already been started is
495 * ignored.
496 *
497 * @throws JMSException if the JMS provider fails to start message delivery
498 * due to some internal error.
499 * @see javax.jms.Connection#stop()
500 */
501 public void start() throws JMSException {
502 checkClosedOrFailed();
503 ensureConnectionInfoSent();
504 if (started.compareAndSet(false, true)) {
505 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
506 ActiveMQSession session = i.next();
507 session.start();
508 }
509 }
510 }
511
512 /**
513 * Temporarily stops a connection's delivery of incoming messages. Delivery
514 * can be restarted using the connection's <CODE>start</CODE> method. When
515 * the connection is stopped, delivery to all the connection's message
516 * consumers is inhibited: synchronous receives block, and messages are not
517 * delivered to message listeners.
518 * <P>
519 * This call blocks until receives and/or message listeners in progress have
520 * completed.
521 * <P>
522 * Stopping a connection has no effect on its ability to send messages. A
523 * call to <CODE>stop</CODE> on a connection that has already been stopped
524 * is ignored.
525 * <P>
526 * A call to <CODE>stop</CODE> must not return until delivery of messages
527 * has paused. This means that a client can rely on the fact that none of
528 * its message listeners will be called and that all threads of control
529 * waiting for <CODE>receive</CODE> calls to return will not return with a
530 * message until the connection is restarted. The receive timers for a
531 * stopped connection continue to advance, so receives may time out while
532 * the connection is stopped.
533 * <P>
534 * If message listeners are running when <CODE>stop</CODE> is invoked, the
535 * <CODE>stop</CODE> call must wait until all of them have returned before
536 * it may return. While these message listeners are completing, they must
537 * have the full services of the connection available to them.
538 *
539 * @throws JMSException if the JMS provider fails to stop message delivery
540 * due to some internal error.
541 * @see javax.jms.Connection#start()
542 */
543 public void stop() throws JMSException {
544 checkClosedOrFailed();
545 if (started.compareAndSet(true, false)) {
546 synchronized(sessions) {
547 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
548 ActiveMQSession s = i.next();
549 s.stop();
550 }
551 }
552 }
553 }
554
555 /**
556 * Closes the connection.
557 * <P>
558 * Since a provider typically allocates significant resources outside the
559 * JVM on behalf of a connection, clients should close these resources when
560 * they are not needed. Relying on garbage collection to eventually reclaim
561 * these resources may not be timely enough.
562 * <P>
563 * There is no need to close the sessions, producers, and consumers of a
564 * closed connection.
565 * <P>
566 * Closing a connection causes all temporary destinations to be deleted.
567 * <P>
568 * When this method is invoked, it should not return until message
569 * processing has been shut down in an orderly fashion. This means that all
570 * message listeners that may have been running have returned, and that all
571 * pending receives have returned. A close terminates all pending message
572 * receives on the connection's sessions' consumers. The receives may return
573 * with a message or with null, depending on whether there was a message
574 * available at the time of the close. If one or more of the connection's
575 * sessions' message listeners is processing a message at the time when
576 * connection <CODE>close</CODE> is invoked, all the facilities of the
577 * connection and its sessions must remain available to those listeners
578 * until they return control to the JMS provider.
579 * <P>
580 * Closing a connection causes any of its sessions' transactions in progress
581 * to be rolled back. In the case where a session's work is coordinated by
582 * an external transaction manager, a session's <CODE>commit</CODE> and
583 * <CODE> rollback</CODE> methods are not used and the result of a closed
584 * session's work is determined later by the transaction manager. Closing a
585 * connection does NOT force an acknowledgment of client-acknowledged
586 * sessions.
587 * <P>
588 * Invoking the <CODE>acknowledge</CODE> method of a received message from
589 * a closed connection's session must throw an
590 * <CODE>IllegalStateException</CODE>. Closing a closed connection must
591 * NOT throw an exception.
592 *
593 * @throws JMSException if the JMS provider fails to close the connection
594 * due to some internal error. For example, a failure to
595 * release resources or to close a socket connection can
596 * cause this exception to be thrown.
597 */
598 public void close() throws JMSException {
599 // Store the interrupted state and clear so that cleanup happens without
600 // leaking connection resources. Reset in finally to preserve state.
601 boolean interrupted = Thread.interrupted();
602
603 try {
604
605 // If we were running, lets stop first.
606 if (!closed.get() && !transportFailed.get()) {
607 stop();
608 }
609
610 synchronized (this) {
611 if (!closed.get()) {
612 closing.set(true);
613
614 if (destinationSource != null) {
615 destinationSource.stop();
616 destinationSource = null;
617 }
618 if (advisoryConsumer != null) {
619 advisoryConsumer.dispose();
620 advisoryConsumer = null;
621 }
622
623 Scheduler scheduler = this.scheduler;
624 if (scheduler != null) {
625 try {
626 scheduler.stop();
627 } catch (Exception e) {
628 JMSException ex = JMSExceptionSupport.create(e);
629 throw ex;
630 }
631 }
632
633 long lastDeliveredSequenceId = 0;
634 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
635 ActiveMQSession s = i.next();
636 s.dispose();
637 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
638 }
639 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
640 ActiveMQConnectionConsumer c = i.next();
641 c.dispose();
642 }
643 for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
644 ActiveMQInputStream c = i.next();
645 c.dispose();
646 }
647 for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
648 ActiveMQOutputStream c = i.next();
649 c.dispose();
650 }
651
652 // As TemporaryQueue and TemporaryTopic instances are bound
653 // to a connection we should just delete them after the connection
654 // is closed to free up memory
655 for (Iterator<ActiveMQTempDestination> i = this.activeTempDestinations.values().iterator(); i.hasNext();) {
656 ActiveMQTempDestination c = i.next();
657 c.delete();
658 }
659
660 if (isConnectionInfoSentToBroker) {
661 // If we announced ourselfs to the broker.. Try to let
662 // the broker
663 // know that the connection is being shutdown.
664 RemoveInfo removeCommand = info.createRemoveCommand();
665 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
666 doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
667 doAsyncSendPacket(new ShutdownInfo());
668 }
669
670 started.set(false);
671
672 // TODO if we move the TaskRunnerFactory to the connection
673 // factory
674 // then we may need to call
675 // factory.onConnectionClose(this);
676 if (sessionTaskRunner != null) {
677 sessionTaskRunner.shutdown();
678 }
679 closed.set(true);
680 closing.set(false);
681 }
682 }
683 } finally {
684 try {
685 if (executor != null) {
686 executor.shutdown();
687 }
688 } catch (Throwable e) {
689 LOG.error("Error shutting down thread pool " + e, e);
690 }
691
692 ServiceSupport.dispose(this.transport);
693
694 factoryStats.removeConnection(this);
695 if (interrupted) {
696 Thread.currentThread().interrupt();
697 }
698 }
699 }
700
701 /**
702 * Tells the broker to terminate its VM. This can be used to cleanly
703 * terminate a broker running in a standalone java process. Server must have
704 * property enable.vm.shutdown=true defined to allow this to work.
705 */
706 // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
707 // implemented.
708 /*
709 * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
710 * command = new BrokerAdminCommand();
711 * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
712 * asyncSendPacket(command); }
713 */
714
715 /**
716 * Create a durable connection consumer for this connection (optional
717 * operation). This is an expert facility not used by regular JMS clients.
718 *
719 * @param topic topic to access
720 * @param subscriptionName durable subscription name
721 * @param messageSelector only messages with properties matching the message
722 * selector expression are delivered. A value of null or an
723 * empty string indicates that there is no message selector
724 * for the message consumer.
725 * @param sessionPool the server session pool to associate with this durable
726 * connection consumer
727 * @param maxMessages the maximum number of messages that can be assigned to
728 * a server session at one time
729 * @return the durable connection consumer
730 * @throws JMSException if the <CODE>Connection</CODE> object fails to
731 * create a connection consumer due to some internal error
732 * or invalid arguments for <CODE>sessionPool</CODE> and
733 * <CODE>messageSelector</CODE>.
734 * @throws javax.jms.InvalidDestinationException if an invalid destination
735 * is specified.
736 * @throws javax.jms.InvalidSelectorException if the message selector is
737 * invalid.
738 * @see javax.jms.ConnectionConsumer
739 * @since 1.1
740 */
741 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
742 throws JMSException {
743 return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
744 }
745
746 /**
747 * Create a durable connection consumer for this connection (optional
748 * operation). This is an expert facility not used by regular JMS clients.
749 *
750 * @param topic topic to access
751 * @param subscriptionName durable subscription name
752 * @param messageSelector only messages with properties matching the message
753 * selector expression are delivered. A value of null or an
754 * empty string indicates that there is no message selector
755 * for the message consumer.
756 * @param sessionPool the server session pool to associate with this durable
757 * connection consumer
758 * @param maxMessages the maximum number of messages that can be assigned to
759 * a server session at one time
760 * @param noLocal set true if you want to filter out messages published
761 * locally
762 * @return the durable connection consumer
763 * @throws JMSException if the <CODE>Connection</CODE> object fails to
764 * create a connection consumer due to some internal error
765 * or invalid arguments for <CODE>sessionPool</CODE> and
766 * <CODE>messageSelector</CODE>.
767 * @throws javax.jms.InvalidDestinationException if an invalid destination
768 * is specified.
769 * @throws javax.jms.InvalidSelectorException if the message selector is
770 * invalid.
771 * @see javax.jms.ConnectionConsumer
772 * @since 1.1
773 */
774 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
775 boolean noLocal) throws JMSException {
776 checkClosedOrFailed();
777 ensureConnectionInfoSent();
778 SessionId sessionId = new SessionId(info.getConnectionId(), -1);
779 ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
780 info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
781 info.setSubscriptionName(subscriptionName);
782 info.setSelector(messageSelector);
783 info.setPrefetchSize(maxMessages);
784 info.setDispatchAsync(isDispatchAsync());
785
786 // Allows the options on the destination to configure the consumerInfo
787 if (info.getDestination().getOptions() != null) {
788 Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
789 IntrospectionSupport.setProperties(this.info, options, "consumer.");
790 }
791
792 return new ActiveMQConnectionConsumer(this, sessionPool, info);
793 }
794
795 // Properties
796 // -------------------------------------------------------------------------
797
798 /**
799 * Returns true if this connection has been started
800 *
801 * @return true if this Connection is started
802 */
803 public boolean isStarted() {
804 return started.get();
805 }
806
807 /**
808 * Returns true if the connection is closed
809 */
810 public boolean isClosed() {
811 return closed.get();
812 }
813
814 /**
815 * Returns true if the connection is in the process of being closed
816 */
817 public boolean isClosing() {
818 return closing.get();
819 }
820
821 /**
822 * Returns true if the underlying transport has failed
823 */
824 public boolean isTransportFailed() {
825 return transportFailed.get();
826 }
827
828 /**
829 * @return Returns the prefetchPolicy.
830 */
831 public ActiveMQPrefetchPolicy getPrefetchPolicy() {
832 return prefetchPolicy;
833 }
834
835 /**
836 * Sets the <a
837 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
838 * policy</a> for consumers created by this connection.
839 */
840 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
841 this.prefetchPolicy = prefetchPolicy;
842 }
843
844 /**
845 */
846 public Transport getTransportChannel() {
847 return transport;
848 }
849
850 /**
851 * @return Returns the clientID of the connection, forcing one to be
852 * generated if one has not yet been configured.
853 */
854 public String getInitializedClientID() throws JMSException {
855 ensureConnectionInfoSent();
856 return info.getClientId();
857 }
858
859 /**
860 * @return Returns the timeStampsDisableByDefault.
861 */
862 public boolean isDisableTimeStampsByDefault() {
863 return disableTimeStampsByDefault;
864 }
865
866 /**
867 * Sets whether or not timestamps on messages should be disabled or not. If
868 * you disable them it adds a small performance boost.
869 */
870 public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
871 this.disableTimeStampsByDefault = timeStampsDisableByDefault;
872 }
873
874 /**
875 * @return Returns the dispatchOptimizedMessage.
876 */
877 public boolean isOptimizedMessageDispatch() {
878 return optimizedMessageDispatch;
879 }
880
881 /**
882 * If this flag is set then an larger prefetch limit is used - only
883 * applicable for durable topic subscribers.
884 */
885 public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
886 this.optimizedMessageDispatch = dispatchOptimizedMessage;
887 }
888
889 /**
890 * @return Returns the closeTimeout.
891 */
892 public int getCloseTimeout() {
893 return closeTimeout;
894 }
895
896 /**
897 * Sets the timeout before a close is considered complete. Normally a
898 * close() on a connection waits for confirmation from the broker; this
899 * allows that operation to timeout to save the client hanging if there is
900 * no broker
901 */
902 public void setCloseTimeout(int closeTimeout) {
903 this.closeTimeout = closeTimeout;
904 }
905
906 /**
907 * @return ConnectionInfo
908 */
909 public ConnectionInfo getConnectionInfo() {
910 return this.info;
911 }
912
913 public boolean isUseRetroactiveConsumer() {
914 return useRetroactiveConsumer;
915 }
916
917 /**
918 * Sets whether or not retroactive consumers are enabled. Retroactive
919 * consumers allow non-durable topic subscribers to receive old messages
920 * that were published before the non-durable subscriber started.
921 */
922 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
923 this.useRetroactiveConsumer = useRetroactiveConsumer;
924 }
925
926 public boolean isNestedMapAndListEnabled() {
927 return nestedMapAndListEnabled;
928 }
929
930 /**
931 * Enables/disables whether or not Message properties and MapMessage entries
932 * support <a
933 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
934 * Structures</a> of Map and List objects
935 */
936 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
937 this.nestedMapAndListEnabled = structuredMapsEnabled;
938 }
939
940 public boolean isExclusiveConsumer() {
941 return exclusiveConsumer;
942 }
943
944 /**
945 * Enables or disables whether or not queue consumers should be exclusive or
946 * not for example to preserve ordering when not using <a
947 * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
948 *
949 * @param exclusiveConsumer
950 */
951 public void setExclusiveConsumer(boolean exclusiveConsumer) {
952 this.exclusiveConsumer = exclusiveConsumer;
953 }
954
955 /**
956 * Adds a transport listener so that a client can be notified of events in
957 * the underlying transport
958 */
959 public void addTransportListener(TransportListener transportListener) {
960 transportListeners.add(transportListener);
961 }
962
963 public void removeTransportListener(TransportListener transportListener) {
964 transportListeners.remove(transportListener);
965 }
966
967 public boolean isUseDedicatedTaskRunner() {
968 return useDedicatedTaskRunner;
969 }
970
971 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
972 this.useDedicatedTaskRunner = useDedicatedTaskRunner;
973 }
974
975 public TaskRunnerFactory getSessionTaskRunner() {
976 synchronized (this) {
977 if (sessionTaskRunner == null) {
978 sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner());
979 }
980 }
981 return sessionTaskRunner;
982 }
983
984 public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
985 this.sessionTaskRunner = sessionTaskRunner;
986 }
987
988 public MessageTransformer getTransformer() {
989 return transformer;
990 }
991
992 /**
993 * Sets the transformer used to transform messages before they are sent on
994 * to the JMS bus or when they are received from the bus but before they are
995 * delivered to the JMS client
996 */
997 public void setTransformer(MessageTransformer transformer) {
998 this.transformer = transformer;
999 }
1000
1001 /**
1002 * @return the statsEnabled
1003 */
1004 public boolean isStatsEnabled() {
1005 return this.stats.isEnabled();
1006 }
1007
1008 /**
1009 * @param statsEnabled the statsEnabled to set
1010 */
1011 public void setStatsEnabled(boolean statsEnabled) {
1012 this.stats.setEnabled(statsEnabled);
1013 }
1014
1015 /**
1016 * Returns the {@link DestinationSource} object which can be used to listen to destinations
1017 * being created or destroyed or to enquire about the current destinations available on the broker
1018 *
1019 * @return a lazily created destination source
1020 * @throws JMSException
1021 */
1022 public DestinationSource getDestinationSource() throws JMSException {
1023 if (destinationSource == null) {
1024 destinationSource = new DestinationSource(this);
1025 destinationSource.start();
1026 }
1027 return destinationSource;
1028 }
1029
1030 // Implementation methods
1031 // -------------------------------------------------------------------------
1032
1033 /**
1034 * Used internally for adding Sessions to the Connection
1035 *
1036 * @param session
1037 * @throws JMSException
1038 * @throws JMSException
1039 */
1040 protected void addSession(ActiveMQSession session) throws JMSException {
1041 this.sessions.add(session);
1042 if (sessions.size() > 1 || session.isTransacted()) {
1043 optimizedMessageDispatch = false;
1044 }
1045 }
1046
1047 /**
1048 * Used interanlly for removing Sessions from a Connection
1049 *
1050 * @param session
1051 */
1052 protected void removeSession(ActiveMQSession session) {
1053 this.sessions.remove(session);
1054 this.removeDispatcher(session);
1055 }
1056
1057 /**
1058 * Add a ConnectionConsumer
1059 *
1060 * @param connectionConsumer
1061 * @throws JMSException
1062 */
1063 protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1064 this.connectionConsumers.add(connectionConsumer);
1065 }
1066
1067 /**
1068 * Remove a ConnectionConsumer
1069 *
1070 * @param connectionConsumer
1071 */
1072 protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1073 this.connectionConsumers.remove(connectionConsumer);
1074 this.removeDispatcher(connectionConsumer);
1075 }
1076
1077 /**
1078 * Creates a <CODE>TopicSession</CODE> object.
1079 *
1080 * @param transacted indicates whether the session is transacted
1081 * @param acknowledgeMode indicates whether the consumer or the client will
1082 * acknowledge any messages it receives; ignored if the
1083 * session is transacted. Legal values are
1084 * <code>Session.AUTO_ACKNOWLEDGE</code>,
1085 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1086 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1087 * @return a newly created topic session
1088 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1089 * to create a session due to some internal error or lack of
1090 * support for the specific transaction and acknowledgement
1091 * mode.
1092 * @see Session#AUTO_ACKNOWLEDGE
1093 * @see Session#CLIENT_ACKNOWLEDGE
1094 * @see Session#DUPS_OK_ACKNOWLEDGE
1095 */
1096 public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
1097 return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1098 }
1099
1100 /**
1101 * Creates a connection consumer for this connection (optional operation).
1102 * This is an expert facility not used by regular JMS clients.
1103 *
1104 * @param topic the topic to access
1105 * @param messageSelector only messages with properties matching the message
1106 * selector expression are delivered. A value of null or an
1107 * empty string indicates that there is no message selector
1108 * for the message consumer.
1109 * @param sessionPool the server session pool to associate with this
1110 * connection consumer
1111 * @param maxMessages the maximum number of messages that can be assigned to
1112 * a server session at one time
1113 * @return the connection consumer
1114 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1115 * to create a connection consumer due to some internal
1116 * error or invalid arguments for <CODE>sessionPool</CODE>
1117 * and <CODE>messageSelector</CODE>.
1118 * @throws javax.jms.InvalidDestinationException if an invalid topic is
1119 * specified.
1120 * @throws javax.jms.InvalidSelectorException if the message selector is
1121 * invalid.
1122 * @see javax.jms.ConnectionConsumer
1123 */
1124 public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1125 return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1126 }
1127
1128 /**
1129 * Creates a connection consumer for this connection (optional operation).
1130 * This is an expert facility not used by regular JMS clients.
1131 *
1132 * @param queue the queue to access
1133 * @param messageSelector only messages with properties matching the message
1134 * selector expression are delivered. A value of null or an
1135 * empty string indicates that there is no message selector
1136 * for the message consumer.
1137 * @param sessionPool the server session pool to associate with this
1138 * connection consumer
1139 * @param maxMessages the maximum number of messages that can be assigned to
1140 * a server session at one time
1141 * @return the connection consumer
1142 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1143 * to create a connection consumer due to some internal
1144 * error or invalid arguments for <CODE>sessionPool</CODE>
1145 * and <CODE>messageSelector</CODE>.
1146 * @throws javax.jms.InvalidDestinationException if an invalid queue is
1147 * specified.
1148 * @throws javax.jms.InvalidSelectorException if the message selector is
1149 * invalid.
1150 * @see javax.jms.ConnectionConsumer
1151 */
1152 public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1153 return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1154 }
1155
1156 /**
1157 * Creates a connection consumer for this connection (optional operation).
1158 * This is an expert facility not used by regular JMS clients.
1159 *
1160 * @param destination the destination to access
1161 * @param messageSelector only messages with properties matching the message
1162 * selector expression are delivered. A value of null or an
1163 * empty string indicates that there is no message selector
1164 * for the message consumer.
1165 * @param sessionPool the server session pool to associate with this
1166 * connection consumer
1167 * @param maxMessages the maximum number of messages that can be assigned to
1168 * a server session at one time
1169 * @return the connection consumer
1170 * @throws JMSException if the <CODE>Connection</CODE> object fails to
1171 * create a connection consumer due to some internal error
1172 * or invalid arguments for <CODE>sessionPool</CODE> and
1173 * <CODE>messageSelector</CODE>.
1174 * @throws javax.jms.InvalidDestinationException if an invalid destination
1175 * is specified.
1176 * @throws javax.jms.InvalidSelectorException if the message selector is
1177 * invalid.
1178 * @see javax.jms.ConnectionConsumer
1179 * @since 1.1
1180 */
1181 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1182 return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1183 }
1184
1185 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1186 throws JMSException {
1187
1188 checkClosedOrFailed();
1189 ensureConnectionInfoSent();
1190
1191 ConsumerId consumerId = createConsumerId();
1192 ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1193 consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1194 consumerInfo.setSelector(messageSelector);
1195 consumerInfo.setPrefetchSize(maxMessages);
1196 consumerInfo.setNoLocal(noLocal);
1197 consumerInfo.setDispatchAsync(isDispatchAsync());
1198
1199 // Allows the options on the destination to configure the consumerInfo
1200 if (consumerInfo.getDestination().getOptions() != null) {
1201 Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1202 IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1203 }
1204
1205 return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1206 }
1207
1208 /**
1209 * @return
1210 */
1211 private ConsumerId createConsumerId() {
1212 return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1213 }
1214
1215 /**
1216 * @return
1217 */
1218 private ProducerId createProducerId() {
1219 return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
1220 }
1221
1222 /**
1223 * Creates a <CODE>QueueSession</CODE> object.
1224 *
1225 * @param transacted indicates whether the session is transacted
1226 * @param acknowledgeMode indicates whether the consumer or the client will
1227 * acknowledge any messages it receives; ignored if the
1228 * session is transacted. Legal values are
1229 * <code>Session.AUTO_ACKNOWLEDGE</code>,
1230 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1231 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1232 * @return a newly created queue session
1233 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1234 * to create a session due to some internal error or lack of
1235 * support for the specific transaction and acknowledgement
1236 * mode.
1237 * @see Session#AUTO_ACKNOWLEDGE
1238 * @see Session#CLIENT_ACKNOWLEDGE
1239 * @see Session#DUPS_OK_ACKNOWLEDGE
1240 */
1241 public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1242 return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1243 }
1244
1245 /**
1246 * Ensures that the clientID was manually specified and not auto-generated.
1247 * If the clientID was not specified this method will throw an exception.
1248 * This method is used to ensure that the clientID + durableSubscriber name
1249 * are used correctly.
1250 *
1251 * @throws JMSException
1252 */
1253 public void checkClientIDWasManuallySpecified() throws JMSException {
1254 if (!userSpecifiedClientID) {
1255 throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1256 }
1257 }
1258
1259 /**
1260 * send a Packet through the Connection - for internal use only
1261 *
1262 * @param command
1263 * @throws JMSException
1264 */
1265 public void asyncSendPacket(Command command) throws JMSException {
1266 if (isClosed()) {
1267 throw new ConnectionClosedException();
1268 } else {
1269 doAsyncSendPacket(command);
1270 }
1271 }
1272
1273 private void doAsyncSendPacket(Command command) throws JMSException {
1274 try {
1275 this.transport.oneway(command);
1276 } catch (IOException e) {
1277 throw JMSExceptionSupport.create(e);
1278 }
1279 }
1280
1281 /**
1282 * Send a packet through a Connection - for internal use only
1283 *
1284 * @param command
1285 * @return
1286 * @throws JMSException
1287 */
1288 public void syncSendPacket(Command command, final AsyncCallback onComplete) throws JMSException {
1289 if(onComplete==null) {
1290 syncSendPacket(command);
1291 } else {
1292 if (isClosed()) {
1293 throw new ConnectionClosedException();
1294 }
1295 try {
1296 this.transport.asyncRequest(command, new ResponseCallback() {
1297 @Override
1298 public void onCompletion(FutureResponse resp) {
1299 Response response;
1300 Throwable exception = null;
1301 try {
1302 response = resp.getResult();
1303 if (response.isException()) {
1304 ExceptionResponse er = (ExceptionResponse)response;
1305 exception = er.getException();
1306 }
1307 } catch (Exception e) {
1308 exception = e;
1309 }
1310 if(exception!=null) {
1311 if ( exception instanceof JMSException) {
1312 onComplete.onException((JMSException) exception);
1313 } else {
1314 if (isClosed()||closing.get()) {
1315 LOG.debug("Received an exception but connection is closing");
1316 }
1317 JMSException jmsEx = null;
1318 try {
1319 jmsEx = JMSExceptionSupport.create(exception);
1320 } catch(Throwable e) {
1321 LOG.error("Caught an exception trying to create a JMSException for " +exception,e);
1322 }
1323 //dispose of transport for security exceptions
1324 if (exception instanceof SecurityException){
1325 Transport t = transport;
1326 if (null != t){
1327 ServiceSupport.dispose(t);
1328 }
1329 }
1330 if (jmsEx !=null) {
1331 onComplete.onException(jmsEx);
1332 }
1333 }
1334 } else {
1335 onComplete.onSuccess();
1336 }
1337 }
1338 });
1339 } catch (IOException e) {
1340 throw JMSExceptionSupport.create(e);
1341 }
1342 }
1343 }
1344
1345 public Response syncSendPacket(Command command) throws JMSException {
1346 if (isClosed()) {
1347 throw new ConnectionClosedException();
1348 } else {
1349
1350 try {
1351 Response response = (Response)this.transport.request(command);
1352 if (response.isException()) {
1353 ExceptionResponse er = (ExceptionResponse)response;
1354 if (er.getException() instanceof JMSException) {
1355 throw (JMSException)er.getException();
1356 } else {
1357 if (isClosed()||closing.get()) {
1358 LOG.debug("Received an exception but connection is closing");
1359 }
1360 JMSException jmsEx = null;
1361 try {
1362 jmsEx = JMSExceptionSupport.create(er.getException());
1363 } catch(Throwable e) {
1364 LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
1365 }
1366 //dispose of transport for security exceptions
1367 if (er.getException() instanceof SecurityException){
1368 Transport t = this.transport;
1369 if (null != t){
1370 ServiceSupport.dispose(t);
1371 }
1372 }
1373 if (jmsEx !=null) {
1374 throw jmsEx;
1375 }
1376 }
1377 }
1378 return response;
1379 } catch (IOException e) {
1380 throw JMSExceptionSupport.create(e);
1381 }
1382 }
1383 }
1384
1385 /**
1386 * Send a packet through a Connection - for internal use only
1387 *
1388 * @param command
1389 * @return
1390 * @throws JMSException
1391 */
1392 public Response syncSendPacket(Command command, int timeout) throws JMSException {
1393 if (isClosed() || closing.get()) {
1394 throw new ConnectionClosedException();
1395 } else {
1396 return doSyncSendPacket(command, timeout);
1397 }
1398 }
1399
1400 private Response doSyncSendPacket(Command command, int timeout)
1401 throws JMSException {
1402 try {
1403 Response response = (Response) (timeout > 0
1404 ? this.transport.request(command, timeout)
1405 : this.transport.request(command));
1406 if (response != null && response.isException()) {
1407 ExceptionResponse er = (ExceptionResponse)response;
1408 if (er.getException() instanceof JMSException) {
1409 throw (JMSException)er.getException();
1410 } else {
1411 throw JMSExceptionSupport.create(er.getException());
1412 }
1413 }
1414 return response;
1415 } catch (IOException e) {
1416 throw JMSExceptionSupport.create(e);
1417 }
1418 }
1419
1420 /**
1421 * @return statistics for this Connection
1422 */
1423 public StatsImpl getStats() {
1424 return stats;
1425 }
1426
1427 /**
1428 * simply throws an exception if the Connection is already closed or the
1429 * Transport has failed
1430 *
1431 * @throws JMSException
1432 */
1433 protected synchronized void checkClosedOrFailed() throws JMSException {
1434 checkClosed();
1435 if (transportFailed.get()) {
1436 throw new ConnectionFailedException(firstFailureError);
1437 }
1438 }
1439
1440 /**
1441 * simply throws an exception if the Connection is already closed
1442 *
1443 * @throws JMSException
1444 */
1445 protected synchronized void checkClosed() throws JMSException {
1446 if (closed.get()) {
1447 throw new ConnectionClosedException();
1448 }
1449 }
1450
1451 /**
1452 * Send the ConnectionInfo to the Broker
1453 *
1454 * @throws JMSException
1455 */
1456 protected void ensureConnectionInfoSent() throws JMSException {
1457 synchronized(this.ensureConnectionInfoSentMutex) {
1458 // Can we skip sending the ConnectionInfo packet??
1459 if (isConnectionInfoSentToBroker || closed.get()) {
1460 return;
1461 }
1462 //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1463 if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1464 info.setClientId(clientIdGenerator.generateId());
1465 }
1466 syncSendPacket(info.copy());
1467
1468 this.isConnectionInfoSentToBroker = true;
1469 // Add a temp destination advisory consumer so that
1470 // We know what the valid temporary destinations are on the
1471 // broker without having to do an RPC to the broker.
1472
1473 ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1474 if (watchTopicAdvisories) {
1475 advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1476 }
1477 }
1478 }
1479
1480 public synchronized boolean isWatchTopicAdvisories() {
1481 return watchTopicAdvisories;
1482 }
1483
1484 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1485 this.watchTopicAdvisories = watchTopicAdvisories;
1486 }
1487
1488 /**
1489 * @return Returns the useAsyncSend.
1490 */
1491 public boolean isUseAsyncSend() {
1492 return useAsyncSend;
1493 }
1494
1495 /**
1496 * Forces the use of <a
1497 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1498 * adds a massive performance boost; but means that the send() method will
1499 * return immediately whether the message has been sent or not which could
1500 * lead to message loss.
1501 */
1502 public void setUseAsyncSend(boolean useAsyncSend) {
1503 this.useAsyncSend = useAsyncSend;
1504 }
1505
1506 /**
1507 * @return true if always sync send messages
1508 */
1509 public boolean isAlwaysSyncSend() {
1510 return this.alwaysSyncSend;
1511 }
1512
1513 /**
1514 * Set true if always require messages to be sync sent
1515 *
1516 * @param alwaysSyncSend
1517 */
1518 public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1519 this.alwaysSyncSend = alwaysSyncSend;
1520 }
1521
1522 /**
1523 * @return the messagePrioritySupported
1524 */
1525 public boolean isMessagePrioritySupported() {
1526 return this.messagePrioritySupported;
1527 }
1528
1529 /**
1530 * @param messagePrioritySupported the messagePrioritySupported to set
1531 */
1532 public void setMessagePrioritySupported(boolean messagePrioritySupported) {
1533 this.messagePrioritySupported = messagePrioritySupported;
1534 }
1535
1536 /**
1537 * Cleans up this connection so that it's state is as if the connection was
1538 * just created. This allows the Resource Adapter to clean up a connection
1539 * so that it can be reused without having to close and recreate the
1540 * connection.
1541 */
1542 public void cleanup() throws JMSException {
1543
1544 if (advisoryConsumer != null && !isTransportFailed()) {
1545 advisoryConsumer.dispose();
1546 advisoryConsumer = null;
1547 }
1548
1549 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1550 ActiveMQSession s = i.next();
1551 s.dispose();
1552 }
1553 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1554 ActiveMQConnectionConsumer c = i.next();
1555 c.dispose();
1556 }
1557 for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
1558 ActiveMQInputStream c = i.next();
1559 c.dispose();
1560 }
1561 for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
1562 ActiveMQOutputStream c = i.next();
1563 c.dispose();
1564 }
1565
1566 if (isConnectionInfoSentToBroker) {
1567 if (!transportFailed.get() && !closing.get()) {
1568 syncSendPacket(info.createRemoveCommand());
1569 }
1570 isConnectionInfoSentToBroker = false;
1571 }
1572 if (userSpecifiedClientID) {
1573 info.setClientId(null);
1574 userSpecifiedClientID = false;
1575 }
1576 clientIDSet = false;
1577
1578 started.set(false);
1579 }
1580
1581 public void finalize() throws Throwable{
1582 Scheduler s = this.scheduler;
1583 if (s != null){
1584 s.stop();
1585 }
1586 }
1587
1588 /**
1589 * Changes the associated username/password that is associated with this
1590 * connection. If the connection has been used, you must called cleanup()
1591 * before calling this method.
1592 *
1593 * @throws IllegalStateException if the connection is in used.
1594 */
1595 public void changeUserInfo(String userName, String password) throws JMSException {
1596 if (isConnectionInfoSentToBroker) {
1597 throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1598 }
1599 this.info.setUserName(userName);
1600 this.info.setPassword(password);
1601 }
1602
1603 /**
1604 * @return Returns the resourceManagerId.
1605 * @throws JMSException
1606 */
1607 public String getResourceManagerId() throws JMSException {
1608 waitForBrokerInfo();
1609 if (brokerInfo == null) {
1610 throw new JMSException("Connection failed before Broker info was received.");
1611 }
1612 return brokerInfo.getBrokerId().getValue();
1613 }
1614
1615 /**
1616 * Returns the broker name if one is available or null if one is not
1617 * available yet.
1618 */
1619 public String getBrokerName() {
1620 try {
1621 brokerInfoReceived.await(5, TimeUnit.SECONDS);
1622 if (brokerInfo == null) {
1623 return null;
1624 }
1625 return brokerInfo.getBrokerName();
1626 } catch (InterruptedException e) {
1627 Thread.currentThread().interrupt();
1628 return null;
1629 }
1630 }
1631
1632 /**
1633 * Returns the broker information if it is available or null if it is not
1634 * available yet.
1635 */
1636 public BrokerInfo getBrokerInfo() {
1637 return brokerInfo;
1638 }
1639
1640 /**
1641 * @return Returns the RedeliveryPolicy.
1642 * @throws JMSException
1643 */
1644 public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1645 return redeliveryPolicy;
1646 }
1647
1648 /**
1649 * Sets the redelivery policy to be used when messages are rolled back
1650 */
1651 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1652 this.redeliveryPolicy = redeliveryPolicy;
1653 }
1654
1655 public BlobTransferPolicy getBlobTransferPolicy() {
1656 if (blobTransferPolicy == null) {
1657 blobTransferPolicy = createBlobTransferPolicy();
1658 }
1659 return blobTransferPolicy;
1660 }
1661
1662 /**
1663 * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1664 * OBjects) are transferred from producers to brokers to consumers
1665 */
1666 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1667 this.blobTransferPolicy = blobTransferPolicy;
1668 }
1669
1670 /**
1671 * @return Returns the alwaysSessionAsync.
1672 */
1673 public boolean isAlwaysSessionAsync() {
1674 return alwaysSessionAsync;
1675 }
1676
1677 /**
1678 * If this flag is set then a separate thread is not used for dispatching
1679 * messages for each Session in the Connection. However, a separate thread
1680 * is always used if there is more than one session, or the session isn't in
1681 * auto acknowledge or duplicates ok mode
1682 */
1683 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1684 this.alwaysSessionAsync = alwaysSessionAsync;
1685 }
1686
1687 /**
1688 * @return Returns the optimizeAcknowledge.
1689 */
1690 public boolean isOptimizeAcknowledge() {
1691 return optimizeAcknowledge;
1692 }
1693
1694 /**
1695 * Enables an optimised acknowledgement mode where messages are acknowledged
1696 * in batches rather than individually
1697 *
1698 * @param optimizeAcknowledge The optimizeAcknowledge to set.
1699 */
1700 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1701 this.optimizeAcknowledge = optimizeAcknowledge;
1702 }
1703
1704 /**
1705 * The max time in milliseconds between optimized ack batches
1706 * @param optimizeAcknowledgeTimeOut
1707 */
1708 public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
1709 this.optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut;
1710 }
1711
1712 public long getOptimizeAcknowledgeTimeOut() {
1713 return optimizeAcknowledgeTimeOut;
1714 }
1715
1716 public long getWarnAboutUnstartedConnectionTimeout() {
1717 return warnAboutUnstartedConnectionTimeout;
1718 }
1719
1720 /**
1721 * Enables the timeout from a connection creation to when a warning is
1722 * generated if the connection is not properly started via {@link #start()}
1723 * and a message is received by a consumer. It is a very common gotcha to
1724 * forget to <a
1725 * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1726 * the connection</a> so this option makes the default case to create a
1727 * warning if the user forgets. To disable the warning just set the value to <
1728 * 0 (say -1).
1729 */
1730 public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1731 this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1732 }
1733
1734 /**
1735 * @return the sendTimeout
1736 */
1737 public int getSendTimeout() {
1738 return sendTimeout;
1739 }
1740
1741 /**
1742 * @param sendTimeout the sendTimeout to set
1743 */
1744 public void setSendTimeout(int sendTimeout) {
1745 this.sendTimeout = sendTimeout;
1746 }
1747
1748 /**
1749 * @return the sendAcksAsync
1750 */
1751 public boolean isSendAcksAsync() {
1752 return sendAcksAsync;
1753 }
1754
1755 /**
1756 * @param sendAcksAsync the sendAcksAsync to set
1757 */
1758 public void setSendAcksAsync(boolean sendAcksAsync) {
1759 this.sendAcksAsync = sendAcksAsync;
1760 }
1761
1762
1763 /**
1764 * Returns the time this connection was created
1765 */
1766 public long getTimeCreated() {
1767 return timeCreated;
1768 }
1769
1770 private void waitForBrokerInfo() throws JMSException {
1771 try {
1772 brokerInfoReceived.await();
1773 } catch (InterruptedException e) {
1774 Thread.currentThread().interrupt();
1775 throw JMSExceptionSupport.create(e);
1776 }
1777 }
1778
1779 // Package protected so that it can be used in unit tests
1780 public Transport getTransport() {
1781 return transport;
1782 }
1783
1784 public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1785 producers.put(producerId, producer);
1786 }
1787
1788 public void removeProducer(ProducerId producerId) {
1789 producers.remove(producerId);
1790 }
1791
1792 public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1793 dispatchers.put(consumerId, dispatcher);
1794 }
1795
1796 public void removeDispatcher(ConsumerId consumerId) {
1797 dispatchers.remove(consumerId);
1798 }
1799
1800 /**
1801 * @param o - the command to consume
1802 */
1803 public void onCommand(final Object o) {
1804 final Command command = (Command)o;
1805 if (!closed.get() && command != null) {
1806 try {
1807 command.visit(new CommandVisitorAdapter() {
1808 @Override
1809 public Response processMessageDispatch(MessageDispatch md) throws Exception {
1810 waitForTransportInterruptionProcessingToComplete();
1811 ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1812 if (dispatcher != null) {
1813 // Copy in case a embedded broker is dispatching via
1814 // vm://
1815 // md.getMessage() == null to signal end of queue
1816 // browse.
1817 Message msg = md.getMessage();
1818 if (msg != null) {
1819 msg = msg.copy();
1820 msg.setReadOnlyBody(true);
1821 msg.setReadOnlyProperties(true);
1822 msg.setRedeliveryCounter(md.getRedeliveryCounter());
1823 msg.setConnection(ActiveMQConnection.this);
1824 md.setMessage(msg);
1825 }
1826 dispatcher.dispatch(md);
1827 }
1828 return null;
1829 }
1830
1831 @Override
1832 public Response processProducerAck(ProducerAck pa) throws Exception {
1833 if (pa != null && pa.getProducerId() != null) {
1834 ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1835 if (producer != null) {
1836 producer.onProducerAck(pa);
1837 }
1838 }
1839 return null;
1840 }
1841
1842 @Override
1843 public Response processBrokerInfo(BrokerInfo info) throws Exception {
1844 brokerInfo = info;
1845 brokerInfoReceived.countDown();
1846 optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1847 getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1848 return null;
1849 }
1850
1851 @Override
1852 public Response processConnectionError(final ConnectionError error) throws Exception {
1853 executor.execute(new Runnable() {
1854 public void run() {
1855 onAsyncException(error.getException());
1856 }
1857 });
1858 return null;
1859 }
1860
1861 @Override
1862 public Response processControlCommand(ControlCommand command) throws Exception {
1863 onControlCommand(command);
1864 return null;
1865 }
1866
1867 @Override
1868 public Response processConnectionControl(ConnectionControl control) throws Exception {
1869 onConnectionControl((ConnectionControl)command);
1870 return null;
1871 }
1872
1873 @Override
1874 public Response processConsumerControl(ConsumerControl control) throws Exception {
1875 onConsumerControl((ConsumerControl)command);
1876 return null;
1877 }
1878
1879 @Override
1880 public Response processWireFormat(WireFormatInfo info) throws Exception {
1881 onWireFormatInfo((WireFormatInfo)command);
1882 return null;
1883 }
1884 });
1885 } catch (Exception e) {
1886 onClientInternalException(e);
1887 }
1888
1889 }
1890 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1891 TransportListener listener = iter.next();
1892 listener.onCommand(command);
1893 }
1894 }
1895
1896 protected void onWireFormatInfo(WireFormatInfo info) {
1897 protocolVersion.set(info.getVersion());
1898 }
1899
1900 /**
1901 * Handles async client internal exceptions.
1902 * A client internal exception is usually one that has been thrown
1903 * by a container runtime component during asynchronous processing of a
1904 * message that does not affect the connection itself.
1905 * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1906 * its <code>onException</code> method, if one has been registered with this connection.
1907 *
1908 * @param error the exception that the problem
1909 */
1910 public void onClientInternalException(final Throwable error) {
1911 if ( !closed.get() && !closing.get() ) {
1912 if ( this.clientInternalExceptionListener != null ) {
1913 executor.execute(new Runnable() {
1914 public void run() {
1915 ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1916 }
1917 });
1918 } else {
1919 LOG.debug("Async client internal exception occurred with no exception listener registered: "
1920 + error, error);
1921 }
1922 }
1923 }
1924 /**
1925 * Used for handling async exceptions
1926 *
1927 * @param error
1928 */
1929 public void onAsyncException(Throwable error) {
1930 if (!closed.get() && !closing.get()) {
1931 if (this.exceptionListener != null) {
1932
1933 if (!(error instanceof JMSException)) {
1934 error = JMSExceptionSupport.create(error);
1935 }
1936 final JMSException e = (JMSException)error;
1937
1938 executor.execute(new Runnable() {
1939 public void run() {
1940 ActiveMQConnection.this.exceptionListener.onException(e);
1941 }
1942 });
1943
1944 } else {
1945 LOG.debug("Async exception with no exception listener: " + error, error);
1946 }
1947 }
1948 }
1949
1950 public void onException(final IOException error) {
1951 onAsyncException(error);
1952 if (!closing.get() && !closed.get()) {
1953 executor.execute(new Runnable() {
1954 public void run() {
1955 transportFailed(error);
1956 ServiceSupport.dispose(ActiveMQConnection.this.transport);
1957 brokerInfoReceived.countDown();
1958 try {
1959 cleanup();
1960 } catch (JMSException e) {
1961 LOG.warn("Exception during connection cleanup, " + e, e);
1962 }
1963 for (Iterator<TransportListener> iter = transportListeners
1964 .iterator(); iter.hasNext();) {
1965 TransportListener listener = iter.next();
1966 listener.onException(error);
1967 }
1968 }
1969 });
1970 }
1971 }
1972
1973 public void transportInterupted() {
1974 this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
1975 if (LOG.isDebugEnabled()) {
1976 LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
1977 }
1978 signalInterruptionProcessingNeeded();
1979
1980 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1981 ActiveMQSession s = i.next();
1982 s.clearMessagesInProgress();
1983 }
1984
1985 for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
1986 connectionConsumer.clearMessagesInProgress();
1987 }
1988
1989 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1990 TransportListener listener = iter.next();
1991 listener.transportInterupted();
1992 }
1993 }
1994
1995 public void transportResumed() {
1996 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1997 TransportListener listener = iter.next();
1998 listener.transportResumed();
1999 }
2000 }
2001
2002 /**
2003 * Create the DestinationInfo object for the temporary destination.
2004 *
2005 * @param topic - if its true topic, else queue.
2006 * @return DestinationInfo
2007 * @throws JMSException
2008 */
2009 protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
2010
2011 // Check if Destination info is of temporary type.
2012 ActiveMQTempDestination dest;
2013 if (topic) {
2014 dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2015 } else {
2016 dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2017 }
2018
2019 DestinationInfo info = new DestinationInfo();
2020 info.setConnectionId(this.info.getConnectionId());
2021 info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
2022 info.setDestination(dest);
2023 syncSendPacket(info);
2024
2025 dest.setConnection(this);
2026 activeTempDestinations.put(dest, dest);
2027 return dest;
2028 }
2029
2030 /**
2031 * @param destination
2032 * @throws JMSException
2033 */
2034 public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
2035
2036 checkClosedOrFailed();
2037
2038 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2039 ActiveMQSession s = i.next();
2040 if (s.isInUse(destination)) {
2041 throw new JMSException("A consumer is consuming from the temporary destination");
2042 }
2043 }
2044
2045 activeTempDestinations.remove(destination);
2046
2047 DestinationInfo destInfo = new DestinationInfo();
2048 destInfo.setConnectionId(this.info.getConnectionId());
2049 destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2050 destInfo.setDestination(destination);
2051 destInfo.setTimeout(0);
2052 syncSendPacket(destInfo);
2053 }
2054
2055 public boolean isDeleted(ActiveMQDestination dest) {
2056
2057 // If we are not watching the advisories.. then
2058 // we will assume that the temp destination does exist.
2059 if (advisoryConsumer == null) {
2060 return false;
2061 }
2062
2063 return !activeTempDestinations.contains(dest);
2064 }
2065
2066 public boolean isCopyMessageOnSend() {
2067 return copyMessageOnSend;
2068 }
2069
2070 public LongSequenceGenerator getLocalTransactionIdGenerator() {
2071 return localTransactionIdGenerator;
2072 }
2073
2074 public boolean isUseCompression() {
2075 return useCompression;
2076 }
2077
2078 /**
2079 * Enables the use of compression of the message bodies
2080 */
2081 public void setUseCompression(boolean useCompression) {
2082 this.useCompression = useCompression;
2083 }
2084
2085 public void destroyDestination(ActiveMQDestination destination) throws JMSException {
2086
2087 checkClosedOrFailed();
2088 ensureConnectionInfoSent();
2089
2090 DestinationInfo info = new DestinationInfo();
2091 info.setConnectionId(this.info.getConnectionId());
2092 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2093 info.setDestination(destination);
2094 info.setTimeout(0);
2095 syncSendPacket(info);
2096
2097 }
2098
2099 public boolean isDispatchAsync() {
2100 return dispatchAsync;
2101 }
2102
2103 /**
2104 * Enables or disables the default setting of whether or not consumers have
2105 * their messages <a
2106 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
2107 * synchronously or asynchronously by the broker</a>. For non-durable
2108 * topics for example we typically dispatch synchronously by default to
2109 * minimize context switches which boost performance. However sometimes its
2110 * better to go slower to ensure that a single blocked consumer socket does
2111 * not block delivery to other consumers.
2112 *
2113 * @param asyncDispatch If true then consumers created on this connection
2114 * will default to having their messages dispatched
2115 * asynchronously. The default value is true.
2116 */
2117 public void setDispatchAsync(boolean asyncDispatch) {
2118 this.dispatchAsync = asyncDispatch;
2119 }
2120
2121 public boolean isObjectMessageSerializationDefered() {
2122 return objectMessageSerializationDefered;
2123 }
2124
2125 /**
2126 * When an object is set on an ObjectMessage, the JMS spec requires the
2127 * object to be serialized by that set method. Enabling this flag causes the
2128 * object to not get serialized. The object may subsequently get serialized
2129 * if the message needs to be sent over a socket or stored to disk.
2130 */
2131 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
2132 this.objectMessageSerializationDefered = objectMessageSerializationDefered;
2133 }
2134
2135 public InputStream createInputStream(Destination dest) throws JMSException {
2136 return createInputStream(dest, null);
2137 }
2138
2139 public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
2140 return createInputStream(dest, messageSelector, false);
2141 }
2142
2143 public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
2144 return createInputStream(dest, messageSelector, noLocal, -1);
2145 }
2146
2147
2148
2149 public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2150 return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
2151 }
2152
2153 public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
2154 return createInputStream(dest, null, false);
2155 }
2156
2157 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
2158 return createDurableInputStream(dest, name, messageSelector, false);
2159 }
2160
2161 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
2162 return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
2163 }
2164
2165 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2166 return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
2167 }
2168
2169 private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
2170 checkClosedOrFailed();
2171 ensureConnectionInfoSent();
2172 return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout);
2173 }
2174
2175 /**
2176 * Creates a persistent output stream; individual messages will be written
2177 * to disk/database by the broker
2178 */
2179 public OutputStream createOutputStream(Destination dest) throws JMSException {
2180 return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2181 }
2182
2183 /**
2184 * Creates a non persistent output stream; messages will not be written to
2185 * disk
2186 */
2187 public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
2188 return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2189 }
2190
2191 /**
2192 * Creates an output stream allowing full control over the delivery mode,
2193 * the priority and time to live of the messages and the properties added to
2194 * messages on the stream.
2195 *
2196 * @param streamProperties defines a map of key-value pairs where the keys
2197 * are strings and the values are primitive values (numbers
2198 * and strings) which are appended to the messages similarly
2199 * to using the
2200 * {@link javax.jms.Message#setObjectProperty(String, Object)}
2201 * method
2202 */
2203 public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
2204 checkClosedOrFailed();
2205 ensureConnectionInfoSent();
2206 return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
2207 }
2208
2209 /**
2210 * Unsubscribes a durable subscription that has been created by a client.
2211 * <P>
2212 * This method deletes the state being maintained on behalf of the
2213 * subscriber by its provider.
2214 * <P>
2215 * It is erroneous for a client to delete a durable subscription while there
2216 * is an active <CODE>MessageConsumer </CODE> or
2217 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2218 * message is part of a pending transaction or has not been acknowledged in
2219 * the session.
2220 *
2221 * @param name the name used to identify this subscription
2222 * @throws JMSException if the session fails to unsubscribe to the durable
2223 * subscription due to some internal error.
2224 * @throws InvalidDestinationException if an invalid subscription name is
2225 * specified.
2226 * @since 1.1
2227 */
2228 public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2229 checkClosedOrFailed();
2230 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2231 rsi.setConnectionId(getConnectionInfo().getConnectionId());
2232 rsi.setSubscriptionName(name);
2233 rsi.setClientId(getConnectionInfo().getClientId());
2234 syncSendPacket(rsi);
2235 }
2236
2237 /**
2238 * Internal send method optimized: - It does not copy the message - It can
2239 * only handle ActiveMQ messages. - You can specify if the send is async or
2240 * sync - Does not allow you to send /w a transaction.
2241 */
2242 void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
2243 checkClosedOrFailed();
2244
2245 if (destination.isTemporary() && isDeleted(destination)) {
2246 throw new JMSException("Cannot publish to a deleted Destination: " + destination);
2247 }
2248
2249 msg.setJMSDestination(destination);
2250 msg.setJMSDeliveryMode(deliveryMode);
2251 long expiration = 0L;
2252
2253 if (!isDisableTimeStampsByDefault()) {
2254 long timeStamp = System.currentTimeMillis();
2255 msg.setJMSTimestamp(timeStamp);
2256 if (timeToLive > 0) {
2257 expiration = timeToLive + timeStamp;
2258 }
2259 }
2260
2261 msg.setJMSExpiration(expiration);
2262 msg.setJMSPriority(priority);
2263
2264 msg.setJMSRedelivered(false);
2265 msg.setMessageId(messageId);
2266
2267 msg.onSend();
2268
2269 msg.setProducerId(msg.getMessageId().getProducerId());
2270
2271 if (LOG.isDebugEnabled()) {
2272 LOG.debug("Sending message: " + msg);
2273 }
2274
2275 if (async) {
2276 asyncSendPacket(msg);
2277 } else {
2278 syncSendPacket(msg);
2279 }
2280
2281 }
2282
2283 public void addOutputStream(ActiveMQOutputStream stream) {
2284 outputStreams.add(stream);
2285 }
2286
2287 public void removeOutputStream(ActiveMQOutputStream stream) {
2288 outputStreams.remove(stream);
2289 }
2290
2291 public void addInputStream(ActiveMQInputStream stream) {
2292 inputStreams.add(stream);
2293 }
2294
2295 public void removeInputStream(ActiveMQInputStream stream) {
2296 inputStreams.remove(stream);
2297 }
2298
2299 protected void onControlCommand(ControlCommand command) {
2300 String text = command.getCommand();
2301 if (text != null) {
2302 if ("shutdown".equals(text)) {
2303 LOG.info("JVM told to shutdown");
2304 System.exit(0);
2305 }
2306 if (false && "close".equals(text)){
2307 LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
2308 try {
2309 close();
2310 } catch (JMSException e) {
2311 }
2312 }
2313 }
2314 }
2315
2316 protected void onConnectionControl(ConnectionControl command) {
2317 if (command.isFaultTolerant()) {
2318 this.optimizeAcknowledge = false;
2319 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2320 ActiveMQSession s = i.next();
2321 s.setOptimizeAcknowledge(false);
2322 }
2323 }
2324 }
2325
2326 protected void onConsumerControl(ConsumerControl command) {
2327 if (command.isClose()) {
2328 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2329 ActiveMQSession s = i.next();
2330 s.close(command.getConsumerId());
2331 }
2332 } else {
2333 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2334 ActiveMQSession s = i.next();
2335 s.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2336 }
2337 }
2338 }
2339
2340 protected void transportFailed(IOException error) {
2341 transportFailed.set(true);
2342 if (firstFailureError == null) {
2343 firstFailureError = error;
2344 }
2345 }
2346
2347 /**
2348 * Should a JMS message be copied to a new JMS Message object as part of the
2349 * send() method in JMS. This is enabled by default to be compliant with the
2350 * JMS specification. You can disable it if you do not mutate JMS messages
2351 * after they are sent for a performance boost
2352 */
2353 public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2354 this.copyMessageOnSend = copyMessageOnSend;
2355 }
2356
2357 @Override
2358 public String toString() {
2359 return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2360 }
2361
2362 protected BlobTransferPolicy createBlobTransferPolicy() {
2363 return new BlobTransferPolicy();
2364 }
2365
2366 public int getProtocolVersion() {
2367 return protocolVersion.get();
2368 }
2369
2370 public int getProducerWindowSize() {
2371 return producerWindowSize;
2372 }
2373
2374 public void setProducerWindowSize(int producerWindowSize) {
2375 this.producerWindowSize = producerWindowSize;
2376 }
2377
2378 public void setAuditDepth(int auditDepth) {
2379 connectionAudit.setAuditDepth(auditDepth);
2380 }
2381
2382 public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
2383 connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
2384 }
2385
2386 protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
2387 connectionAudit.removeDispatcher(dispatcher);
2388 }
2389
2390 protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2391 return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
2392 }
2393
2394 protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2395 connectionAudit.rollbackDuplicate(dispatcher, message);
2396 }
2397
2398 public IOException getFirstFailureError() {
2399 return firstFailureError;
2400 }
2401
2402 protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
2403 CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2404 if (cdl != null) {
2405 if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) {
2406 LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete..");
2407 cdl.await(10, TimeUnit.SECONDS);
2408 }
2409 signalInterruptionProcessingComplete();
2410 }
2411 }
2412
2413 protected void transportInterruptionProcessingComplete() {
2414 CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2415 if (cdl != null) {
2416 cdl.countDown();
2417 try {
2418 signalInterruptionProcessingComplete();
2419 } catch (InterruptedException ignored) {}
2420 }
2421 }
2422
2423 private void signalInterruptionProcessingComplete() throws InterruptedException {
2424 CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2425 if (cdl.getCount()==0) {
2426 if (LOG.isDebugEnabled()) {
2427 LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
2428 }
2429 this.transportInterruptionProcessingComplete = null;
2430
2431 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2432 if (failoverTransport != null) {
2433 failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
2434 if (LOG.isDebugEnabled()) {
2435 LOG.debug("notified failover transport (" + failoverTransport
2436 + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
2437 }
2438 }
2439
2440 }
2441 }
2442
2443 private void signalInterruptionProcessingNeeded() {
2444 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2445 if (failoverTransport != null) {
2446 failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
2447 if (LOG.isDebugEnabled()) {
2448 LOG.debug("notified failover transport (" + failoverTransport
2449 + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
2450 }
2451 }
2452 }
2453
2454 /*
2455 * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
2456 * will wait to receive re dispatched messages.
2457 * default value is 0 so there is no wait by default.
2458 */
2459 public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
2460 this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
2461 }
2462
2463 public long getConsumerFailoverRedeliveryWaitPeriod() {
2464 return consumerFailoverRedeliveryWaitPeriod;
2465 }
2466
2467 protected Scheduler getScheduler() throws JMSException {
2468 Scheduler result = scheduler;
2469 if (result == null) {
2470 synchronized (this) {
2471 result = scheduler;
2472 if (result == null) {
2473 checkClosed();
2474 try {
2475 result = scheduler = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler");
2476 scheduler.start();
2477 } catch(Exception e) {
2478 throw JMSExceptionSupport.create(e);
2479 }
2480 }
2481 }
2482 }
2483 return result;
2484 }
2485
2486 protected ThreadPoolExecutor getExecutor() {
2487 return this.executor;
2488 }
2489
2490 /**
2491 * @return the checkForDuplicates
2492 */
2493 public boolean isCheckForDuplicates() {
2494 return this.checkForDuplicates;
2495 }
2496
2497 /**
2498 * @param checkForDuplicates the checkForDuplicates to set
2499 */
2500 public void setCheckForDuplicates(boolean checkForDuplicates) {
2501 this.checkForDuplicates = checkForDuplicates;
2502 }
2503
2504
2505 public boolean isTransactedIndividualAck() {
2506 return transactedIndividualAck;
2507 }
2508
2509 public void setTransactedIndividualAck(boolean transactedIndividualAck) {
2510 this.transactedIndividualAck = transactedIndividualAck;
2511 }
2512
2513 public boolean isNonBlockingRedelivery() {
2514 return nonBlockingRedelivery;
2515 }
2516
2517 public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
2518 this.nonBlockingRedelivery = nonBlockingRedelivery;
2519 }
2520
2521 /**
2522 * Removes any TempDestinations that this connection has cached, ignoring
2523 * any exceptions generated because the destination is in use as they should
2524 * not be removed.
2525 */
2526 public void cleanUpTempDestinations() {
2527
2528 if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) {
2529 return;
2530 }
2531
2532 Iterator<ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries
2533 = this.activeTempDestinations.entrySet().iterator();
2534 while(entries.hasNext()) {
2535 ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next();
2536 try {
2537 // Only delete this temp destination if it was created from this connection. The connection used
2538 // for the advisory consumer may also have a reference to this temp destination.
2539 ActiveMQTempDestination dest = entry.getValue();
2540 String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString();
2541 if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) {
2542 this.deleteTempDestination(entry.getValue());
2543 }
2544 } catch (Exception ex) {
2545 // the temp dest is in use so it can not be deleted.
2546 // it is ok to leave it to connection tear down phase
2547 }
2548 }
2549 }
2550 }