001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.net.URI;
020 import java.net.URISyntaxException;
021 import java.util.HashMap;
022 import java.util.Map;
023 import java.util.Properties;
024 import java.util.concurrent.Executor;
025 import java.util.concurrent.ScheduledThreadPoolExecutor;
026 import java.util.concurrent.ThreadFactory;
027
028 import javax.jms.Connection;
029 import javax.jms.ConnectionFactory;
030 import javax.jms.ExceptionListener;
031 import javax.jms.JMSException;
032 import javax.jms.QueueConnection;
033 import javax.jms.QueueConnectionFactory;
034 import javax.jms.TopicConnection;
035 import javax.jms.TopicConnectionFactory;
036 import javax.naming.Context;
037
038 import org.apache.activemq.blob.BlobTransferPolicy;
039 import org.apache.activemq.jndi.JNDIBaseStorable;
040 import org.apache.activemq.management.JMSStatsImpl;
041 import org.apache.activemq.management.StatsCapable;
042 import org.apache.activemq.management.StatsImpl;
043 import org.apache.activemq.transport.Transport;
044 import org.apache.activemq.transport.TransportFactory;
045 import org.apache.activemq.transport.TransportListener;
046 import org.apache.activemq.util.IdGenerator;
047 import org.apache.activemq.util.IntrospectionSupport;
048 import org.apache.activemq.util.JMSExceptionSupport;
049 import org.apache.activemq.util.URISupport;
050 import org.apache.activemq.util.URISupport.CompositeData;
051
052 /**
053 * A ConnectionFactory is an an Administered object, and is used for creating
054 * Connections. <p/> This class also implements QueueConnectionFactory and
055 * TopicConnectionFactory. You can use this connection to create both
056 * QueueConnections and TopicConnections.
057 *
058 *
059 * @see javax.jms.ConnectionFactory
060 */
061 public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable {
062
063 public static final String DEFAULT_BROKER_BIND_URL = "tcp://localhost:61616";
064 public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL;
065 public static final String DEFAULT_USER = null;
066 public static final String DEFAULT_PASSWORD = null;
067 public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;
068
069 protected static final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
070 public Thread newThread(Runnable run) {
071 Thread thread = new Thread(run);
072 thread.setPriority(ThreadPriorities.INBOUND_CLIENT_CONNECTION);
073 return thread;
074 }
075 });
076
077 protected URI brokerURL;
078 protected String userName;
079 protected String password;
080 protected String clientID;
081 protected boolean dispatchAsync=true;
082 protected boolean alwaysSessionAsync=true;
083
084 JMSStatsImpl factoryStats = new JMSStatsImpl();
085
086 private IdGenerator clientIdGenerator;
087 private String clientIDPrefix;
088 private IdGenerator connectionIdGenerator;
089 private String connectionIDPrefix;
090
091 // client policies
092 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
093 private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
094 private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
095 private MessageTransformer transformer;
096
097 private boolean disableTimeStampsByDefault;
098 private boolean optimizedMessageDispatch = true;
099 private long optimizeAcknowledgeTimeOut = 300;
100 private boolean copyMessageOnSend = true;
101 private boolean useCompression;
102 private boolean objectMessageSerializationDefered;
103 private boolean useAsyncSend;
104 private boolean optimizeAcknowledge;
105 private int closeTimeout = 15000;
106 private boolean useRetroactiveConsumer;
107 private boolean exclusiveConsumer;
108 private boolean nestedMapAndListEnabled = true;
109 private boolean alwaysSyncSend;
110 private boolean watchTopicAdvisories = true;
111 private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
112 private long warnAboutUnstartedConnectionTimeout = 500L;
113 private int sendTimeout = 0;
114 private boolean sendAcksAsync=true;
115 private TransportListener transportListener;
116 private ExceptionListener exceptionListener;
117 private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
118 private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
119 private boolean useDedicatedTaskRunner;
120 private long consumerFailoverRedeliveryWaitPeriod = 0;
121 private boolean checkForDuplicates = true;
122 private ClientInternalExceptionListener clientInternalExceptionListener;
123 private boolean messagePrioritySupported = true;
124 private boolean transactedIndividualAck = false;
125 private boolean nonBlockingRedelivery = false;
126
127 // /////////////////////////////////////////////
128 //
129 // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods
130 //
131 // /////////////////////////////////////////////
132
133 public ActiveMQConnectionFactory() {
134 this(DEFAULT_BROKER_URL);
135 }
136
137 public ActiveMQConnectionFactory(String brokerURL) {
138 this(createURI(brokerURL));
139 }
140
141 public ActiveMQConnectionFactory(URI brokerURL) {
142 setBrokerURL(brokerURL.toString());
143 }
144
145 public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {
146 setUserName(userName);
147 setPassword(password);
148 setBrokerURL(brokerURL.toString());
149 }
150
151 public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
152 setUserName(userName);
153 setPassword(password);
154 setBrokerURL(brokerURL);
155 }
156
157 /**
158 * Returns a copy of the given connection factory
159 */
160 public ActiveMQConnectionFactory copy() {
161 try {
162 return (ActiveMQConnectionFactory)super.clone();
163 } catch (CloneNotSupportedException e) {
164 throw new RuntimeException("This should never happen: " + e, e);
165 }
166 }
167
168 /**
169 * @param brokerURL
170 * @return
171 * @throws URISyntaxException
172 */
173 private static URI createURI(String brokerURL) {
174 try {
175 return new URI(brokerURL);
176 } catch (URISyntaxException e) {
177 throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e);
178 }
179 }
180
181 /**
182 * @return Returns the Connection.
183 */
184 public Connection createConnection() throws JMSException {
185 return createActiveMQConnection();
186 }
187
188 /**
189 * @return Returns the Connection.
190 */
191 public Connection createConnection(String userName, String password) throws JMSException {
192 return createActiveMQConnection(userName, password);
193 }
194
195 /**
196 * @return Returns the QueueConnection.
197 * @throws JMSException
198 */
199 public QueueConnection createQueueConnection() throws JMSException {
200 return createActiveMQConnection();
201 }
202
203 /**
204 * @return Returns the QueueConnection.
205 */
206 public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
207 return createActiveMQConnection(userName, password);
208 }
209
210 /**
211 * @return Returns the TopicConnection.
212 * @throws JMSException
213 */
214 public TopicConnection createTopicConnection() throws JMSException {
215 return createActiveMQConnection();
216 }
217
218 /**
219 * @return Returns the TopicConnection.
220 */
221 public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
222 return createActiveMQConnection(userName, password);
223 }
224
225 /**
226 * @returns the StatsImpl associated with this ConnectionFactory.
227 */
228 public StatsImpl getStats() {
229 return this.factoryStats;
230 }
231
232 // /////////////////////////////////////////////
233 //
234 // Implementation methods.
235 //
236 // /////////////////////////////////////////////
237
238 protected ActiveMQConnection createActiveMQConnection() throws JMSException {
239 return createActiveMQConnection(userName, password);
240 }
241
242 /**
243 * Creates a Transport based on this object's connection settings. Separated
244 * from createActiveMQConnection to allow for subclasses to override.
245 *
246 * @return The newly created Transport.
247 * @throws JMSException If unable to create trasnport.
248 * @author sepandm@gmail.com
249 */
250 protected Transport createTransport() throws JMSException {
251 try {
252 return TransportFactory.connect(brokerURL, DEFAULT_CONNECTION_EXECUTOR);
253 } catch (Exception e) {
254 throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
255 }
256 }
257
258 /**
259 * @return Returns the Connection.
260 */
261 protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
262 if (brokerURL == null) {
263 throw new ConfigurationException("brokerURL not set.");
264 }
265 ActiveMQConnection connection = null;
266 try {
267 Transport transport = createTransport();
268 connection = createActiveMQConnection(transport, factoryStats);
269
270 connection.setUserName(userName);
271 connection.setPassword(password);
272
273 configureConnection(connection);
274
275 transport.start();
276
277 if (clientID != null) {
278 connection.setDefaultClientID(clientID);
279 }
280
281 return connection;
282 } catch (JMSException e) {
283 // Clean up!
284 try {
285 connection.close();
286 } catch (Throwable ignore) {
287 }
288 throw e;
289 } catch (Exception e) {
290 // Clean up!
291 try {
292 connection.close();
293 } catch (Throwable ignore) {
294 }
295 throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
296 }
297 }
298
299 protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
300 ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(),
301 getConnectionIdGenerator(), stats);
302 return connection;
303 }
304
305 protected void configureConnection(ActiveMQConnection connection) throws JMSException {
306 connection.setPrefetchPolicy(getPrefetchPolicy());
307 connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
308 connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
309 connection.setCopyMessageOnSend(isCopyMessageOnSend());
310 connection.setUseCompression(isUseCompression());
311 connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
312 connection.setDispatchAsync(isDispatchAsync());
313 connection.setUseAsyncSend(isUseAsyncSend());
314 connection.setAlwaysSyncSend(isAlwaysSyncSend());
315 connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
316 connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
317 connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut());
318 connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
319 connection.setExclusiveConsumer(isExclusiveConsumer());
320 connection.setRedeliveryPolicy(getRedeliveryPolicy());
321 connection.setTransformer(getTransformer());
322 connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
323 connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
324 connection.setProducerWindowSize(getProducerWindowSize());
325 connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
326 connection.setSendTimeout(getSendTimeout());
327 connection.setCloseTimeout(getCloseTimeout());
328 connection.setSendAcksAsync(isSendAcksAsync());
329 connection.setAuditDepth(getAuditDepth());
330 connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
331 connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
332 connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
333 connection.setCheckForDuplicates(isCheckForDuplicates());
334 connection.setMessagePrioritySupported(isMessagePrioritySupported());
335 connection.setTransactedIndividualAck(isTransactedIndividualAck());
336 connection.setNonBlockingRedelivery(isNonBlockingRedelivery());
337 if (transportListener != null) {
338 connection.addTransportListener(transportListener);
339 }
340 if (exceptionListener != null) {
341 connection.setExceptionListener(exceptionListener);
342 }
343 if (clientInternalExceptionListener != null) {
344 connection.setClientInternalExceptionListener(clientInternalExceptionListener);
345 }
346 }
347
348 // /////////////////////////////////////////////
349 //
350 // Property Accessors
351 //
352 // /////////////////////////////////////////////
353
354 public String getBrokerURL() {
355 return brokerURL == null ? null : brokerURL.toString();
356 }
357
358 /**
359 * Sets the <a
360 * href="http://activemq.apache.org/configuring-transports.html">connection
361 * URL</a> used to connect to the ActiveMQ broker.
362 */
363 public void setBrokerURL(String brokerURL) {
364 this.brokerURL = createURI(brokerURL);
365
366 // Use all the properties prefixed with 'jms.' to set the connection
367 // factory
368 // options.
369 if (this.brokerURL.getQuery() != null) {
370 // It might be a standard URI or...
371 try {
372
373 Map<String,String> map = URISupport.parseQuery(this.brokerURL.getQuery());
374 Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(map, "jms.");
375 if (buildFromMap(jmsOptionsMap)) {
376 if (!jmsOptionsMap.isEmpty()) {
377 String msg = "There are " + jmsOptionsMap.size()
378 + " jms options that couldn't be set on the ConnectionFactory."
379 + " Check the options are spelled correctly."
380 + " Unknown parameters=[" + jmsOptionsMap + "]."
381 + " This connection factory cannot be started.";
382 throw new IllegalArgumentException(msg);
383 }
384
385 this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map);
386 }
387
388 } catch (URISyntaxException e) {
389 }
390
391 } else {
392
393 // It might be a composite URI.
394 try {
395 CompositeData data = URISupport.parseComposite(this.brokerURL);
396 Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(data.getParameters(), "jms.");
397 if (buildFromMap(jmsOptionsMap)) {
398 if (!jmsOptionsMap.isEmpty()) {
399 String msg = "There are " + jmsOptionsMap.size()
400 + " jms options that couldn't be set on the ConnectionFactory."
401 + " Check the options are spelled correctly."
402 + " Unknown parameters=[" + jmsOptionsMap + "]."
403 + " This connection factory cannot be started.";
404 throw new IllegalArgumentException(msg);
405 }
406
407 this.brokerURL = data.toURI();
408 }
409 } catch (URISyntaxException e) {
410 }
411 }
412 }
413
414 public String getClientID() {
415 return clientID;
416 }
417
418 /**
419 * Sets the JMS clientID to use for the created connection. Note that this
420 * can only be used by one connection at once so generally its a better idea
421 * to set the clientID on a Connection
422 */
423 public void setClientID(String clientID) {
424 this.clientID = clientID;
425 }
426
427 public boolean isCopyMessageOnSend() {
428 return copyMessageOnSend;
429 }
430
431 /**
432 * Should a JMS message be copied to a new JMS Message object as part of the
433 * send() method in JMS. This is enabled by default to be compliant with the
434 * JMS specification. You can disable it if you do not mutate JMS messages
435 * after they are sent for a performance boost
436 */
437 public void setCopyMessageOnSend(boolean copyMessageOnSend) {
438 this.copyMessageOnSend = copyMessageOnSend;
439 }
440
441 public boolean isDisableTimeStampsByDefault() {
442 return disableTimeStampsByDefault;
443 }
444
445 /**
446 * Sets whether or not timestamps on messages should be disabled or not. If
447 * you disable them it adds a small performance boost.
448 */
449 public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
450 this.disableTimeStampsByDefault = disableTimeStampsByDefault;
451 }
452
453 public boolean isOptimizedMessageDispatch() {
454 return optimizedMessageDispatch;
455 }
456
457 /**
458 * If this flag is set then an larger prefetch limit is used - only
459 * applicable for durable topic subscribers.
460 */
461 public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
462 this.optimizedMessageDispatch = optimizedMessageDispatch;
463 }
464
465 public String getPassword() {
466 return password;
467 }
468
469 /**
470 * Sets the JMS password used for connections created from this factory
471 */
472 public void setPassword(String password) {
473 this.password = password;
474 }
475
476 public ActiveMQPrefetchPolicy getPrefetchPolicy() {
477 return prefetchPolicy;
478 }
479
480 /**
481 * Sets the <a
482 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
483 * policy</a> for consumers created by this connection.
484 */
485 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
486 this.prefetchPolicy = prefetchPolicy;
487 }
488
489 public boolean isUseAsyncSend() {
490 return useAsyncSend;
491 }
492
493 public BlobTransferPolicy getBlobTransferPolicy() {
494 return blobTransferPolicy;
495 }
496
497 /**
498 * Sets the policy used to describe how out-of-band BLOBs (Binary Large
499 * OBjects) are transferred from producers to brokers to consumers
500 */
501 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
502 this.blobTransferPolicy = blobTransferPolicy;
503 }
504
505 /**
506 * Forces the use of <a
507 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
508 * adds a massive performance boost; but means that the send() method will
509 * return immediately whether the message has been sent or not which could
510 * lead to message loss.
511 */
512 public void setUseAsyncSend(boolean useAsyncSend) {
513 this.useAsyncSend = useAsyncSend;
514 }
515
516 public synchronized boolean isWatchTopicAdvisories() {
517 return watchTopicAdvisories;
518 }
519
520 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
521 this.watchTopicAdvisories = watchTopicAdvisories;
522 }
523
524 /**
525 * @return true if always sync send messages
526 */
527 public boolean isAlwaysSyncSend() {
528 return this.alwaysSyncSend;
529 }
530
531 /**
532 * Set true if always require messages to be sync sent
533 *
534 * @param alwaysSyncSend
535 */
536 public void setAlwaysSyncSend(boolean alwaysSyncSend) {
537 this.alwaysSyncSend = alwaysSyncSend;
538 }
539
540 public String getUserName() {
541 return userName;
542 }
543
544 /**
545 * Sets the JMS userName used by connections created by this factory
546 */
547 public void setUserName(String userName) {
548 this.userName = userName;
549 }
550
551 public boolean isUseRetroactiveConsumer() {
552 return useRetroactiveConsumer;
553 }
554
555 /**
556 * Sets whether or not retroactive consumers are enabled. Retroactive
557 * consumers allow non-durable topic subscribers to receive old messages
558 * that were published before the non-durable subscriber started.
559 */
560 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
561 this.useRetroactiveConsumer = useRetroactiveConsumer;
562 }
563
564 public boolean isExclusiveConsumer() {
565 return exclusiveConsumer;
566 }
567
568 /**
569 * Enables or disables whether or not queue consumers should be exclusive or
570 * not for example to preserve ordering when not using <a
571 * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
572 *
573 * @param exclusiveConsumer
574 */
575 public void setExclusiveConsumer(boolean exclusiveConsumer) {
576 this.exclusiveConsumer = exclusiveConsumer;
577 }
578
579 public RedeliveryPolicy getRedeliveryPolicy() {
580 return redeliveryPolicy;
581 }
582
583 /**
584 * Sets the global redelivery policy to be used when a message is delivered
585 * but the session is rolled back
586 */
587 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
588 this.redeliveryPolicy = redeliveryPolicy;
589 }
590
591 public MessageTransformer getTransformer() {
592 return transformer;
593 }
594
595 /**
596 * @return the sendTimeout
597 */
598 public int getSendTimeout() {
599 return sendTimeout;
600 }
601
602 /**
603 * @param sendTimeout the sendTimeout to set
604 */
605 public void setSendTimeout(int sendTimeout) {
606 this.sendTimeout = sendTimeout;
607 }
608
609 /**
610 * @return the sendAcksAsync
611 */
612 public boolean isSendAcksAsync() {
613 return sendAcksAsync;
614 }
615
616 /**
617 * @param sendAcksAsync the sendAcksAsync to set
618 */
619 public void setSendAcksAsync(boolean sendAcksAsync) {
620 this.sendAcksAsync = sendAcksAsync;
621 }
622
623 /**
624 * @return the messagePrioritySupported
625 */
626 public boolean isMessagePrioritySupported() {
627 return this.messagePrioritySupported;
628 }
629
630 /**
631 * @param messagePrioritySupported the messagePrioritySupported to set
632 */
633 public void setMessagePrioritySupported(boolean messagePrioritySupported) {
634 this.messagePrioritySupported = messagePrioritySupported;
635 }
636
637
638 /**
639 * Sets the transformer used to transform messages before they are sent on
640 * to the JMS bus or when they are received from the bus but before they are
641 * delivered to the JMS client
642 */
643 public void setTransformer(MessageTransformer transformer) {
644 this.transformer = transformer;
645 }
646
647 @SuppressWarnings({ "unchecked", "rawtypes" })
648 @Override
649 public void buildFromProperties(Properties properties) {
650
651 if (properties == null) {
652 properties = new Properties();
653 }
654
655 String temp = properties.getProperty(Context.PROVIDER_URL);
656 if (temp == null || temp.length() == 0) {
657 temp = properties.getProperty("brokerURL");
658 }
659 if (temp != null && temp.length() > 0) {
660 setBrokerURL(temp);
661 }
662
663 Map<String, Object> p = new HashMap(properties);
664 buildFromMap(p);
665 }
666
667 public boolean buildFromMap(Map<String, Object> properties) {
668 boolean rc = false;
669
670 ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();
671 if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) {
672 setPrefetchPolicy(p);
673 rc = true;
674 }
675
676 RedeliveryPolicy rp = new RedeliveryPolicy();
677 if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) {
678 setRedeliveryPolicy(rp);
679 rc = true;
680 }
681
682 BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
683 if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) {
684 setBlobTransferPolicy(blobTransferPolicy);
685 rc = true;
686 }
687
688 rc |= IntrospectionSupport.setProperties(this, properties);
689
690 return rc;
691 }
692
693 @Override
694 public void populateProperties(Properties props) {
695 props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync()));
696
697 if (getBrokerURL() != null) {
698 props.setProperty(Context.PROVIDER_URL, getBrokerURL());
699 props.setProperty("brokerURL", getBrokerURL());
700 }
701
702 if (getClientID() != null) {
703 props.setProperty("clientID", getClientID());
704 }
705
706 IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy.");
707 IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy.");
708 IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy.");
709
710 props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend()));
711 props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault()));
712 props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered()));
713 props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch()));
714
715 if (getPassword() != null) {
716 props.setProperty("password", getPassword());
717 }
718
719 props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
720 props.setProperty("useCompression", Boolean.toString(isUseCompression()));
721 props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
722 props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories()));
723
724 if (getUserName() != null) {
725 props.setProperty("userName", getUserName());
726 }
727
728 props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
729 props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync()));
730 props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
731 props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled()));
732 props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
733 props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
734 props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
735 props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
736 props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
737 props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber()));
738 props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates()));
739 props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported()));
740 props.setProperty("transactedIndividualAck", Boolean.toString(isTransactedIndividualAck()));
741 props.setProperty("nonBlockingRedelivery", Boolean.toString(isNonBlockingRedelivery()));
742 }
743
744 public boolean isUseCompression() {
745 return useCompression;
746 }
747
748 /**
749 * Enables the use of compression of the message bodies
750 */
751 public void setUseCompression(boolean useCompression) {
752 this.useCompression = useCompression;
753 }
754
755 public boolean isObjectMessageSerializationDefered() {
756 return objectMessageSerializationDefered;
757 }
758
759 /**
760 * When an object is set on an ObjectMessage, the JMS spec requires the
761 * object to be serialized by that set method. Enabling this flag causes the
762 * object to not get serialized. The object may subsequently get serialized
763 * if the message needs to be sent over a socket or stored to disk.
764 */
765 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
766 this.objectMessageSerializationDefered = objectMessageSerializationDefered;
767 }
768
769 public boolean isDispatchAsync() {
770 return dispatchAsync;
771 }
772
773 /**
774 * Enables or disables the default setting of whether or not consumers have
775 * their messages <a
776 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
777 * synchronously or asynchronously by the broker</a>. For non-durable
778 * topics for example we typically dispatch synchronously by default to
779 * minimize context switches which boost performance. However sometimes its
780 * better to go slower to ensure that a single blocked consumer socket does
781 * not block delivery to other consumers.
782 *
783 * @param asyncDispatch If true then consumers created on this connection
784 * will default to having their messages dispatched
785 * asynchronously. The default value is true.
786 */
787 public void setDispatchAsync(boolean asyncDispatch) {
788 this.dispatchAsync = asyncDispatch;
789 }
790
791 /**
792 * @return Returns the closeTimeout.
793 */
794 public int getCloseTimeout() {
795 return closeTimeout;
796 }
797
798 /**
799 * Sets the timeout before a close is considered complete. Normally a
800 * close() on a connection waits for confirmation from the broker; this
801 * allows that operation to timeout to save the client hanging if there is
802 * no broker
803 */
804 public void setCloseTimeout(int closeTimeout) {
805 this.closeTimeout = closeTimeout;
806 }
807
808 /**
809 * @return Returns the alwaysSessionAsync.
810 */
811 public boolean isAlwaysSessionAsync() {
812 return alwaysSessionAsync;
813 }
814
815 /**
816 * If this flag is set then a separate thread is not used for dispatching
817 * messages for each Session in the Connection. However, a separate thread
818 * is always used if there is more than one session, or the session isn't in
819 * auto acknowledge or duplicates ok mode
820 */
821 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
822 this.alwaysSessionAsync = alwaysSessionAsync;
823 }
824
825 /**
826 * @return Returns the optimizeAcknowledge.
827 */
828 public boolean isOptimizeAcknowledge() {
829 return optimizeAcknowledge;
830 }
831
832 /**
833 * @param optimizeAcknowledge The optimizeAcknowledge to set.
834 */
835 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
836 this.optimizeAcknowledge = optimizeAcknowledge;
837 }
838
839 /**
840 * The max time in milliseconds between optimized ack batches
841 * @param optimizeAcknowledgeTimeOut
842 */
843 public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
844 this.optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut;
845 }
846
847 public long getOptimizeAcknowledgeTimeOut() {
848 return optimizeAcknowledgeTimeOut;
849 }
850
851 public boolean isNestedMapAndListEnabled() {
852 return nestedMapAndListEnabled;
853 }
854
855 /**
856 * Enables/disables whether or not Message properties and MapMessage entries
857 * support <a
858 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
859 * Structures</a> of Map and List objects
860 */
861 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
862 this.nestedMapAndListEnabled = structuredMapsEnabled;
863 }
864
865 public String getClientIDPrefix() {
866 return clientIDPrefix;
867 }
868
869 /**
870 * Sets the prefix used by autogenerated JMS Client ID values which are used
871 * if the JMS client does not explicitly specify on.
872 *
873 * @param clientIDPrefix
874 */
875 public void setClientIDPrefix(String clientIDPrefix) {
876 this.clientIDPrefix = clientIDPrefix;
877 }
878
879 protected synchronized IdGenerator getClientIdGenerator() {
880 if (clientIdGenerator == null) {
881 if (clientIDPrefix != null) {
882 clientIdGenerator = new IdGenerator(clientIDPrefix);
883 } else {
884 clientIdGenerator = new IdGenerator();
885 }
886 }
887 return clientIdGenerator;
888 }
889
890 protected void setClientIdGenerator(IdGenerator clientIdGenerator) {
891 this.clientIdGenerator = clientIdGenerator;
892 }
893
894 /**
895 * Sets the prefix used by connection id generator
896 * @param connectionIDPrefix
897 */
898 public void setConnectionIDPrefix(String connectionIDPrefix) {
899 this.connectionIDPrefix = connectionIDPrefix;
900 }
901
902 protected synchronized IdGenerator getConnectionIdGenerator() {
903 if (connectionIdGenerator == null) {
904 if (connectionIDPrefix != null) {
905 connectionIdGenerator = new IdGenerator(connectionIDPrefix);
906 } else {
907 connectionIdGenerator = new IdGenerator();
908 }
909 }
910 return connectionIdGenerator;
911 }
912
913 protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) {
914 this.connectionIdGenerator = connectionIdGenerator;
915 }
916
917 /**
918 * @return the statsEnabled
919 */
920 public boolean isStatsEnabled() {
921 return this.factoryStats.isEnabled();
922 }
923
924 /**
925 * @param statsEnabled the statsEnabled to set
926 */
927 public void setStatsEnabled(boolean statsEnabled) {
928 this.factoryStats.setEnabled(statsEnabled);
929 }
930
931 public synchronized int getProducerWindowSize() {
932 return producerWindowSize;
933 }
934
935 public synchronized void setProducerWindowSize(int producerWindowSize) {
936 this.producerWindowSize = producerWindowSize;
937 }
938
939 public long getWarnAboutUnstartedConnectionTimeout() {
940 return warnAboutUnstartedConnectionTimeout;
941 }
942
943 /**
944 * Enables the timeout from a connection creation to when a warning is
945 * generated if the connection is not properly started via
946 * {@link Connection#start()} and a message is received by a consumer. It is
947 * a very common gotcha to forget to <a
948 * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
949 * the connection</a> so this option makes the default case to create a
950 * warning if the user forgets. To disable the warning just set the value to <
951 * 0 (say -1).
952 */
953 public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
954 this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
955 }
956
957 public TransportListener getTransportListener() {
958 return transportListener;
959 }
960
961 /**
962 * Allows a listener to be configured on the ConnectionFactory so that when this factory is used
963 * with frameworks which don't expose the Connection such as Spring JmsTemplate, you can still register
964 * a transport listener.
965 *
966 * @param transportListener sets the listener to be registered on all connections
967 * created by this factory
968 */
969 public void setTransportListener(TransportListener transportListener) {
970 this.transportListener = transportListener;
971 }
972
973
974 public ExceptionListener getExceptionListener() {
975 return exceptionListener;
976 }
977
978 /**
979 * Allows an {@link ExceptionListener} to be configured on the ConnectionFactory so that when this factory
980 * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
981 * an exception listener.
982 * <p> Note: access to this exceptionLinstener will <b>not</b> be serialized if it is associated with more than
983 * on connection (as it will be if more than one connection is subsequently created by this connection factory)
984 * @param exceptionListener sets the exception listener to be registered on all connections
985 * created by this factory
986 */
987 public void setExceptionListener(ExceptionListener exceptionListener) {
988 this.exceptionListener = exceptionListener;
989 }
990
991 public int getAuditDepth() {
992 return auditDepth;
993 }
994
995 public void setAuditDepth(int auditDepth) {
996 this.auditDepth = auditDepth;
997 }
998
999 public int getAuditMaximumProducerNumber() {
1000 return auditMaximumProducerNumber;
1001 }
1002
1003 public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
1004 this.auditMaximumProducerNumber = auditMaximumProducerNumber;
1005 }
1006
1007 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
1008 this.useDedicatedTaskRunner = useDedicatedTaskRunner;
1009 }
1010
1011 public boolean isUseDedicatedTaskRunner() {
1012 return useDedicatedTaskRunner;
1013 }
1014
1015 public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
1016 this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
1017 }
1018
1019 public long getConsumerFailoverRedeliveryWaitPeriod() {
1020 return consumerFailoverRedeliveryWaitPeriod;
1021 }
1022
1023 public ClientInternalExceptionListener getClientInternalExceptionListener() {
1024 return clientInternalExceptionListener;
1025 }
1026
1027 /**
1028 * Allows an {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory
1029 * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
1030 * an exception listener.
1031 * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than
1032 * on connection (as it will be if more than one connection is subsequently created by this connection factory)
1033 * @param clientInternalExceptionListener sets the exception listener to be registered on all connections
1034 * created by this factory
1035 */
1036 public void setClientInternalExceptionListener(
1037 ClientInternalExceptionListener clientInternalExceptionListener) {
1038 this.clientInternalExceptionListener = clientInternalExceptionListener;
1039 }
1040
1041 /**
1042 * @return the checkForDuplicates
1043 */
1044 public boolean isCheckForDuplicates() {
1045 return this.checkForDuplicates;
1046 }
1047
1048 /**
1049 * @param checkForDuplicates the checkForDuplicates to set
1050 */
1051 public void setCheckForDuplicates(boolean checkForDuplicates) {
1052 this.checkForDuplicates = checkForDuplicates;
1053 }
1054
1055 public boolean isTransactedIndividualAck() {
1056 return transactedIndividualAck;
1057 }
1058
1059 /**
1060 * when true, submit individual transacted acks immediately rather than with transaction completion.
1061 * This allows the acks to represent delivery status which can be persisted on rollback
1062 * Used in conjunction with org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter#setRewriteOnRedelivery(boolean) true
1063 */
1064 public void setTransactedIndividualAck(boolean transactedIndividualAck) {
1065 this.transactedIndividualAck = transactedIndividualAck;
1066 }
1067
1068
1069 public boolean isNonBlockingRedelivery() {
1070 return nonBlockingRedelivery;
1071 }
1072
1073 /**
1074 * When true a MessageConsumer will not stop Message delivery before re-delivering Messages
1075 * from a rolled back transaction. This implies that message order will not be preserved and
1076 * also will result in the TransactedIndividualAck option to be enabled.
1077 */
1078 public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
1079 this.nonBlockingRedelivery = nonBlockingRedelivery;
1080 }
1081
1082 }