001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.net.URI;
022 import java.net.URISyntaxException;
023 import java.net.UnknownHostException;
024 import java.util.ArrayList;
025 import java.util.HashMap;
026 import java.util.HashSet;
027 import java.util.Iterator;
028 import java.util.List;
029 import java.util.Map;
030 import java.util.Set;
031 import java.util.concurrent.CopyOnWriteArrayList;
032 import java.util.concurrent.CountDownLatch;
033 import java.util.concurrent.LinkedBlockingQueue;
034 import java.util.concurrent.RejectedExecutionException;
035 import java.util.concurrent.RejectedExecutionHandler;
036 import java.util.concurrent.SynchronousQueue;
037 import java.util.concurrent.ThreadFactory;
038 import java.util.concurrent.ThreadPoolExecutor;
039 import java.util.concurrent.TimeUnit;
040 import java.util.concurrent.atomic.AtomicBoolean;
041
042 import javax.annotation.PostConstruct;
043 import javax.annotation.PreDestroy;
044 import javax.management.MalformedObjectNameException;
045 import javax.management.ObjectName;
046
047 import org.apache.activemq.ActiveMQConnectionMetaData;
048 import org.apache.activemq.ConfigurationException;
049 import org.apache.activemq.Service;
050 import org.apache.activemq.advisory.AdvisoryBroker;
051 import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
052 import org.apache.activemq.broker.ft.MasterConnector;
053 import org.apache.activemq.broker.jmx.AnnotatedMBean;
054 import org.apache.activemq.broker.jmx.BrokerView;
055 import org.apache.activemq.broker.jmx.ConnectorView;
056 import org.apache.activemq.broker.jmx.ConnectorViewMBean;
057 import org.apache.activemq.broker.jmx.FTConnectorView;
058 import org.apache.activemq.broker.jmx.JmsConnectorView;
059 import org.apache.activemq.broker.jmx.JobSchedulerView;
060 import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
061 import org.apache.activemq.broker.jmx.ManagedRegionBroker;
062 import org.apache.activemq.broker.jmx.ManagementContext;
063 import org.apache.activemq.broker.jmx.NetworkConnectorView;
064 import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
065 import org.apache.activemq.broker.jmx.ProxyConnectorView;
066 import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
067 import org.apache.activemq.broker.region.Destination;
068 import org.apache.activemq.broker.region.DestinationFactory;
069 import org.apache.activemq.broker.region.DestinationFactoryImpl;
070 import org.apache.activemq.broker.region.DestinationInterceptor;
071 import org.apache.activemq.broker.region.RegionBroker;
072 import org.apache.activemq.broker.region.policy.PolicyMap;
073 import org.apache.activemq.broker.region.virtual.MirroredQueue;
074 import org.apache.activemq.broker.region.virtual.VirtualDestination;
075 import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
076 import org.apache.activemq.broker.region.virtual.VirtualTopic;
077 import org.apache.activemq.broker.scheduler.SchedulerBroker;
078 import org.apache.activemq.command.ActiveMQDestination;
079 import org.apache.activemq.command.ActiveMQQueue;
080 import org.apache.activemq.command.BrokerId;
081 import org.apache.activemq.filter.DestinationFilter;
082 import org.apache.activemq.network.ConnectionFilter;
083 import org.apache.activemq.network.DiscoveryNetworkConnector;
084 import org.apache.activemq.network.NetworkConnector;
085 import org.apache.activemq.network.jms.JmsConnector;
086 import org.apache.activemq.proxy.ProxyConnector;
087 import org.apache.activemq.security.MessageAuthorizationPolicy;
088 import org.apache.activemq.selector.SelectorParser;
089 import org.apache.activemq.store.PersistenceAdapter;
090 import org.apache.activemq.store.PersistenceAdapterFactory;
091 import org.apache.activemq.store.amq.AMQPersistenceAdapter;
092 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
093 import org.apache.activemq.store.kahadb.plist.PListStore;
094 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
095 import org.apache.activemq.thread.Scheduler;
096 import org.apache.activemq.thread.TaskRunnerFactory;
097 import org.apache.activemq.transport.TransportFactory;
098 import org.apache.activemq.transport.TransportServer;
099 import org.apache.activemq.transport.vm.VMTransportFactory;
100 import org.apache.activemq.usage.SystemUsage;
101 import org.apache.activemq.util.BrokerSupport;
102 import org.apache.activemq.util.DefaultIOExceptionHandler;
103 import org.apache.activemq.util.IOExceptionHandler;
104 import org.apache.activemq.util.IOExceptionSupport;
105 import org.apache.activemq.util.IOHelper;
106 import org.apache.activemq.util.InetAddressUtil;
107 import org.apache.activemq.util.JMXSupport;
108 import org.apache.activemq.util.ServiceStopper;
109 import org.apache.activemq.util.URISupport;
110 import org.slf4j.Logger;
111 import org.slf4j.LoggerFactory;
112 import org.slf4j.MDC;
113
114 /**
115 * Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a
116 * number of transport connectors, network connectors and a bunch of properties
 * which can be used to configure the broker as it is lazily created.
118 *
119 *
120 * @org.apache.xbean.XBean
121 */
122 public class BrokerService implements Service {
// Latch created with a count of one; stop() counts it down when no
// masterConnectorURI is configured and the count is still one.
protected CountDownLatch slaveStartSignal = new CountDownLatch(1);
public static final String DEFAULT_PORT = "61616";
// Assigned exactly once by the static initializer of this class.
public static final String LOCAL_HOST_NAME;
public static final String DEFAULT_BROKER_NAME = "localhost";
private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
// NOTE(review): the visible class declaration only implements Service;
// confirm whether a serialVersionUID is actually needed here.
private static final long serialVersionUID = 7353129142305630237L;

// --- basic feature switches ---
private boolean useJmx = true;
private boolean enableStatistics = true;
private boolean persistent = true;
private boolean populateJMSXUserID;
private boolean useAuthenticatedPrincipalForJMSXUserID;
private boolean populateUserNameInMBeans;

// --- shutdown and master/slave behaviour ---
private boolean useShutdownHook = true;
private boolean useLoggingForShutdownErrors;
private boolean shutdownOnMasterFailure;
private boolean shutdownOnSlaveFailure;
private boolean waitForSlave;
// presumably milliseconds (600000 ms = 10 minutes) - TODO confirm against the code that reads it
private long waitForSlaveTimeout = 600000L;
private boolean passiveSlave;

// --- identity, directories and lazily created collaborators ---
private String brokerName = DEFAULT_BROKER_NAME;
private File dataDirectoryFile;
private File tmpDataDirectory;
private Broker broker;
private BrokerView adminView;
private ManagementContext managementContext;
private ObjectName brokerObjectName;
private TaskRunnerFactory taskRunnerFactory;
private TaskRunnerFactory persistenceTaskRunnerFactory;
private SystemUsage systemUsage;
private SystemUsage producerSystemUsage;
// NOTE(review): the field name is misspelled ("Usaage"); it is the field
// read and written by getConsumerSystemUsage().
private SystemUsage consumerSystemUsaage;
private PersistenceAdapter persistenceAdapter;
private PersistenceAdapterFactory persistenceFactory;
protected DestinationFactory destinationFactory;
private MessageAuthorizationPolicy messageAuthorizationPolicy;

// --- connectors and services ---
// The four connector lists are CopyOnWriteArrayLists; the services list is
// a plain ArrayList.
private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
private final List<Service> services = new ArrayList<Service>();
private MasterConnector masterConnector;
private String masterConnectorURI;
private transient Thread shutdownHook;
private String[] transportConnectorURIs;
private String[] networkConnectorURIs;
// These are JMS-to-JMS bridges to other JMS messaging systems.
private JmsConnector[] jmsBridgeConnectors;

private boolean deleteAllMessagesOnStartup;
private boolean advisorySupport = true;
private URI vmConnectorURI;
private String defaultSocketURIString;
private PolicyMap destinationPolicy;

// --- lifecycle state ---
// 'started' is flipped by start() via compareAndSet; 'stopped' is set at
// the end of stop(). Both are reset by start(boolean).
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean stopped = new AtomicBoolean(false);
private BrokerPlugin[] plugins;
private boolean keepDurableSubsActive = true;
private boolean useVirtualTopics = true;
private boolean useMirroredQueues = false;
private boolean useTempMirroredQueues = true;
private BrokerId brokerId;
private DestinationInterceptor[] destinationInterceptors;
private ActiveMQDestination[] destinations;
private PListStore tempDataStore;
private int persistenceThreadPriority = Thread.MAX_PRIORITY;
private boolean useLocalHostBrokerName;
// Counted down by stop() and start() respectively; awaited by
// waitUntilStopped() and waitUntilStarted().
private final CountDownLatch stoppedLatch = new CountDownLatch(1);
private final CountDownLatch startedLatch = new CountDownLatch(1);
private boolean supportFailOver;
private Broker regionBroker;

// --- system usage split between producers and consumers ---
// The portions are divided by 100f where they are used, i.e. they are
// percentages.
private int producerSystemUsagePortion = 60;
private int consumerSystemUsagePortion = 40;
private boolean splitSystemUsageForProducersConsumers;
private boolean monitorConnectionSplits = false;
private int taskRunnerPriority = Thread.NORM_PRIORITY;
private boolean dedicatedTaskRunner;
private boolean cacheTempDestinations = false;// useful for failover
// presumably milliseconds - TODO confirm
private int timeBeforePurgeTempDestinations = 5000;
// Runnables executed by stop() while holding this list's monitor.
private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
// When set, stop() spawns a thread that calls System.exit with the exit
// code below.
private boolean systemExitOnShutdown;
private int systemExitOnShutdownExitCode;
private SslContext sslContext;
private boolean forceStart = false;
private IOExceptionHandler ioExceptionHandler;
private boolean schedulerSupport = false;
private File schedulerDirectoryFile;
private Scheduler scheduler;
private ThreadPoolExecutor executor;
// True until start() has started the persistence adapter; reset to true by
// stop(). Consulted by isSlave() when no master connector is present.
private boolean slave = true;
private int schedulePeriodForDestinationPurge= 0;
private int maxPurgedDestinationsPerSweep = 0;
private BrokerContext brokerContext;
private boolean networkConnectorStartAsync = false;
private boolean allowTempAutoCreationOnSend;

// --- offline durable subscriber handling ---
// NOTE(review): units are not visible in this part of the file; the values
// look like milliseconds, with -1 meaning "disabled" - confirm.
private int offlineDurableSubscriberTimeout = -1;
private int offlineDurableSubscriberTaskSchedule = 300000;
private DestinationFilter virtualConsumerDestinationFilter;
222
// Resolves LOCAL_HOST_NAME once at class-initialisation time, falling back
// to "localhost" when the local host name cannot be determined.
static {
    String resolvedHostName = "localhost";
    try {
        resolvedHostName = InetAddressUtil.getLocalHostName();
    } catch (UnknownHostException e) {
        // Keep the fallback value, but record the cause so the failure can
        // be diagnosed from the log.
        LOG.error("Failed to resolve localhost", e);
    }
    LOCAL_HOST_NAME = resolvedHostName;
}
232
233 @Override
234 public String toString() {
235 return "BrokerService[" + getBrokerName() + "]";
236 }
237
238 /**
239 * Adds a new transport connector for the given bind address
240 *
241 * @return the newly created and added transport connector
242 * @throws Exception
243 */
244 public TransportConnector addConnector(String bindAddress) throws Exception {
245 return addConnector(new URI(bindAddress));
246 }
247
248 /**
249 * Adds a new transport connector for the given bind address
250 *
251 * @return the newly created and added transport connector
252 * @throws Exception
253 */
254 public TransportConnector addConnector(URI bindAddress) throws Exception {
255 return addConnector(createTransportConnector(bindAddress));
256 }
257
258 /**
259 * Adds a new transport connector for the given TransportServer transport
260 *
261 * @return the newly created and added transport connector
262 * @throws Exception
263 */
264 public TransportConnector addConnector(TransportServer transport) throws Exception {
265 return addConnector(new TransportConnector(transport));
266 }
267
268 /**
269 * Adds a new transport connector
270 *
271 * @return the transport connector
272 * @throws Exception
273 */
274 public TransportConnector addConnector(TransportConnector connector) throws Exception {
275 transportConnectors.add(connector);
276 return connector;
277 }
278
279 /**
280 * Stops and removes a transport connector from the broker.
281 *
282 * @param connector
283 * @return true if the connector has been previously added to the broker
284 * @throws Exception
285 */
286 public boolean removeConnector(TransportConnector connector) throws Exception {
287 boolean rc = transportConnectors.remove(connector);
288 if (rc) {
289 unregisterConnectorMBean(connector);
290 }
291 return rc;
292 }
293
294 /**
295 * Adds a new network connector using the given discovery address
296 *
297 * @return the newly created and added network connector
298 * @throws Exception
299 */
300 public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
301 return addNetworkConnector(new URI(discoveryAddress));
302 }
303
304 /**
305 * Adds a new proxy connector using the given bind address
306 *
307 * @return the newly created and added network connector
308 * @throws Exception
309 */
310 public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
311 return addProxyConnector(new URI(bindAddress));
312 }
313
314 /**
315 * Adds a new network connector using the given discovery address
316 *
317 * @return the newly created and added network connector
318 * @throws Exception
319 */
320 public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
321 NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
322 return addNetworkConnector(connector);
323 }
324
325 /**
326 * Adds a new proxy connector using the given bind address
327 *
328 * @return the newly created and added network connector
329 * @throws Exception
330 */
331 public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
332 ProxyConnector connector = new ProxyConnector();
333 connector.setBind(bindAddress);
334 connector.setRemote(new URI("fanout:multicast://default"));
335 return addProxyConnector(connector);
336 }
337
338 /**
339 * Adds a new network connector to connect this broker to a federated
340 * network
341 */
342 public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
343 connector.setBrokerService(this);
344 URI uri = getVmConnectorURI();
345 Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
346 map.put("network", "true");
347 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
348 connector.setLocalUri(uri);
349 // Set a connection filter so that the connector does not establish loop
350 // back connections.
351 connector.setConnectionFilter(new ConnectionFilter() {
352 public boolean connectTo(URI location) {
353 List<TransportConnector> transportConnectors = getTransportConnectors();
354 for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
355 try {
356 TransportConnector tc = iter.next();
357 if (location.equals(tc.getConnectUri())) {
358 return false;
359 }
360 } catch (Throwable e) {
361 }
362 }
363 return true;
364 }
365 });
366 networkConnectors.add(connector);
367 if (isUseJmx()) {
368 registerNetworkConnectorMBean(connector);
369 }
370 return connector;
371 }
372
373 /**
374 * Removes the given network connector without stopping it. The caller
375 * should call {@link NetworkConnector#stop()} to close the connector
376 */
377 public boolean removeNetworkConnector(NetworkConnector connector) {
378 boolean answer = networkConnectors.remove(connector);
379 if (answer) {
380 unregisterNetworkConnectorMBean(connector);
381 }
382 return answer;
383 }
384
385 public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
386 URI uri = getVmConnectorURI();
387 connector.setLocalUri(uri);
388 proxyConnectors.add(connector);
389 if (isUseJmx()) {
390 registerProxyConnectorMBean(connector);
391 }
392 return connector;
393 }
394
395 public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
396 connector.setBrokerService(this);
397 jmsConnectors.add(connector);
398 if (isUseJmx()) {
399 registerJmsConnectorMBean(connector);
400 }
401 return connector;
402 }
403
404 public JmsConnector removeJmsConnector(JmsConnector connector) {
405 if (jmsConnectors.remove(connector)) {
406 return connector;
407 }
408 return null;
409 }
410
411 /**
412 * @return Returns the masterConnectorURI.
413 */
414 public String getMasterConnectorURI() {
415 return masterConnectorURI;
416 }
417
418 /**
419 * @param masterConnectorURI
420 * The masterConnectorURI to set.
421 */
422 public void setMasterConnectorURI(String masterConnectorURI) {
423 this.masterConnectorURI = masterConnectorURI;
424 }
425
426 /**
427 * @return true if this Broker is a slave to a Master
428 */
429 public boolean isSlave() {
430 return (masterConnector != null && masterConnector.isSlave()) ||
431 (masterConnector != null && masterConnector.isStoppedBeforeStart()) ||
432 (masterConnector == null && slave);
433 }
434
435 public void masterFailed() {
436 if (shutdownOnMasterFailure) {
437 LOG.error("The Master has failed ... shutting down");
438 try {
439 stop();
440 } catch (Exception e) {
441 LOG.error("Failed to stop for master failure", e);
442 }
443 } else {
444 LOG.warn("Master Failed - starting all connectors");
445 try {
446 startAllConnectors();
447 broker.nowMasterBroker();
448 } catch (Exception e) {
449 LOG.error("Failed to startAllConnectors", e);
450 }
451 }
452 }
453
454 public boolean isStarted() {
455 return started.get();
456 }
457
458 /**
459 * Forces a start of the broker.
460 * By default a BrokerService instance that was
461 * previously stopped using BrokerService.stop() cannot be restarted
462 * using BrokerService.start().
463 * This method enforces a restart.
464 * It is not recommended to force a restart of the broker and will not work
465 * for most but some very trivial broker configurations.
466 * For restarting a broker instance we recommend to first call stop() on
467 * the old instance and then recreate a new BrokerService instance.
468 *
469 * @param force - if true enforces a restart.
470 * @throws Exception
471 */
472 public void start(boolean force) throws Exception {
473 forceStart = force;
474 stopped.set(false);
475 started.set(false);
476 start();
477 }
478
479 // Service interface
480 // -------------------------------------------------------------------------
481
/**
 * Hook consulted by {@link #autoStart()} to decide whether the broker is
 * started automatically; this implementation always answers {@code true}.
 * Being protected, it can be overridden by subclasses.
 *
 * @return {@code true}
 */
protected boolean shouldAutostart() {
    return true;
}
485
486 /**
487 *
488 * @throws Exception
489 * @org. apache.xbean.InitMethod
490 */
491 @PostConstruct
492 public void autoStart() throws Exception {
493 if(shouldAutostart()) {
494 start();
495 }
496 }
497
/**
 * Starts the broker.
 *
 * The call is a silent no-op when the service has already been stopped or
 * has already been started. Otherwise the persistence adapter, the
 * destinations, the broker itself and (unless this broker is a slave) the
 * connectors are started in that order. If any step throws, the failure is
 * logged, {@link #stop()} is attempted and the original exception is
 * rethrown.
 *
 * @throws Exception if any startup step fails
 */
public void start() throws Exception {
    if (stopped.get() || !started.compareAndSet(false, true)) {
        // lets just ignore redundant start() calls
        // as its way too easy to not be completely sure if start() has been
        // called or not with the gazillion of different configuration
        // mechanisms
        // throw new IllegalStateException("Allready started.");
        return;
    }

    // Tag log output of this thread with the broker name for the duration
    // of the start; removed again in the finally block below.
    MDC.put("activemq.broker", brokerName);

    try {
        // The two options are rejected as a combination.
        if (systemExitOnShutdown && useShutdownHook) {
            throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
        }
        processHelperProperties();
        if (isUseJmx()) {
            startManagementContext();
        }

        // Configure and start the persistence adapter before anything that
        // depends on it.
        getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
        getPersistenceAdapter().setBrokerName(getBrokerName());
        LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
        if (deleteAllMessagesOnStartup) {
            deleteAllMessages();
        }
        getPersistenceAdapter().start();
        // From here on isSlave() no longer reports true merely because no
        // master connector exists.
        slave = false;
        startDestinations();
        addShutdownHook();
        getBroker().start();
        if (isUseJmx()) {
            if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
                // try to restart management context
                // typical for slaves that use the same ports as master
                managementContext.stop();
                startManagementContext();
            }
            // NOTE(review): this cast relies on regionBroker being a
            // ManagedRegionBroker whenever JMX is enabled; the code that
            // creates regionBroker is not visible here.
            ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
            managedBroker.setContextBroker(broker);
            adminView.setBroker(managedBroker);
        }
        BrokerRegistry.getInstance().bind(getBrokerName(), this);
        // see if there is a MasterBroker service and if so, configure
        // it and start it.
        for (Service service : services) {
            if (service instanceof MasterConnector) {
                configureService(service);
                service.start();
            }
        }
        // Connectors are opened only when this broker is not a slave and
        // either has no master connector or is not configured to shut down
        // on master failure.
        if (!isSlave() && (this.masterConnector == null || isShutdownOnMasterFailure() == false)) {
            startAllConnectors();
        }
        // Skipped if stop() ran while the steps above were executing.
        if (!stopped.get()) {
            if (isUseJmx() && masterConnector != null) {
                registerFTConnectorMBean(masterConnector);
            }
        }
        if (brokerId == null) {
            brokerId = broker.getBrokerId();
        }
        if (ioExceptionHandler == null) {
            setIoExceptionHandler(new DefaultIOExceptionHandler());
        }
        LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
        getBroker().brokerServiceStarted();
        checkSystemUsageLimits();
        // Releases threads blocked in waitUntilStarted().
        startedLatch.countDown();
    } catch (Exception e) {
        LOG.error("Failed to start ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + "). Reason: " + e, e);
        // Best-effort cleanup; a failure while stopping is only logged so
        // that the original start failure is the one that propagates.
        try {
            if (!stopped.get()) {
                stop();
            }
        } catch (Exception ex) {
            LOG.warn("Failed to stop broker after failure in start ", ex);
        }
        throw e;
    } finally {
        MDC.remove("activemq.broker");
    }
}
582
/**
 * Stops the broker; annotated with {@code @PreDestroy}.
 *
 * Does nothing unless the {@code started} flag is set. Otherwise the
 * scheduler, the registered services, all connectors, the broker, the temp
 * data store, the persistence adapter and (with JMX) the management
 * context are stopped in that order, the {@code stopped} flag is set, the
 * stopped latch is released, the task runner factory and executor are shut
 * down, and finally the registered shutdown hooks are run. The first
 * exception collected by the {@code ServiceStopper} is thrown at the end.
 *
 * NOTE(review): the XBean tag below contains a space inside the package
 * name. It is kept exactly as found - presumably intentional so that only
 * the {@code @PreDestroy} annotation triggers the call; confirm before
 * "repairing" it.
 *
 * @throws Exception
 * @org.apache .xbean.DestroyMethod
 */
@PreDestroy
public void stop() throws Exception {
    if (!started.get()) {
        return;
    }

    // Tag log output of this thread with the broker name. NOTE(review): the
    // matching MDC.remove near the end of this method is not in a finally
    // block, so it is skipped if an exception propagates from a step in
    // between.
    MDC.put("activemq.broker", brokerName);

    if (systemExitOnShutdown) {
        // System.exit is issued from a separate thread, started before the
        // shutdown steps below run on the current thread.
        new Thread() {
            @Override
            public void run() {
                System.exit(systemExitOnShutdownExitCode);
            }
        }.start();
    }

    LOG.info("ActiveMQ Message Broker (" + getBrokerName() + ", " + brokerId + ") is shutting down");
    removeShutdownHook();
    if (this.scheduler != null) {
        this.scheduler.stop();
        this.scheduler = null;
    }
    // The stopper is used for most of the stop calls below; its first
    // recorded exception is rethrown at the very end of this method.
    ServiceStopper stopper = new ServiceStopper();
    // 'services' is a final, eagerly initialised field, so this check is
    // always true.
    if (services != null) {
        for (Service service : services) {
            stopper.stop(service);
        }
    }
    stopAllConnectors(stopper);
    // remove any VMTransports connected
    // this has to be done after services are stopped,
    // to avoid timimg issue with discovery (spinning up a new instance)
    BrokerRegistry.getInstance().unbind(getBrokerName());
    VMTransportFactory.stopped(getBrokerName());
    if (broker != null) {
        stopper.stop(broker);
        broker = null;
    }

    if (tempDataStore != null) {
        tempDataStore.stop();
        tempDataStore = null;
    }
    try {
        stopper.stop(persistenceAdapter);
        persistenceAdapter = null;
        slave = true;
        if (isUseJmx()) {
            stopper.stop(getManagementContext());
            managementContext = null;
        }
        // Clear SelectorParser cache to free memory
        SelectorParser.clearCache();
    } finally {
        // Mark the service as stopped and release waitUntilStopped()
        // callers even if the steps in the try block throw.
        stopped.set(true);
        stoppedLatch.countDown();
    }
    if (masterConnectorURI == null) {
        // master start has not finished yet
        if (slaveStartSignal.getCount() == 1) {
            started.set(false);
            slaveStartSignal.countDown();
        }
    } else {
        for (Service service : services) {
            if (service instanceof MasterConnector) {
                MasterConnector mConnector = (MasterConnector) service;
                if (!mConnector.isSlave()) {
                    // means should be slave but not connected to master yet
                    started.set(false);
                    mConnector.stopBeforeConnected();
                }
            }
        }
    }
    if (this.taskRunnerFactory != null) {
        this.taskRunnerFactory.shutdown();
        this.taskRunnerFactory = null;
    }
    if (this.executor != null) {
        this.executor.shutdownNow();
        this.executor = null;
    }

    // Drop references that are set up again elsewhere (not visible here).
    this.destinationInterceptors = null;
    this.destinationFactory = null;

    LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") stopped");
    // Run every registered shutdown hook; a failing hook is reported to
    // the stopper and does not prevent the remaining hooks from running.
    synchronized (shutdownHooks) {
        for (Runnable hook : shutdownHooks) {
            try {
                hook.run();
            } catch (Throwable e) {
                stopper.onException(hook, e);
            }
        }
    }

    MDC.remove("activemq.broker");

    stopper.throwFirstException();
}
691
692 public boolean checkQueueSize(String queueName) {
693 long count = 0;
694 long queueSize = 0;
695 Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
696 for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
697 if (entry.getKey().isQueue()) {
698 if (entry.getValue().getName().matches(queueName)) {
699 queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
700 count += queueSize;
701 if (queueSize > 0) {
702 LOG.info("Queue has pending message:" + entry.getValue().getName() + " queueSize is:"
703 + queueSize);
704 }
705 }
706 }
707 }
708 return count == 0;
709 }
710
711 /**
712 * This method (both connectorName and queueName are using regex to match)
713 * 1. stop the connector (supposed the user input the connector which the
714 * clients connect to) 2. to check whether there is any pending message on
715 * the queues defined by queueName 3. supposedly, after stop the connector,
716 * client should failover to other broker and pending messages should be
717 * forwarded. if no pending messages, the method finally call stop to stop
718 * the broker.
719 *
720 * @param connectorName
721 * @param queueName
722 * @param timeout
723 * @param pollInterval
724 * @throws Exception
725 */
726 public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
727 throws Exception {
728 if (isUseJmx()) {
729 if (connectorName == null || queueName == null || timeout <= 0) {
730 throw new Exception(
731 "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
732 }
733 if (pollInterval <= 0) {
734 pollInterval = 30;
735 }
736 LOG.info("Stop gracefully with connectorName:" + connectorName + " queueName:" + queueName + " timeout:"
737 + timeout + " pollInterval:" + pollInterval);
738 TransportConnector connector;
739 for (int i = 0; i < transportConnectors.size(); i++) {
740 connector = transportConnectors.get(i);
741 if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) {
742 connector.stop();
743 }
744 }
745 long start = System.currentTimeMillis();
746 while (System.currentTimeMillis() - start < timeout * 1000) {
747 // check quesize until it gets zero
748 if (checkQueueSize(queueName)) {
749 stop();
750 break;
751 } else {
752 Thread.sleep(pollInterval * 1000);
753 }
754 }
755 if (stopped.get()) {
756 LOG.info("Successfully stop the broker.");
757 } else {
758 LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
759 }
760 }
761 }
762
763 /**
764 * A helper method to block the caller thread until the broker has been
765 * stopped
766 */
767 public void waitUntilStopped() {
768 while (isStarted() && !stopped.get()) {
769 try {
770 stoppedLatch.await();
771 } catch (InterruptedException e) {
772 // ignore
773 }
774 }
775 }
776
777 /**
778 * A helper method to block the caller thread until the broker has fully started
779 * @return boolean true if wait succeeded false if broker was not started or was stopped
780 */
781 public boolean waitUntilStarted() {
782 boolean waitSucceeded = false;
783 while (isStarted() && !stopped.get() && !waitSucceeded) {
784 try {
785 waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
786 } catch (InterruptedException ignore) {
787 }
788 }
789 return waitSucceeded;
790 }
791
792 // Properties
793 // -------------------------------------------------------------------------
794 /**
795 * Returns the message broker
796 */
797 public Broker getBroker() throws Exception {
798 if (broker == null) {
799 LOG.info("ActiveMQ " + ActiveMQConnectionMetaData.PROVIDER_VERSION + " JMS Message Broker ("
800 + getBrokerName() + ") is starting");
801 LOG.info("For help or more information please see: http://activemq.apache.org/");
802 broker = createBroker();
803 }
804 return broker;
805 }
806
807 /**
808 * Returns the administration view of the broker; used to create and destroy
809 * resources such as queues and topics. Note this method returns null if JMX
810 * is disabled.
811 */
812 public BrokerView getAdminView() throws Exception {
813 if (adminView == null) {
814 // force lazy creation
815 getBroker();
816 }
817 return adminView;
818 }
819
820 public void setAdminView(BrokerView adminView) {
821 this.adminView = adminView;
822 }
823
824 public String getBrokerName() {
825 return brokerName;
826 }
827
828 /**
829 * Sets the name of this broker; which must be unique in the network
830 *
831 * @param brokerName
832 */
833 public void setBrokerName(String brokerName) {
834 if (brokerName == null) {
835 throw new NullPointerException("The broker name cannot be null");
836 }
837 String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_");
838 if (!str.equals(brokerName)) {
839 LOG.error("Broker Name: " + brokerName + " contained illegal characters - replaced with " + str);
840 }
841 this.brokerName = str.trim();
842 }
843
844 public PersistenceAdapterFactory getPersistenceFactory() {
845 return persistenceFactory;
846 }
847
848 public File getDataDirectoryFile() {
849 if (dataDirectoryFile == null) {
850 dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory());
851 }
852 return dataDirectoryFile;
853 }
854
855 public File getBrokerDataDirectory() {
856 String brokerDir = getBrokerName();
857 return new File(getDataDirectoryFile(), brokerDir);
858 }
859
860 /**
861 * Sets the directory in which the data files will be stored by default for
862 * the JDBC and Journal persistence adaptors.
863 *
864 * @param dataDirectory
865 * the directory to store data files
866 */
867 public void setDataDirectory(String dataDirectory) {
868 setDataDirectoryFile(new File(dataDirectory));
869 }
870
871 /**
872 * Sets the directory in which the data files will be stored by default for
873 * the JDBC and Journal persistence adaptors.
874 *
875 * @param dataDirectoryFile
876 * the directory to store data files
877 */
878 public void setDataDirectoryFile(File dataDirectoryFile) {
879 this.dataDirectoryFile = dataDirectoryFile;
880 }
881
882 /**
883 * @return the tmpDataDirectory
884 */
885 public File getTmpDataDirectory() {
886 if (tmpDataDirectory == null) {
887 tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage");
888 }
889 return tmpDataDirectory;
890 }
891
892 /**
893 * @param tmpDataDirectory
894 * the tmpDataDirectory to set
895 */
896 public void setTmpDataDirectory(File tmpDataDirectory) {
897 this.tmpDataDirectory = tmpDataDirectory;
898 }
899
900 public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
901 this.persistenceFactory = persistenceFactory;
902 }
903
904 public void setDestinationFactory(DestinationFactory destinationFactory) {
905 this.destinationFactory = destinationFactory;
906 }
907
908 public boolean isPersistent() {
909 return persistent;
910 }
911
912 /**
913 * Sets whether or not persistence is enabled or disabled.
914 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
915 */
916 public void setPersistent(boolean persistent) {
917 this.persistent = persistent;
918 }
919
920 public boolean isPopulateJMSXUserID() {
921 return populateJMSXUserID;
922 }
923
924 /**
925 * Sets whether or not the broker should populate the JMSXUserID header.
926 */
927 public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
928 this.populateJMSXUserID = populateJMSXUserID;
929 }
930
931 public SystemUsage getSystemUsage() {
932 try {
933 if (systemUsage == null) {
934 systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore());
935 systemUsage.setExecutor(getExecutor());
936 systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default
937 // 64
938 // Meg
939 systemUsage.getTempUsage().setLimit(1024L * 1024 * 1000 * 50); // 50
940 // Gb
941 systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1000 * 100); // 100
942 // GB
943 addService(this.systemUsage);
944 }
945 return systemUsage;
946 } catch (IOException e) {
947 LOG.error("Cannot create SystemUsage", e);
948 throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage());
949 }
950 }
951
952 public void setSystemUsage(SystemUsage memoryManager) {
953 if (this.systemUsage != null) {
954 removeService(this.systemUsage);
955 }
956 this.systemUsage = memoryManager;
957 if (this.systemUsage.getExecutor()==null) {
958 this.systemUsage.setExecutor(getExecutor());
959 }
960 addService(this.systemUsage);
961 }
962
963 /**
964 * @return the consumerUsageManager
965 * @throws IOException
966 */
967 public SystemUsage getConsumerSystemUsage() throws IOException {
968 if (this.consumerSystemUsaage == null) {
969 if (splitSystemUsageForProducersConsumers) {
970 this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
971 float portion = consumerSystemUsagePortion / 100f;
972 this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion);
973 addService(this.consumerSystemUsaage);
974 } else {
975 consumerSystemUsaage = getSystemUsage();
976 }
977 }
978 return this.consumerSystemUsaage;
979 }
980
981 /**
982 * @param consumerSystemUsaage
983 * the storeSystemUsage to set
984 */
985 public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
986 if (this.consumerSystemUsaage != null) {
987 removeService(this.consumerSystemUsaage);
988 }
989 this.consumerSystemUsaage = consumerSystemUsaage;
990 addService(this.consumerSystemUsaage);
991 }
992
993 /**
994 * @return the producerUsageManager
995 * @throws IOException
996 */
997 public SystemUsage getProducerSystemUsage() throws IOException {
998 if (producerSystemUsage == null) {
999 if (splitSystemUsageForProducersConsumers) {
1000 producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
1001 float portion = producerSystemUsagePortion / 100f;
1002 producerSystemUsage.getMemoryUsage().setUsagePortion(portion);
1003 addService(producerSystemUsage);
1004 } else {
1005 producerSystemUsage = getSystemUsage();
1006 }
1007 }
1008 return producerSystemUsage;
1009 }
1010
1011 /**
1012 * @param producerUsageManager
1013 * the producerUsageManager to set
1014 */
1015 public void setProducerSystemUsage(SystemUsage producerUsageManager) {
1016 if (this.producerSystemUsage != null) {
1017 removeService(this.producerSystemUsage);
1018 }
1019 this.producerSystemUsage = producerUsageManager;
1020 addService(this.producerSystemUsage);
1021 }
1022
1023 public PersistenceAdapter getPersistenceAdapter() throws IOException {
1024 if (persistenceAdapter == null) {
1025 persistenceAdapter = createPersistenceAdapter();
1026 configureService(persistenceAdapter);
1027 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1028 }
1029 return persistenceAdapter;
1030 }
1031
1032 /**
1033 * Sets the persistence adaptor implementation to use for this broker
1034 *
1035 * @throws IOException
1036 */
1037 public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
1038 this.persistenceAdapter = persistenceAdapter;
1039 configureService(this.persistenceAdapter);
1040 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1041 }
1042
1043 public TaskRunnerFactory getTaskRunnerFactory() {
1044 if (this.taskRunnerFactory == null) {
1045 this.taskRunnerFactory = new TaskRunnerFactory("BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
1046 isDedicatedTaskRunner());
1047 }
1048 return this.taskRunnerFactory;
1049 }
1050
1051 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
1052 this.taskRunnerFactory = taskRunnerFactory;
1053 }
1054
1055 public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
1056 if (taskRunnerFactory == null) {
1057 persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
1058 true, 1000, isDedicatedTaskRunner());
1059 }
1060 return persistenceTaskRunnerFactory;
1061 }
1062
1063 public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) {
1064 this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory;
1065 }
1066
1067 public boolean isUseJmx() {
1068 return useJmx;
1069 }
1070
1071 public boolean isEnableStatistics() {
1072 return enableStatistics;
1073 }
1074
1075 /**
1076 * Sets whether or not the Broker's services enable statistics or not.
1077 */
1078 public void setEnableStatistics(boolean enableStatistics) {
1079 this.enableStatistics = enableStatistics;
1080 }
1081
1082 /**
1083 * Sets whether or not the Broker's services should be exposed into JMX or
1084 * not.
1085 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1086 */
1087 public void setUseJmx(boolean useJmx) {
1088 this.useJmx = useJmx;
1089 }
1090
1091 public ObjectName getBrokerObjectName() throws IOException {
1092 if (brokerObjectName == null) {
1093 brokerObjectName = createBrokerObjectName();
1094 }
1095 return brokerObjectName;
1096 }
1097
1098 /**
1099 * Sets the JMX ObjectName for this broker
1100 */
1101 public void setBrokerObjectName(ObjectName brokerObjectName) {
1102 this.brokerObjectName = brokerObjectName;
1103 }
1104
1105 public ManagementContext getManagementContext() {
1106 if (managementContext == null) {
1107 managementContext = new ManagementContext();
1108 }
1109 return managementContext;
1110 }
1111
1112 public void setManagementContext(ManagementContext managementContext) {
1113 this.managementContext = managementContext;
1114 }
1115
1116 public NetworkConnector getNetworkConnectorByName(String connectorName) {
1117 for (NetworkConnector connector : networkConnectors) {
1118 if (connector.getName().equals(connectorName)) {
1119 return connector;
1120 }
1121 }
1122 return null;
1123 }
1124
1125 public String[] getNetworkConnectorURIs() {
1126 return networkConnectorURIs;
1127 }
1128
1129 public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
1130 this.networkConnectorURIs = networkConnectorURIs;
1131 }
1132
1133 public TransportConnector getConnectorByName(String connectorName) {
1134 for (TransportConnector connector : transportConnectors) {
1135 if (connector.getName().equals(connectorName)) {
1136 return connector;
1137 }
1138 }
1139 return null;
1140 }
1141
1142 public Map<String, String> getTransportConnectorURIsAsMap() {
1143 Map<String, String> answer = new HashMap<String, String>();
1144 for (TransportConnector connector : transportConnectors) {
1145 try {
1146 URI uri = connector.getConnectUri();
1147 if (uri != null) {
1148 String scheme = uri.getScheme();
1149 if (scheme != null) {
1150 answer.put(scheme.toLowerCase(), uri.toString());
1151 }
1152 }
1153 } catch (Exception e) {
1154 LOG.debug("Failed to read URI to build transportURIsAsMap", e);
1155 }
1156 }
1157 return answer;
1158 }
1159
1160 public String[] getTransportConnectorURIs() {
1161 return transportConnectorURIs;
1162 }
1163
1164 public void setTransportConnectorURIs(String[] transportConnectorURIs) {
1165 this.transportConnectorURIs = transportConnectorURIs;
1166 }
1167
1168 /**
1169 * @return Returns the jmsBridgeConnectors.
1170 */
1171 public JmsConnector[] getJmsBridgeConnectors() {
1172 return jmsBridgeConnectors;
1173 }
1174
1175 /**
1176 * @param jmsConnectors
1177 * The jmsBridgeConnectors to set.
1178 */
1179 public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
1180 this.jmsBridgeConnectors = jmsConnectors;
1181 }
1182
1183 public Service[] getServices() {
1184 return services.toArray(new Service[0]);
1185 }
1186
1187 /**
1188 * Sets the services associated with this broker such as a
1189 * {@link MasterConnector}
1190 */
1191 public void setServices(Service[] services) {
1192 this.services.clear();
1193 if (services != null) {
1194 for (int i = 0; i < services.length; i++) {
1195 this.services.add(services[i]);
1196 }
1197 }
1198 }
1199
1200 /**
1201 * Adds a new service so that it will be started as part of the broker
1202 * lifecycle
1203 */
1204 public void addService(Service service) {
1205 services.add(service);
1206 }
1207
1208 public void removeService(Service service) {
1209 services.remove(service);
1210 }
1211
1212 public boolean isUseLoggingForShutdownErrors() {
1213 return useLoggingForShutdownErrors;
1214 }
1215
1216 /**
1217 * Sets whether or not we should use commons-logging when reporting errors
1218 * when shutting down the broker
1219 */
1220 public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
1221 this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
1222 }
1223
1224 public boolean isUseShutdownHook() {
1225 return useShutdownHook;
1226 }
1227
1228 /**
1229 * Sets whether or not we should use a shutdown handler to close down the
1230 * broker cleanly if the JVM is terminated. It is recommended you leave this
1231 * enabled.
1232 */
1233 public void setUseShutdownHook(boolean useShutdownHook) {
1234 this.useShutdownHook = useShutdownHook;
1235 }
1236
1237 public boolean isAdvisorySupport() {
1238 return advisorySupport;
1239 }
1240
1241 /**
1242 * Allows the support of advisory messages to be disabled for performance
1243 * reasons.
1244 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1245 */
1246 public void setAdvisorySupport(boolean advisorySupport) {
1247 this.advisorySupport = advisorySupport;
1248 }
1249
1250 public List<TransportConnector> getTransportConnectors() {
1251 return new ArrayList<TransportConnector>(transportConnectors);
1252 }
1253
1254 /**
1255 * Sets the transport connectors which this broker will listen on for new
1256 * clients
1257 *
1258 * @org.apache.xbean.Property
1259 * nestedType="org.apache.activemq.broker.TransportConnector"
1260 */
1261 public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
1262 for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
1263 TransportConnector connector = iter.next();
1264 addConnector(connector);
1265 }
1266 }
1267
1268 public TransportConnector getTransportConnectorByName(String name){
1269 for (TransportConnector transportConnector:transportConnectors){
1270 if (name.equals(transportConnector.getName())){
1271 return transportConnector;
1272 }
1273 }
1274 return null;
1275 }
1276
1277 public TransportConnector getTransportConnectorByScheme(String scheme){
1278 for (TransportConnector transportConnector:transportConnectors){
1279 if (scheme.equals(transportConnector.getUri().getScheme())){
1280 return transportConnector;
1281 }
1282 }
1283 return null;
1284 }
1285
1286 public List<NetworkConnector> getNetworkConnectors() {
1287 return new ArrayList<NetworkConnector>(networkConnectors);
1288 }
1289
1290 public List<ProxyConnector> getProxyConnectors() {
1291 return new ArrayList<ProxyConnector>(proxyConnectors);
1292 }
1293
1294 /**
1295 * Sets the network connectors which this broker will use to connect to
1296 * other brokers in a federated network
1297 *
1298 * @org.apache.xbean.Property
1299 * nestedType="org.apache.activemq.network.NetworkConnector"
1300 */
1301 public void setNetworkConnectors(List networkConnectors) throws Exception {
1302 for (Iterator iter = networkConnectors.iterator(); iter.hasNext();) {
1303 NetworkConnector connector = (NetworkConnector) iter.next();
1304 addNetworkConnector(connector);
1305 }
1306 }
1307
1308 /**
1309 * Sets the network connectors which this broker will use to connect to
1310 * other brokers in a federated network
1311 */
1312 public void setProxyConnectors(List proxyConnectors) throws Exception {
1313 for (Iterator iter = proxyConnectors.iterator(); iter.hasNext();) {
1314 ProxyConnector connector = (ProxyConnector) iter.next();
1315 addProxyConnector(connector);
1316 }
1317 }
1318
1319 public PolicyMap getDestinationPolicy() {
1320 return destinationPolicy;
1321 }
1322
1323 /**
1324 * Sets the destination specific policies available either for exact
1325 * destinations or for wildcard areas of destinations.
1326 */
1327 public void setDestinationPolicy(PolicyMap policyMap) {
1328 this.destinationPolicy = policyMap;
1329 }
1330
1331 public BrokerPlugin[] getPlugins() {
1332 return plugins;
1333 }
1334
1335 /**
1336 * Sets a number of broker plugins to install such as for security
1337 * authentication or authorization
1338 */
1339 public void setPlugins(BrokerPlugin[] plugins) {
1340 this.plugins = plugins;
1341 }
1342
1343 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1344 return messageAuthorizationPolicy;
1345 }
1346
1347 /**
1348 * Sets the policy used to decide if the current connection is authorized to
1349 * consume a given message
1350 */
1351 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1352 this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1353 }
1354
1355 /**
1356 * Delete all messages from the persistent store
1357 *
1358 * @throws IOException
1359 */
1360 public void deleteAllMessages() throws IOException {
1361 getPersistenceAdapter().deleteAllMessages();
1362 }
1363
1364 public boolean isDeleteAllMessagesOnStartup() {
1365 return deleteAllMessagesOnStartup;
1366 }
1367
1368 /**
1369 * Sets whether or not all messages are deleted on startup - mostly only
1370 * useful for testing.
1371 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1372 */
1373 public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
1374 this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
1375 }
1376
1377 public URI getVmConnectorURI() {
1378 if (vmConnectorURI == null) {
1379 try {
1380 vmConnectorURI = new URI("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_"));
1381 } catch (URISyntaxException e) {
1382 LOG.error("Badly formed URI from " + getBrokerName(), e);
1383 }
1384 }
1385 return vmConnectorURI;
1386 }
1387
1388 public void setVmConnectorURI(URI vmConnectorURI) {
1389 this.vmConnectorURI = vmConnectorURI;
1390 }
1391
1392 public String getDefaultSocketURIString() {
1393
1394 if (started.get()) {
1395 if (this.defaultSocketURIString == null) {
1396 for (TransportConnector tc:this.transportConnectors) {
1397 String result = null;
1398 try {
1399 result = tc.getPublishableConnectString();
1400 } catch (Exception e) {
1401 LOG.warn("Failed to get the ConnectURI for "+tc,e);
1402 }
1403 if (result != null) {
1404 // find first publishable uri
1405 if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
1406 this.defaultSocketURIString = result;
1407 break;
1408 } else {
1409 // or use the first defined
1410 if (this.defaultSocketURIString == null) {
1411 this.defaultSocketURIString = result;
1412 }
1413 }
1414 }
1415 }
1416
1417 }
1418 return this.defaultSocketURIString;
1419 }
1420 return null;
1421 }
1422
1423 /**
1424 * @return Returns the shutdownOnMasterFailure.
1425 */
1426 public boolean isShutdownOnMasterFailure() {
1427 return shutdownOnMasterFailure;
1428 }
1429
1430 /**
1431 * @param shutdownOnMasterFailure
1432 * The shutdownOnMasterFailure to set.
1433 */
1434 public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
1435 this.shutdownOnMasterFailure = shutdownOnMasterFailure;
1436 }
1437
1438 public boolean isKeepDurableSubsActive() {
1439 return keepDurableSubsActive;
1440 }
1441
1442 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
1443 this.keepDurableSubsActive = keepDurableSubsActive;
1444 }
1445
1446 public boolean isUseVirtualTopics() {
1447 return useVirtualTopics;
1448 }
1449
1450 /**
1451 * Sets whether or not <a
1452 * href="http://activemq.apache.org/virtual-destinations.html">Virtual
1453 * Topics</a> should be supported by default if they have not been
1454 * explicitly configured.
1455 */
1456 public void setUseVirtualTopics(boolean useVirtualTopics) {
1457 this.useVirtualTopics = useVirtualTopics;
1458 }
1459
1460 public DestinationInterceptor[] getDestinationInterceptors() {
1461 return destinationInterceptors;
1462 }
1463
1464 public boolean isUseMirroredQueues() {
1465 return useMirroredQueues;
1466 }
1467
1468 /**
1469 * Sets whether or not <a
1470 * href="http://activemq.apache.org/mirrored-queues.html">Mirrored
1471 * Queues</a> should be supported by default if they have not been
1472 * explicitly configured.
1473 */
1474 public void setUseMirroredQueues(boolean useMirroredQueues) {
1475 this.useMirroredQueues = useMirroredQueues;
1476 }
1477
1478 /**
1479 * Sets the destination interceptors to use
1480 */
1481 public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
1482 this.destinationInterceptors = destinationInterceptors;
1483 }
1484
1485 public ActiveMQDestination[] getDestinations() {
1486 return destinations;
1487 }
1488
1489 /**
1490 * Sets the destinations which should be loaded/created on startup
1491 */
1492 public void setDestinations(ActiveMQDestination[] destinations) {
1493 this.destinations = destinations;
1494 }
1495
1496 /**
1497 * @return the tempDataStore
1498 */
1499 public synchronized PListStore getTempDataStore() {
1500 if (tempDataStore == null) {
1501 if (!isPersistent()) {
1502 return null;
1503 }
1504 boolean result = true;
1505 boolean empty = true;
1506 try {
1507 File directory = getTmpDataDirectory();
1508 if (directory.exists() && directory.isDirectory()) {
1509 File[] files = directory.listFiles();
1510 if (files != null && files.length > 0) {
1511 empty = false;
1512 for (int i = 0; i < files.length; i++) {
1513 File file = files[i];
1514 if (!file.isDirectory()) {
1515 result &= file.delete();
1516 }
1517 }
1518 }
1519 }
1520 if (!empty) {
1521 String str = result ? "Successfully deleted" : "Failed to delete";
1522 LOG.info(str + " temporary storage");
1523 }
1524 this.tempDataStore = new PListStore();
1525 this.tempDataStore.setDirectory(getTmpDataDirectory());
1526 configureService(tempDataStore);
1527 this.tempDataStore.start();
1528 } catch (Exception e) {
1529 throw new RuntimeException(e);
1530 }
1531 }
1532 return tempDataStore;
1533 }
1534
1535 /**
1536 * @param tempDataStore
1537 * the tempDataStore to set
1538 */
1539 public void setTempDataStore(PListStore tempDataStore) {
1540 this.tempDataStore = tempDataStore;
1541 configureService(tempDataStore);
1542 try {
1543 tempDataStore.start();
1544 } catch (Exception e) {
1545 RuntimeException exception = new RuntimeException("Failed to start provided temp data store: " + tempDataStore, e);
1546 LOG.error(exception.getLocalizedMessage(), e);
1547 throw exception;
1548 }
1549 }
1550
1551 public int getPersistenceThreadPriority() {
1552 return persistenceThreadPriority;
1553 }
1554
1555 public void setPersistenceThreadPriority(int persistenceThreadPriority) {
1556 this.persistenceThreadPriority = persistenceThreadPriority;
1557 }
1558
1559 /**
1560 * @return the useLocalHostBrokerName
1561 */
1562 public boolean isUseLocalHostBrokerName() {
1563 return this.useLocalHostBrokerName;
1564 }
1565
1566 /**
1567 * @param useLocalHostBrokerName
1568 * the useLocalHostBrokerName to set
1569 */
1570 public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
1571 this.useLocalHostBrokerName = useLocalHostBrokerName;
1572 if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) {
1573 brokerName = LOCAL_HOST_NAME;
1574 }
1575 }
1576
1577 /**
1578 * @return the supportFailOver
1579 */
1580 public boolean isSupportFailOver() {
1581 return this.supportFailOver;
1582 }
1583
1584 /**
1585 * @param supportFailOver
1586 * the supportFailOver to set
1587 */
1588 public void setSupportFailOver(boolean supportFailOver) {
1589 this.supportFailOver = supportFailOver;
1590 }
1591
1592 /**
1593 * Looks up and lazily creates if necessary the destination for the given
1594 * JMS name
1595 */
1596 public Destination getDestination(ActiveMQDestination destination) throws Exception {
1597 return getBroker().addDestination(getAdminConnectionContext(), destination,false);
1598 }
1599
1600 public void removeDestination(ActiveMQDestination destination) throws Exception {
1601 getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
1602 }
1603
1604 public int getProducerSystemUsagePortion() {
1605 return producerSystemUsagePortion;
1606 }
1607
1608 public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
1609 this.producerSystemUsagePortion = producerSystemUsagePortion;
1610 }
1611
1612 public int getConsumerSystemUsagePortion() {
1613 return consumerSystemUsagePortion;
1614 }
1615
1616 public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
1617 this.consumerSystemUsagePortion = consumerSystemUsagePortion;
1618 }
1619
1620 public boolean isSplitSystemUsageForProducersConsumers() {
1621 return splitSystemUsageForProducersConsumers;
1622 }
1623
1624 public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) {
1625 this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
1626 }
1627
1628 public boolean isMonitorConnectionSplits() {
1629 return monitorConnectionSplits;
1630 }
1631
1632 public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
1633 this.monitorConnectionSplits = monitorConnectionSplits;
1634 }
1635
1636 public int getTaskRunnerPriority() {
1637 return taskRunnerPriority;
1638 }
1639
1640 public void setTaskRunnerPriority(int taskRunnerPriority) {
1641 this.taskRunnerPriority = taskRunnerPriority;
1642 }
1643
1644 public boolean isDedicatedTaskRunner() {
1645 return dedicatedTaskRunner;
1646 }
1647
1648 public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
1649 this.dedicatedTaskRunner = dedicatedTaskRunner;
1650 }
1651
1652 public boolean isCacheTempDestinations() {
1653 return cacheTempDestinations;
1654 }
1655
1656 public void setCacheTempDestinations(boolean cacheTempDestinations) {
1657 this.cacheTempDestinations = cacheTempDestinations;
1658 }
1659
1660 public int getTimeBeforePurgeTempDestinations() {
1661 return timeBeforePurgeTempDestinations;
1662 }
1663
1664 public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) {
1665 this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
1666 }
1667
1668 public boolean isUseTempMirroredQueues() {
1669 return useTempMirroredQueues;
1670 }
1671
1672 public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
1673 this.useTempMirroredQueues = useTempMirroredQueues;
1674 }
1675
1676 //
1677 // Implementation methods
1678 // -------------------------------------------------------------------------
1679 /**
1680 * Handles any lazy-creation helper properties which are added to make
1681 * things easier to configure inside environments such as Spring
1682 *
1683 * @throws Exception
1684 */
1685 protected void processHelperProperties() throws Exception {
1686 boolean masterServiceExists = false;
1687 if (transportConnectorURIs != null) {
1688 for (int i = 0; i < transportConnectorURIs.length; i++) {
1689 String uri = transportConnectorURIs[i];
1690 addConnector(uri);
1691 }
1692 }
1693 if (networkConnectorURIs != null) {
1694 for (int i = 0; i < networkConnectorURIs.length; i++) {
1695 String uri = networkConnectorURIs[i];
1696 addNetworkConnector(uri);
1697 }
1698 }
1699 if (jmsBridgeConnectors != null) {
1700 for (int i = 0; i < jmsBridgeConnectors.length; i++) {
1701 addJmsConnector(jmsBridgeConnectors[i]);
1702 }
1703 }
1704 for (Service service : services) {
1705 if (service instanceof MasterConnector) {
1706 masterServiceExists = true;
1707 break;
1708 }
1709 }
1710 if (masterConnectorURI != null) {
1711 if (masterServiceExists) {
1712 throw new IllegalStateException(
1713 "Cannot specify masterConnectorURI when a masterConnector is already registered via the services property");
1714 } else {
1715 addService(new MasterConnector(masterConnectorURI));
1716 }
1717 }
1718 }
1719
1720 protected void checkSystemUsageLimits() throws IOException {
1721 SystemUsage usage = getSystemUsage();
1722 long memLimit = usage.getMemoryUsage().getLimit();
1723 long jvmLimit = Runtime.getRuntime().maxMemory();
1724
1725 if (memLimit > jvmLimit) {
1726 LOG.error("Memory Usage for the Broker (" + memLimit / (1024 * 1024) +
1727 " mb) is more than the maximum available for the JVM: " +
1728 jvmLimit / (1024 * 1024) + " mb");
1729 }
1730
1731 if (getPersistenceAdapter() != null) {
1732 PersistenceAdapter adapter = getPersistenceAdapter();
1733 File dir = adapter.getDirectory();
1734
1735 if (dir != null) {
1736 String dirPath = dir.getAbsolutePath();
1737 if (!dir.isAbsolute()) {
1738 dir = new File(dirPath);
1739 }
1740
1741 while (dir != null && dir.isDirectory() == false) {
1742 dir = dir.getParentFile();
1743 }
1744 long storeLimit = usage.getStoreUsage().getLimit();
1745 long dirFreeSpace = dir.getUsableSpace();
1746 if (storeLimit > dirFreeSpace) {
1747 LOG.warn("Store limit is " + storeLimit / (1024 * 1024) +
1748 " mb, whilst the data directory: " + dir.getAbsolutePath() +
1749 " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space");
1750 }
1751 }
1752
1753 long maxJournalFileSize = 0;
1754 long storeLimit = usage.getStoreUsage().getLimit();
1755
1756 if (adapter instanceof KahaDBPersistenceAdapter) {
1757 KahaDBPersistenceAdapter kahaDB = (KahaDBPersistenceAdapter) adapter;
1758 maxJournalFileSize = kahaDB.getJournalMaxFileLength();
1759 } else if (adapter instanceof AMQPersistenceAdapter) {
1760 AMQPersistenceAdapter amqAdapter = (AMQPersistenceAdapter) adapter;
1761 maxJournalFileSize = amqAdapter.getMaxFileLength();
1762 }
1763
1764 if (storeLimit < maxJournalFileSize) {
1765 LOG.error("Store limit is " + storeLimit / (1024 * 1024) +
1766 " mb, whilst the max journal file size for the store is: " +
1767 maxJournalFileSize / (1024 * 1024) + " mb, " +
1768 "the store will not accept any data when used.");
1769 }
1770 }
1771
1772 File tmpDir = getTmpDataDirectory();
1773 if (tmpDir != null) {
1774
1775 String tmpDirPath = tmpDir.getAbsolutePath();
1776 if (!tmpDir.isAbsolute()) {
1777 tmpDir = new File(tmpDirPath);
1778 }
1779
1780 long storeLimit = usage.getTempUsage().getLimit();
1781 while (tmpDir != null && tmpDir.isDirectory() == false) {
1782 tmpDir = tmpDir.getParentFile();
1783 }
1784 long dirFreeSpace = tmpDir.getUsableSpace();
1785 if (storeLimit > dirFreeSpace) {
1786 LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
1787 " mb, whilst the temporary data directory: " + tmpDirPath +
1788 " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space");
1789 }
1790
1791 long maxJournalFileSize;
1792
1793 if (usage.getTempUsage().getStore() != null) {
1794 maxJournalFileSize = usage.getTempUsage().getStore().getJournalMaxFileLength();
1795 } else {
1796 maxJournalFileSize = org.apache.kahadb.journal.Journal.DEFAULT_MAX_FILE_LENGTH;
1797 }
1798
1799 if (storeLimit < maxJournalFileSize) {
1800 LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
1801 " mb, whilst the max journal file size for the temporary store is: " +
1802 maxJournalFileSize / (1024 * 1024) + " mb, " +
1803 "the temp store will not accept any data when used.");
1804 }
1805 }
1806 }
1807
1808 public void stopAllConnectors(ServiceStopper stopper) {
1809 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
1810 NetworkConnector connector = iter.next();
1811 unregisterNetworkConnectorMBean(connector);
1812 stopper.stop(connector);
1813 }
1814 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
1815 ProxyConnector connector = iter.next();
1816 stopper.stop(connector);
1817 }
1818 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
1819 JmsConnector connector = iter.next();
1820 stopper.stop(connector);
1821 }
1822 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
1823 TransportConnector connector = iter.next();
1824 stopper.stop(connector);
1825 }
1826 }
1827
1828 protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
1829 try {
1830 ObjectName objectName = createConnectorObjectName(connector);
1831 connector = connector.asManagedConnector(getManagementContext(), objectName);
1832 ConnectorViewMBean view = new ConnectorView(connector);
1833 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1834 return connector;
1835 } catch (Throwable e) {
1836 throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
1837 }
1838 }
1839
1840 protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
1841 if (isUseJmx()) {
1842 try {
1843 ObjectName objectName = createConnectorObjectName(connector);
1844 getManagementContext().unregisterMBean(objectName);
1845 } catch (Throwable e) {
1846 throw IOExceptionSupport.create(
1847 "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e);
1848 }
1849 }
1850 }
1851
1852 protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
1853 return adaptor;
1854 }
1855
1856 protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
1857 if (isUseJmx()) {
1858 }
1859 }
1860
1861 private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
1862 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1863 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector," + "ConnectorName="
1864 + JMXSupport.encodeObjectNamePart(connector.getName()));
1865 }
1866
1867 protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
1868 NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
1869 try {
1870 ObjectName objectName = createNetworkConnectorObjectName(connector);
1871 connector.setObjectName(objectName);
1872 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1873 } catch (Throwable e) {
1874 throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
1875 }
1876 }
1877
1878 protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector)
1879 throws MalformedObjectNameException {
1880 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1881 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
1882 + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1883 }
1884
1885
1886 public ObjectName createDuplexNetworkConnectorObjectName(String transport)
1887 throws MalformedObjectNameException {
1888 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1889 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
1890 + "NetworkConnectorName=duplex" + JMXSupport.encodeObjectNamePart(transport));
1891 }
1892
1893 protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
1894 if (isUseJmx()) {
1895 try {
1896 ObjectName objectName = createNetworkConnectorObjectName(connector);
1897 getManagementContext().unregisterMBean(objectName);
1898 } catch (Exception e) {
1899 LOG.error("Network Connector could not be unregistered from JMX: " + e, e);
1900 }
1901 }
1902 }
1903
1904 protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
1905 ProxyConnectorView view = new ProxyConnectorView(connector);
1906 try {
1907 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1908 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=ProxyConnector,"
1909 + "ProxyConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1910 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1911 } catch (Throwable e) {
1912 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1913 }
1914 }
1915
1916 protected void registerFTConnectorMBean(MasterConnector connector) throws IOException {
1917 FTConnectorView view = new FTConnectorView(connector);
1918 try {
1919 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1920 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=MasterConnector");
1921 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1922 } catch (Throwable e) {
1923 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1924 }
1925 }
1926
1927 protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
1928 JmsConnectorView view = new JmsConnectorView(connector);
1929 try {
1930 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1931 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=JmsConnector,"
1932 + "JmsConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1933 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1934 } catch (Throwable e) {
1935 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1936 }
1937 }
1938
1939 /**
1940 * Factory method to create a new broker
1941 *
1942 * @throws Exception
1943 * @throws
1944 * @throws
1945 */
1946 protected Broker createBroker() throws Exception {
1947 regionBroker = createRegionBroker();
1948 Broker broker = addInterceptors(regionBroker);
1949 // Add a filter that will stop access to the broker once stopped
1950 broker = new MutableBrokerFilter(broker) {
1951 Broker old;
1952
1953 @Override
1954 public void stop() throws Exception {
1955 old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
1956 // Just ignore additional stop actions.
1957 @Override
1958 public void stop() throws Exception {
1959 }
1960 });
1961 old.stop();
1962 }
1963
1964 @Override
1965 public void start() throws Exception {
1966 if (forceStart && old != null) {
1967 this.next.set(old);
1968 }
1969 getNext().start();
1970 }
1971 };
1972 return broker;
1973 }
1974
1975 /**
1976 * Factory method to create the core region broker onto which interceptors
1977 * are added
1978 *
1979 * @throws Exception
1980 */
1981 protected Broker createRegionBroker() throws Exception {
1982 if (destinationInterceptors == null) {
1983 destinationInterceptors = createDefaultDestinationInterceptor();
1984 }
1985 configureServices(destinationInterceptors);
1986 DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
1987 if (destinationFactory == null) {
1988 destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
1989 }
1990 return createRegionBroker(destinationInterceptor);
1991 }
1992
1993 protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
1994 RegionBroker regionBroker;
1995 if (isUseJmx()) {
1996 regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
1997 getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
1998 } else {
1999 regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
2000 destinationInterceptor,getScheduler(),getExecutor());
2001 }
2002 destinationFactory.setRegionBroker(regionBroker);
2003 regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
2004 regionBroker.setBrokerName(getBrokerName());
2005 regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
2006 regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend());
2007 if (brokerId != null) {
2008 regionBroker.setBrokerId(brokerId);
2009 }
2010 return regionBroker;
2011 }
2012
2013 /**
2014 * Create the default destination interceptor
2015 */
2016 protected DestinationInterceptor[] createDefaultDestinationInterceptor() {
2017 List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>();
2018 if (isUseVirtualTopics()) {
2019 VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
2020 VirtualTopic virtualTopic = new VirtualTopic();
2021 virtualTopic.setName("VirtualTopic.>");
2022 VirtualDestination[] virtualDestinations = { virtualTopic };
2023 interceptor.setVirtualDestinations(virtualDestinations);
2024 answer.add(interceptor);
2025 }
2026 if (isUseMirroredQueues()) {
2027 MirroredQueue interceptor = new MirroredQueue();
2028 answer.add(interceptor);
2029 }
2030 DestinationInterceptor[] array = new DestinationInterceptor[answer.size()];
2031 answer.toArray(array);
2032 return array;
2033 }
2034
2035 /**
2036 * Strategy method to add interceptors to the broker
2037 *
2038 * @throws IOException
2039 */
2040 protected Broker addInterceptors(Broker broker) throws Exception {
2041 if (isSchedulerSupport()) {
2042 SchedulerBroker sb = new SchedulerBroker(broker, getSchedulerDirectoryFile());
2043 if (isUseJmx()) {
2044 JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
2045 try {
2046 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":"
2047 + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
2048 + "Type=jobScheduler," + "jobSchedulerName=JMS");
2049
2050 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2051 this.adminView.setJMSJobScheduler(objectName);
2052 } catch (Throwable e) {
2053 throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: "
2054 + e.getMessage(), e);
2055 }
2056
2057 }
2058 broker = sb;
2059 }
2060 if (isAdvisorySupport()) {
2061 broker = new AdvisoryBroker(broker);
2062 }
2063 broker = new CompositeDestinationBroker(broker);
2064 broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
2065 if (isPopulateJMSXUserID()) {
2066 UserIDBroker userIDBroker = new UserIDBroker(broker);
2067 userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID());
2068 broker = userIDBroker;
2069 }
2070 if (isMonitorConnectionSplits()) {
2071 broker = new ConnectionSplitBroker(broker);
2072 }
2073 if (plugins != null) {
2074 for (int i = 0; i < plugins.length; i++) {
2075 BrokerPlugin plugin = plugins[i];
2076 broker = plugin.installPlugin(broker);
2077 }
2078 }
2079 return broker;
2080 }
2081
2082 protected PersistenceAdapter createPersistenceAdapter() throws IOException {
2083 if (isPersistent()) {
2084 PersistenceAdapterFactory fac = getPersistenceFactory();
2085 if (fac != null) {
2086 return fac.createPersistenceAdapter();
2087 }else {
2088 KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
2089 File dir = new File(getBrokerDataDirectory(),"KahaDB");
2090 adaptor.setDirectory(dir);
2091 return adaptor;
2092 }
2093 } else {
2094 return new MemoryPersistenceAdapter();
2095 }
2096 }
2097
2098 protected ObjectName createBrokerObjectName() throws IOException {
2099 try {
2100 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
2101 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Broker");
2102 } catch (Throwable e) {
2103 throw IOExceptionSupport.create("Invalid JMX broker name: " + brokerName, e);
2104 }
2105 }
2106
2107 protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
2108 TransportServer transport = TransportFactory.bind(this, brokerURI);
2109 return new TransportConnector(transport);
2110 }
2111
2112 /**
2113 * Extracts the port from the options
2114 */
2115 protected Object getPort(Map options) {
2116 Object port = options.get("port");
2117 if (port == null) {
2118 port = DEFAULT_PORT;
2119 LOG.warn("No port specified so defaulting to: " + port);
2120 }
2121 return port;
2122 }
2123
2124 protected void addShutdownHook() {
2125 if (useShutdownHook) {
2126 shutdownHook = new Thread("ActiveMQ ShutdownHook") {
2127 @Override
2128 public void run() {
2129 containerShutdown();
2130 }
2131 };
2132 Runtime.getRuntime().addShutdownHook(shutdownHook);
2133 }
2134 }
2135
2136 protected void removeShutdownHook() {
2137 if (shutdownHook != null) {
2138 try {
2139 Runtime.getRuntime().removeShutdownHook(shutdownHook);
2140 } catch (Exception e) {
2141 LOG.debug("Caught exception, must be shutting down: " + e);
2142 }
2143 }
2144 }
2145
2146 /**
2147 * Sets hooks to be executed when broker shut down
2148 *
2149 * @org.apache.xbean.Property
2150 */
2151 public void setShutdownHooks(List<Runnable> hooks) throws Exception {
2152 for (Runnable hook : hooks) {
2153 addShutdownHook(hook);
2154 }
2155 }
2156
2157 /**
2158 * Causes a clean shutdown of the container when the VM is being shut down
2159 */
2160 protected void containerShutdown() {
2161 try {
2162 stop();
2163 } catch (IOException e) {
2164 Throwable linkedException = e.getCause();
2165 if (linkedException != null) {
2166 logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
2167 } else {
2168 logError("Failed to shut down: " + e, e);
2169 }
2170 if (!useLoggingForShutdownErrors) {
2171 e.printStackTrace(System.err);
2172 }
2173 } catch (Exception e) {
2174 logError("Failed to shut down: " + e, e);
2175 }
2176 }
2177
2178 protected void logError(String message, Throwable e) {
2179 if (useLoggingForShutdownErrors) {
2180 LOG.error("Failed to shut down: " + e);
2181 } else {
2182 System.err.println("Failed to shut down: " + e);
2183 }
2184 }
2185
2186 /**
2187 * Starts any configured destinations on startup
2188 */
2189 protected void startDestinations() throws Exception {
2190 if (destinations != null) {
2191 ConnectionContext adminConnectionContext = getAdminConnectionContext();
2192 for (int i = 0; i < destinations.length; i++) {
2193 ActiveMQDestination destination = destinations[i];
2194 getBroker().addDestination(adminConnectionContext, destination,true);
2195 }
2196 }
2197 if (isUseVirtualTopics()) {
2198 startVirtualConsumerDestinations();
2199 }
2200 }
2201
2202 /**
2203 * Returns the broker's administration connection context used for
2204 * configuring the broker at startup
2205 */
2206 public ConnectionContext getAdminConnectionContext() throws Exception {
2207 return BrokerSupport.getConnectionContext(getBroker());
2208 }
2209
2210 protected void waitForSlave() {
2211 try {
2212 if (!slaveStartSignal.await(waitForSlaveTimeout, TimeUnit.MILLISECONDS)) {
2213 throw new IllegalStateException("Gave up waiting for slave to start after " + waitForSlaveTimeout + " milliseconds.");
2214 }
2215 } catch (InterruptedException e) {
2216 LOG.error("Exception waiting for slave:" + e);
2217 }
2218 }
2219
2220 protected void slaveConnectionEstablished() {
2221 slaveStartSignal.countDown();
2222 }
2223
2224 protected void startManagementContext() throws Exception {
2225 getManagementContext().start();
2226 adminView = new BrokerView(this, null);
2227 ObjectName objectName = getBrokerObjectName();
2228 AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName);
2229 }
2230
2231 /**
2232 * Start all transport and network connections, proxies and bridges
2233 *
2234 * @throws Exception
2235 */
2236 public void startAllConnectors() throws Exception {
2237 if (!isSlave()) {
2238 Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
2239 List<TransportConnector> al = new ArrayList<TransportConnector>();
2240 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
2241 TransportConnector connector = iter.next();
2242 connector.setBrokerService(this);
2243 al.add(startTransportConnector(connector));
2244 }
2245 if (al.size() > 0) {
2246 // let's clear the transportConnectors list and replace it with
2247 // the started transportConnector instances
2248 this.transportConnectors.clear();
2249 setTransportConnectors(al);
2250 }
2251 URI uri = getVmConnectorURI();
2252 Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
2253 map.put("network", "true");
2254 map.put("async", "false");
2255 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
2256 if (isWaitForSlave()) {
2257 waitForSlave();
2258 }
2259 if (!stopped.get()) {
2260 ThreadPoolExecutor networkConnectorStartExecutor = null;
2261 if (isNetworkConnectorStartAsync()) {
2262 // spin up as many threads as needed
2263 networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
2264 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
2265 new ThreadFactory() {
2266 int count=0;
2267 public Thread newThread(Runnable runnable) {
2268 Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
2269 thread.setDaemon(true);
2270 return thread;
2271 }
2272 });
2273 }
2274
2275 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
2276 final NetworkConnector connector = iter.next();
2277 connector.setLocalUri(uri);
2278 connector.setBrokerName(getBrokerName());
2279 connector.setDurableDestinations(durableDestinations);
2280 if (getDefaultSocketURIString() != null) {
2281 connector.setBrokerURL(getDefaultSocketURIString());
2282 }
2283 if (networkConnectorStartExecutor != null) {
2284 networkConnectorStartExecutor.execute(new Runnable() {
2285 public void run() {
2286 try {
2287 LOG.info("Async start of " + connector);
2288 connector.start();
2289 } catch(Exception e) {
2290 LOG.error("Async start of network connector: " + connector + " failed", e);
2291 }
2292 }
2293 });
2294 } else {
2295 connector.start();
2296 }
2297 }
2298 if (networkConnectorStartExecutor != null) {
2299 // executor done when enqueued tasks are complete
2300 networkConnectorStartExecutor.shutdown();
2301 networkConnectorStartExecutor = null;
2302 }
2303
2304 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
2305 ProxyConnector connector = iter.next();
2306 connector.start();
2307 }
2308 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
2309 JmsConnector connector = iter.next();
2310 connector.start();
2311 }
2312 for (Service service : services) {
2313 configureService(service);
2314 service.start();
2315 }
2316 }
2317 }
2318 }
2319
2320 protected TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
2321 connector.setTaskRunnerFactory(getTaskRunnerFactory());
2322 MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
2323 if (policy != null) {
2324 connector.setMessageAuthorizationPolicy(policy);
2325 }
2326 if (isUseJmx()) {
2327 connector = registerConnectorMBean(connector);
2328 }
2329 connector.getStatistics().setEnabled(enableStatistics);
2330 connector.start();
2331 return connector;
2332 }
2333
2334 /**
2335 * Perform any custom dependency injection
2336 */
2337 protected void configureServices(Object[] services) {
2338 for (Object service : services) {
2339 configureService(service);
2340 }
2341 }
2342
2343 /**
2344 * Perform any custom dependency injection
2345 */
2346 protected void configureService(Object service) {
2347 if (service instanceof BrokerServiceAware) {
2348 BrokerServiceAware serviceAware = (BrokerServiceAware) service;
2349 serviceAware.setBrokerService(this);
2350 }
2351 if (masterConnector == null) {
2352 if (service instanceof MasterConnector) {
2353 masterConnector = (MasterConnector) service;
2354 supportFailOver = true;
2355 }
2356 }
2357 }
2358
2359 public void handleIOException(IOException exception) {
2360 if (ioExceptionHandler != null) {
2361 ioExceptionHandler.handle(exception);
2362 } else {
2363 LOG.info("No IOExceptionHandler registered, ignoring IO exception, " + exception, exception);
2364 }
2365 }
2366
2367 protected void startVirtualConsumerDestinations() throws Exception {
2368 ConnectionContext adminConnectionContext = getAdminConnectionContext();
2369 Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
2370 DestinationFilter filter = getVirtualTopicConsumerDestinationFilter();
2371 if (!destinations.isEmpty()) {
2372 for (ActiveMQDestination destination : destinations) {
2373 if (filter.matches(destination) == true) {
2374 broker.addDestination(adminConnectionContext, destination, false);
2375 }
2376 }
2377 }
2378 }
2379
2380 private DestinationFilter getVirtualTopicConsumerDestinationFilter() {
2381 // created at startup, so no sync needed
2382 if (virtualConsumerDestinationFilter == null) {
2383 Set <ActiveMQQueue> consumerDestinations = new HashSet<ActiveMQQueue>();
2384 for (DestinationInterceptor interceptor : destinationInterceptors) {
2385 if (interceptor instanceof VirtualDestinationInterceptor) {
2386 VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor;
2387 for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) {
2388 if (virtualDestination instanceof VirtualTopic) {
2389 consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT));
2390 }
2391 }
2392 }
2393 }
2394 ActiveMQQueue filter = new ActiveMQQueue();
2395 filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{}));
2396 virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter);
2397 }
2398 return virtualConsumerDestinationFilter;
2399 }
2400
2401 protected synchronized ThreadPoolExecutor getExecutor() {
2402 if (this.executor == null) {
2403 this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
2404
2405 private long i = 0;
2406
2407 @Override
2408 public Thread newThread(Runnable runnable) {
2409 this.i++;
2410 Thread thread = new Thread(runnable, "BrokerService.worker." + this.i);
2411 thread.setDaemon(true);
2412 thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
2413 @Override
2414 public void uncaughtException(final Thread t, final Throwable e) {
2415 LOG.error("Error in thread '{}'", t.getName(), e);
2416 }
2417 });
2418 return thread;
2419 }
2420 }, new RejectedExecutionHandler() {
2421 @Override
2422 public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
2423 try {
2424 executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
2425 } catch (InterruptedException e) {
2426 throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
2427 }
2428
2429 throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
2430 }
2431 });
2432 }
2433 return this.executor;
2434 }
2435
2436 public synchronized Scheduler getScheduler() {
2437 if (this.scheduler==null) {
2438 this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler");
2439 try {
2440 this.scheduler.start();
2441 } catch (Exception e) {
2442 LOG.error("Failed to start Scheduler ",e);
2443 }
2444 }
2445 return this.scheduler;
2446 }
2447
2448 public Broker getRegionBroker() {
2449 return regionBroker;
2450 }
2451
2452 public void setRegionBroker(Broker regionBroker) {
2453 this.regionBroker = regionBroker;
2454 }
2455
2456 public void addShutdownHook(Runnable hook) {
2457 synchronized (shutdownHooks) {
2458 shutdownHooks.add(hook);
2459 }
2460 }
2461
2462 public void removeShutdownHook(Runnable hook) {
2463 synchronized (shutdownHooks) {
2464 shutdownHooks.remove(hook);
2465 }
2466 }
2467
2468 public boolean isSystemExitOnShutdown() {
2469 return systemExitOnShutdown;
2470 }
2471
2472 /**
2473 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2474 */
2475 public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
2476 this.systemExitOnShutdown = systemExitOnShutdown;
2477 }
2478
2479 public int getSystemExitOnShutdownExitCode() {
2480 return systemExitOnShutdownExitCode;
2481 }
2482
2483 public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) {
2484 this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode;
2485 }
2486
2487 public SslContext getSslContext() {
2488 return sslContext;
2489 }
2490
2491 public void setSslContext(SslContext sslContext) {
2492 this.sslContext = sslContext;
2493 }
2494
2495 public boolean isShutdownOnSlaveFailure() {
2496 return shutdownOnSlaveFailure;
2497 }
2498
2499 /**
2500 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2501 */
2502 public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
2503 this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
2504 }
2505
2506 public boolean isWaitForSlave() {
2507 return waitForSlave;
2508 }
2509
2510 /**
2511 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2512 */
2513 public void setWaitForSlave(boolean waitForSlave) {
2514 this.waitForSlave = waitForSlave;
2515 }
2516
2517 public long getWaitForSlaveTimeout() {
2518 return this.waitForSlaveTimeout;
2519 }
2520
2521 public void setWaitForSlaveTimeout(long waitForSlaveTimeout) {
2522 this.waitForSlaveTimeout = waitForSlaveTimeout;
2523 }
2524
2525 public CountDownLatch getSlaveStartSignal() {
2526 return slaveStartSignal;
2527 }
2528
2529 /**
2530 * Get the passiveSlave
2531 * @return the passiveSlave
2532 */
2533 public boolean isPassiveSlave() {
2534 return this.passiveSlave;
2535 }
2536
2537 /**
2538 * Set the passiveSlave
2539 * @param passiveSlave the passiveSlave to set
2540 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2541 */
2542 public void setPassiveSlave(boolean passiveSlave) {
2543 this.passiveSlave = passiveSlave;
2544 }
2545
2546 /**
2547 * override the Default IOException handler, called when persistence adapter
2548 * has experiences File or JDBC I/O Exceptions
2549 *
2550 * @param ioExceptionHandler
2551 */
2552 public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
2553 configureService(ioExceptionHandler);
2554 this.ioExceptionHandler = ioExceptionHandler;
2555 }
2556
2557 public IOExceptionHandler getIoExceptionHandler() {
2558 return ioExceptionHandler;
2559 }
2560
2561 /**
2562 * @return the schedulerSupport
2563 */
2564 public boolean isSchedulerSupport() {
2565 return this.schedulerSupport;
2566 }
2567
2568 /**
2569 * @param schedulerSupport the schedulerSupport to set
2570 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2571 */
2572 public void setSchedulerSupport(boolean schedulerSupport) {
2573 this.schedulerSupport = schedulerSupport;
2574 }
2575
2576 /**
2577 * @return the schedulerDirectory
2578 */
2579 public File getSchedulerDirectoryFile() {
2580 if (this.schedulerDirectoryFile == null) {
2581 this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler");
2582 }
2583 return schedulerDirectoryFile;
2584 }
2585
2586 /**
2587 * @param schedulerDirectory the schedulerDirectory to set
2588 */
2589 public void setSchedulerDirectoryFile(File schedulerDirectory) {
2590 this.schedulerDirectoryFile = schedulerDirectory;
2591 }
2592
2593 public void setSchedulerDirectory(String schedulerDirectory) {
2594 setSchedulerDirectoryFile(new File(schedulerDirectory));
2595 }
2596
2597 public int getSchedulePeriodForDestinationPurge() {
2598 return this.schedulePeriodForDestinationPurge;
2599 }
2600
2601 public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) {
2602 this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
2603 }
2604
2605 public int getMaxPurgedDestinationsPerSweep() {
2606 return this.maxPurgedDestinationsPerSweep;
2607 }
2608
2609 public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) {
2610 this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep;
2611 }
2612
2613 public BrokerContext getBrokerContext() {
2614 return brokerContext;
2615 }
2616
2617 public void setBrokerContext(BrokerContext brokerContext) {
2618 this.brokerContext = brokerContext;
2619 }
2620
2621 public void setBrokerId(String brokerId) {
2622 this.brokerId = new BrokerId(brokerId);
2623 }
2624
2625 public boolean isUseAuthenticatedPrincipalForJMSXUserID() {
2626 return useAuthenticatedPrincipalForJMSXUserID;
2627 }
2628
2629 public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) {
2630 this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID;
2631 }
2632
2633 /**
2634 * Should MBeans that support showing the Authenticated User Name information have this
2635 * value filled in or not.
2636 *
2637 * @return true if user names should be exposed in MBeans
2638 */
2639 public boolean isPopulateUserNameInMBeans() {
2640 return this.populateUserNameInMBeans;
2641 }
2642
2643 /**
2644 * Sets whether Authenticated User Name information is shown in MBeans that support this field.
2645 * @param true if MBeans should expose user name information.
2646 */
2647 public void setPopulateUserNameInMBeans(boolean value) {
2648 this.populateUserNameInMBeans = value;
2649 }
2650
2651 public boolean isNetworkConnectorStartAsync() {
2652 return networkConnectorStartAsync;
2653 }
2654
2655 public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) {
2656 this.networkConnectorStartAsync = networkConnectorStartAsync;
2657 }
2658
2659 public boolean isAllowTempAutoCreationOnSend() {
2660 return allowTempAutoCreationOnSend;
2661 }
2662
2663 /**
2664 * enable if temp destinations need to be propagated through a network when
2665 * advisorySupport==false. This is used in conjunction with the policy
2666 * gcInactiveDestinations for matching temps so they can get removed
2667 * when inactive
2668 *
2669 * @param allowTempAutoCreationOnSend
2670 */
2671 public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
2672 this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
2673 }
2674
2675 public int getOfflineDurableSubscriberTimeout() {
2676 return offlineDurableSubscriberTimeout;
2677 }
2678
2679 public void setOfflineDurableSubscriberTimeout(int offlineDurableSubscriberTimeout) {
2680 this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout;
2681 }
2682
2683 public int getOfflineDurableSubscriberTaskSchedule() {
2684 return offlineDurableSubscriberTaskSchedule;
2685 }
2686
2687 public void setOfflineDurableSubscriberTaskSchedule(int offlineDurableSubscriberTaskSchedule) {
2688 this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule;
2689 }
2690
2691 public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) {
2692 return isUseVirtualTopics() && destination.isQueue() &&
2693 getVirtualTopicConsumerDestinationFilter().matches(destination);
2694 }
2695 }