001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.EmptyBroker;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.*;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.BrokerSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.ReentrantReadWriteLock;
057
058 /**
059 * Routes Broker operations to the correct messaging regions for processing.
060 *
061 *
062 */
063 public class RegionBroker extends EmptyBroker {
064 public static final String ORIGINAL_EXPIRATION = "originalExpiration";
065 private static final Logger LOG = LoggerFactory.getLogger(RegionBroker.class);
066 private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
067
068 protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
069 protected DestinationFactory destinationFactory;
070 protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>());
071
072 private final Region queueRegion;
073 private final Region topicRegion;
074 private final Region tempQueueRegion;
075 private final Region tempTopicRegion;
076 protected final BrokerService brokerService;
077 private boolean started;
078 private boolean keepDurableSubsActive;
079
080 private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
081 private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
082 private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap<BrokerId, BrokerInfo>();
083
084 private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
085 private BrokerId brokerId;
086 private String brokerName;
087 private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>();
088 private final DestinationInterceptor destinationInterceptor;
089 private ConnectionContext adminConnectionContext;
090 private final Scheduler scheduler;
091 private final ThreadPoolExecutor executor;
092 private boolean allowTempAutoCreationOnSend;
093
094 private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock();
095 private final Runnable purgeInactiveDestinationsTask = new Runnable() {
096 public void run() {
097 purgeInactiveDestinations();
098 }
099 };
100
101 public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
102 DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
103 this.brokerService = brokerService;
104 this.executor=executor;
105 this.scheduler = scheduler;
106 if (destinationFactory == null) {
107 throw new IllegalArgumentException("null destinationFactory");
108 }
109 this.sequenceGenerator.setLastSequenceId(destinationFactory.getLastMessageBrokerSequenceId());
110 this.destinationFactory = destinationFactory;
111 queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
112 topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
113 this.destinationInterceptor = destinationInterceptor;
114 tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
115 tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
116 }
117
118 @Override
119 public Map<ActiveMQDestination, Destination> getDestinationMap() {
120 Map<ActiveMQDestination, Destination> answer = new HashMap<ActiveMQDestination, Destination>(getQueueRegion().getDestinationMap());
121 answer.putAll(getTopicRegion().getDestinationMap());
122 return answer;
123 }
124
125 @Override
126 public Set <Destination> getDestinations(ActiveMQDestination destination) {
127 switch (destination.getDestinationType()) {
128 case ActiveMQDestination.QUEUE_TYPE:
129 return queueRegion.getDestinations(destination);
130 case ActiveMQDestination.TOPIC_TYPE:
131 return topicRegion.getDestinations(destination);
132 case ActiveMQDestination.TEMP_QUEUE_TYPE:
133 return tempQueueRegion.getDestinations(destination);
134 case ActiveMQDestination.TEMP_TOPIC_TYPE:
135 return tempTopicRegion.getDestinations(destination);
136 default:
137 return Collections.emptySet();
138 }
139 }
140
141 @Override
142 @SuppressWarnings("rawtypes")
143 public Broker getAdaptor(Class type) {
144 if (type.isInstance(this)) {
145 return this;
146 }
147 return null;
148 }
149
150 public Region getQueueRegion() {
151 return queueRegion;
152 }
153
154 public Region getTempQueueRegion() {
155 return tempQueueRegion;
156 }
157
158 public Region getTempTopicRegion() {
159 return tempTopicRegion;
160 }
161
162 public Region getTopicRegion() {
163 return topicRegion;
164 }
165
166 protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
167 return new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
168 }
169
170 protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
171 return new TempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
172 }
173
174 protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
175 return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
176 }
177
178 protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
179 return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
180 }
181
182 @Override
183 public void start() throws Exception {
184 ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
185 started = true;
186 queueRegion.start();
187 topicRegion.start();
188 tempQueueRegion.start();
189 tempTopicRegion.start();
190 int period = this.brokerService.getSchedulePeriodForDestinationPurge();
191 if (period > 0) {
192 this.scheduler.executePeriodically(purgeInactiveDestinationsTask, period);
193 }
194 }
195
196 @Override
197 public void stop() throws Exception {
198 started = false;
199 this.scheduler.cancel(purgeInactiveDestinationsTask);
200 ServiceStopper ss = new ServiceStopper();
201 doStop(ss);
202 ss.throwFirstException();
203 // clear the state
204 clientIdSet.clear();
205 connections.clear();
206 destinations.clear();
207 brokerInfos.clear();
208 }
209
210 public PolicyMap getDestinationPolicy() {
211 return brokerService != null ? brokerService.getDestinationPolicy() : null;
212 }
213
214 @Override
215 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
216 String clientId = info.getClientId();
217 if (clientId == null) {
218 throw new InvalidClientIDException("No clientID specified for connection request");
219 }
220 synchronized (clientIdSet) {
221 ConnectionContext oldContext = clientIdSet.get(clientId);
222 if (oldContext != null) {
223 throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
224 + oldContext.getConnection().getRemoteAddress());
225 } else {
226 clientIdSet.put(clientId, context);
227 }
228 }
229
230 connections.add(context.getConnection());
231 }
232
233 @Override
234 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
235 String clientId = info.getClientId();
236 if (clientId == null) {
237 throw new InvalidClientIDException("No clientID specified for connection disconnect request");
238 }
239 synchronized (clientIdSet) {
240 ConnectionContext oldValue = clientIdSet.get(clientId);
241 // we may be removing the duplicate connection, not the first
242 // connection to be created
243 // so lets check that their connection IDs are the same
244 if (oldValue == context) {
245 if (isEqual(oldValue.getConnectionId(), info.getConnectionId())) {
246 clientIdSet.remove(clientId);
247 }
248 }
249 }
250 connections.remove(context.getConnection());
251 }
252
253 protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) {
254 return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2));
255 }
256
257 @Override
258 public Connection[] getClients() throws Exception {
259 ArrayList<Connection> l = new ArrayList<Connection>(connections);
260 Connection rc[] = new Connection[l.size()];
261 l.toArray(rc);
262 return rc;
263 }
264
265 @Override
266 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemp) throws Exception {
267
268 Destination answer;
269
270 answer = destinations.get(destination);
271 if (answer != null) {
272 return answer;
273 }
274
275 synchronized (destinations) {
276 answer = destinations.get(destination);
277 if (answer != null) {
278 return answer;
279 }
280
281 switch (destination.getDestinationType()) {
282 case ActiveMQDestination.QUEUE_TYPE:
283 answer = queueRegion.addDestination(context, destination,true);
284 break;
285 case ActiveMQDestination.TOPIC_TYPE:
286 answer = topicRegion.addDestination(context, destination,true);
287 break;
288 case ActiveMQDestination.TEMP_QUEUE_TYPE:
289 answer = tempQueueRegion.addDestination(context, destination, createIfTemp);
290 break;
291 case ActiveMQDestination.TEMP_TOPIC_TYPE:
292 answer = tempTopicRegion.addDestination(context, destination, createIfTemp);
293 break;
294 default:
295 throw createUnknownDestinationTypeException(destination);
296 }
297
298 destinations.put(destination, answer);
299 return answer;
300 }
301
302 }
303
304 @Override
305 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
306
307 if (destinations.containsKey(destination)) {
308 switch (destination.getDestinationType()) {
309 case ActiveMQDestination.QUEUE_TYPE:
310 queueRegion.removeDestination(context, destination, timeout);
311 break;
312 case ActiveMQDestination.TOPIC_TYPE:
313 topicRegion.removeDestination(context, destination, timeout);
314 break;
315 case ActiveMQDestination.TEMP_QUEUE_TYPE:
316 tempQueueRegion.removeDestination(context, destination, timeout);
317 break;
318 case ActiveMQDestination.TEMP_TOPIC_TYPE:
319 tempTopicRegion.removeDestination(context, destination, timeout);
320 break;
321 default:
322 throw createUnknownDestinationTypeException(destination);
323 }
324 destinations.remove(destination);
325
326 }
327
328 }
329
330 @Override
331 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
332 addDestination(context, info.getDestination(),true);
333
334 }
335
336 @Override
337 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
338 removeDestination(context, info.getDestination(), info.getTimeout());
339
340 }
341
342 @Override
343 public ActiveMQDestination[] getDestinations() throws Exception {
344 ArrayList<ActiveMQDestination> l;
345
346 l = new ArrayList<ActiveMQDestination>(getDestinationMap().keySet());
347
348 ActiveMQDestination rc[] = new ActiveMQDestination[l.size()];
349 l.toArray(rc);
350 return rc;
351 }
352
353 @Override
354 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
355 ActiveMQDestination destination = info.getDestination();
356 if (destination != null) {
357 inactiveDestinationsPurgeLock.readLock().lock();
358 try {
359 // This seems to cause the destination to be added but without
360 // advisories firing...
361 context.getBroker().addDestination(context, destination, isAllowTempAutoCreationOnSend());
362 switch (destination.getDestinationType()) {
363 case ActiveMQDestination.QUEUE_TYPE:
364 queueRegion.addProducer(context, info);
365 break;
366 case ActiveMQDestination.TOPIC_TYPE:
367 topicRegion.addProducer(context, info);
368 break;
369 case ActiveMQDestination.TEMP_QUEUE_TYPE:
370 tempQueueRegion.addProducer(context, info);
371 break;
372 case ActiveMQDestination.TEMP_TOPIC_TYPE:
373 tempTopicRegion.addProducer(context, info);
374 break;
375 }
376 } finally {
377 inactiveDestinationsPurgeLock.readLock().unlock();
378 }
379 }
380 }
381
382 @Override
383 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
384 ActiveMQDestination destination = info.getDestination();
385 if (destination != null) {
386 inactiveDestinationsPurgeLock.readLock().lock();
387 try {
388 switch (destination.getDestinationType()) {
389 case ActiveMQDestination.QUEUE_TYPE:
390 queueRegion.removeProducer(context, info);
391 break;
392 case ActiveMQDestination.TOPIC_TYPE:
393 topicRegion.removeProducer(context, info);
394 break;
395 case ActiveMQDestination.TEMP_QUEUE_TYPE:
396 tempQueueRegion.removeProducer(context, info);
397 break;
398 case ActiveMQDestination.TEMP_TOPIC_TYPE:
399 tempTopicRegion.removeProducer(context, info);
400 break;
401 }
402 } finally {
403 inactiveDestinationsPurgeLock.readLock().unlock();
404 }
405 }
406 }
407
408 @Override
409 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
410 ActiveMQDestination destination = info.getDestination();
411 if (destinationInterceptor != null) {
412 destinationInterceptor.create(this, context, destination);
413 }
414 inactiveDestinationsPurgeLock.readLock().lock();
415 try {
416 switch (destination.getDestinationType()) {
417 case ActiveMQDestination.QUEUE_TYPE:
418 return queueRegion.addConsumer(context, info);
419
420 case ActiveMQDestination.TOPIC_TYPE:
421 return topicRegion.addConsumer(context, info);
422
423 case ActiveMQDestination.TEMP_QUEUE_TYPE:
424 return tempQueueRegion.addConsumer(context, info);
425
426 case ActiveMQDestination.TEMP_TOPIC_TYPE:
427 return tempTopicRegion.addConsumer(context, info);
428
429 default:
430 throw createUnknownDestinationTypeException(destination);
431 }
432 } finally {
433 inactiveDestinationsPurgeLock.readLock().unlock();
434 }
435 }
436
437 @Override
438 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
439 ActiveMQDestination destination = info.getDestination();
440 inactiveDestinationsPurgeLock.readLock().lock();
441 try {
442 switch (destination.getDestinationType()) {
443
444 case ActiveMQDestination.QUEUE_TYPE:
445 queueRegion.removeConsumer(context, info);
446 break;
447 case ActiveMQDestination.TOPIC_TYPE:
448 topicRegion.removeConsumer(context, info);
449 break;
450 case ActiveMQDestination.TEMP_QUEUE_TYPE:
451 tempQueueRegion.removeConsumer(context, info);
452 break;
453 case ActiveMQDestination.TEMP_TOPIC_TYPE:
454 tempTopicRegion.removeConsumer(context, info);
455 break;
456 default:
457 throw createUnknownDestinationTypeException(destination);
458 }
459 } finally {
460 inactiveDestinationsPurgeLock.readLock().unlock();
461 }
462 }
463
464 @Override
465 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
466 inactiveDestinationsPurgeLock.readLock().lock();
467 try {
468 topicRegion.removeSubscription(context, info);
469 } finally {
470 inactiveDestinationsPurgeLock.readLock().unlock();
471 }
472 }
473
474 @Override
475 public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
476 message.setBrokerInTime(System.currentTimeMillis());
477 if (producerExchange.isMutable() || producerExchange.getRegion() == null
478 || (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed())) {
479 ActiveMQDestination destination = message.getDestination();
480 // ensure the destination is registered with the RegionBroker
481 producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination, isAllowTempAutoCreationOnSend());
482 Region region;
483 switch (destination.getDestinationType()) {
484 case ActiveMQDestination.QUEUE_TYPE:
485 region = queueRegion;
486 break;
487 case ActiveMQDestination.TOPIC_TYPE:
488 region = topicRegion;
489 break;
490 case ActiveMQDestination.TEMP_QUEUE_TYPE:
491 region = tempQueueRegion;
492 break;
493 case ActiveMQDestination.TEMP_TOPIC_TYPE:
494 region = tempTopicRegion;
495 break;
496 default:
497 throw createUnknownDestinationTypeException(destination);
498 }
499 producerExchange.setRegion(region);
500 producerExchange.setRegionDestination(null);
501 }
502
503 producerExchange.getRegion().send(producerExchange, message);
504 }
505
506 @Override
507 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
508 if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) {
509 ActiveMQDestination destination = ack.getDestination();
510 Region region;
511 switch (destination.getDestinationType()) {
512 case ActiveMQDestination.QUEUE_TYPE:
513 region = queueRegion;
514 break;
515 case ActiveMQDestination.TOPIC_TYPE:
516 region = topicRegion;
517 break;
518 case ActiveMQDestination.TEMP_QUEUE_TYPE:
519 region = tempQueueRegion;
520 break;
521 case ActiveMQDestination.TEMP_TOPIC_TYPE:
522 region = tempTopicRegion;
523 break;
524 default:
525 throw createUnknownDestinationTypeException(destination);
526 }
527 consumerExchange.setRegion(region);
528 }
529 consumerExchange.getRegion().acknowledge(consumerExchange, ack);
530 }
531
532 @Override
533 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
534 ActiveMQDestination destination = pull.getDestination();
535 switch (destination.getDestinationType()) {
536 case ActiveMQDestination.QUEUE_TYPE:
537 return queueRegion.messagePull(context, pull);
538
539 case ActiveMQDestination.TOPIC_TYPE:
540 return topicRegion.messagePull(context, pull);
541
542 case ActiveMQDestination.TEMP_QUEUE_TYPE:
543 return tempQueueRegion.messagePull(context, pull);
544
545 case ActiveMQDestination.TEMP_TOPIC_TYPE:
546 return tempTopicRegion.messagePull(context, pull);
547 default:
548 throw createUnknownDestinationTypeException(destination);
549 }
550 }
551
552 @Override
553 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
554 throw new IllegalAccessException("Transaction operation not implemented by this broker.");
555 }
556
557 @Override
558 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
559 throw new IllegalAccessException("Transaction operation not implemented by this broker.");
560 }
561
562 @Override
563 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
564 throw new IllegalAccessException("Transaction operation not implemented by this broker.");
565 }
566
567 @Override
568 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
569 throw new IllegalAccessException("Transaction operation not implemented by this broker.");
570 }
571
572 @Override
573 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
574 throw new IllegalAccessException("Transaction operation not implemented by this broker.");
575 }
576
577 @Override
578 public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
579 throw new IllegalAccessException("Transaction operation not implemented by this broker.");
580 }
581
582 @Override
583 public void gc() {
584 queueRegion.gc();
585 topicRegion.gc();
586 }
587
588 @Override
589 public BrokerId getBrokerId() {
590 if (brokerId == null) {
591 brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
592 }
593 return brokerId;
594 }
595
596 public void setBrokerId(BrokerId brokerId) {
597 this.brokerId = brokerId;
598 }
599
600 @Override
601 public String getBrokerName() {
602 if (brokerName == null) {
603 try {
604 brokerName = InetAddressUtil.getLocalHostName().toLowerCase();
605 } catch (Exception e) {
606 brokerName = "localhost";
607 }
608 }
609 return brokerName;
610 }
611
612 public void setBrokerName(String brokerName) {
613 this.brokerName = brokerName;
614 }
615
616 public DestinationStatistics getDestinationStatistics() {
617 return destinationStatistics;
618 }
619
620 protected JMSException createUnknownDestinationTypeException(ActiveMQDestination destination) {
621 return new JMSException("Unknown destination type: " + destination.getDestinationType());
622 }
623
624 @Override
625 public synchronized void addBroker(Connection connection, BrokerInfo info) {
626 BrokerInfo existing = brokerInfos.get(info.getBrokerId());
627 if (existing == null) {
628 existing = info.copy();
629 existing.setPeerBrokerInfos(null);
630 brokerInfos.put(info.getBrokerId(), existing);
631 }
632 existing.incrementRefCount();
633 if (LOG.isDebugEnabled()) {
634 LOG.debug(getBrokerName() + " addBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
635 }
636 addBrokerInClusterUpdate(info);
637 }
638
639 @Override
640 public synchronized void removeBroker(Connection connection, BrokerInfo info) {
641 if (info != null) {
642 BrokerInfo existing = brokerInfos.get(info.getBrokerId());
643 if (existing != null && existing.decrementRefCount() == 0) {
644 brokerInfos.remove(info.getBrokerId());
645 }
646 if (LOG.isDebugEnabled()) {
647 LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
648 }
649 removeBrokerInClusterUpdate(info);
650 }
651 }
652
653 @Override
654 public synchronized BrokerInfo[] getPeerBrokerInfos() {
655 BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
656 result = brokerInfos.values().toArray(result);
657 return result;
658 }
659
660 @Override
661 public void preProcessDispatch(MessageDispatch messageDispatch) {
662 Message message = messageDispatch.getMessage();
663 if (message != null) {
664 long endTime = System.currentTimeMillis();
665 message.setBrokerOutTime(endTime);
666 if (getBrokerService().isEnableStatistics()) {
667 long totalTime = endTime - message.getBrokerInTime();
668 message.getRegionDestination().getDestinationStatistics().getProcessTime().addTime(totalTime);
669 }
670 }
671 }
672
673 @Override
674 public void postProcessDispatch(MessageDispatch messageDispatch) {
675 }
676
677 @Override
678 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
679 ActiveMQDestination destination = messageDispatchNotification.getDestination();
680 switch (destination.getDestinationType()) {
681 case ActiveMQDestination.QUEUE_TYPE:
682 queueRegion.processDispatchNotification(messageDispatchNotification);
683 break;
684 case ActiveMQDestination.TOPIC_TYPE:
685 topicRegion.processDispatchNotification(messageDispatchNotification);
686 break;
687 case ActiveMQDestination.TEMP_QUEUE_TYPE:
688 tempQueueRegion.processDispatchNotification(messageDispatchNotification);
689 break;
690 case ActiveMQDestination.TEMP_TOPIC_TYPE:
691 tempTopicRegion.processDispatchNotification(messageDispatchNotification);
692 break;
693 default:
694 throw createUnknownDestinationTypeException(destination);
695 }
696 }
697
698 public boolean isSlaveBroker() {
699 return brokerService.isSlave();
700 }
701
702 @Override
703 public boolean isStopped() {
704 return !started;
705 }
706
707 @Override
708 public Set<ActiveMQDestination> getDurableDestinations() {
709 return destinationFactory.getDestinations();
710 }
711
712 protected void doStop(ServiceStopper ss) {
713 ss.stop(queueRegion);
714 ss.stop(topicRegion);
715 ss.stop(tempQueueRegion);
716 ss.stop(tempTopicRegion);
717 }
718
719 public boolean isKeepDurableSubsActive() {
720 return keepDurableSubsActive;
721 }
722
723 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
724 this.keepDurableSubsActive = keepDurableSubsActive;
725 }
726
727 public DestinationInterceptor getDestinationInterceptor() {
728 return destinationInterceptor;
729 }
730
731 @Override
732 public ConnectionContext getAdminConnectionContext() {
733 return adminConnectionContext;
734 }
735
736 @Override
737 public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
738 this.adminConnectionContext = adminConnectionContext;
739 }
740
741 public Map<ConnectionId, ConnectionState> getConnectionStates() {
742 return connectionStates;
743 }
744
745 @Override
746 public PListStore getTempDataStore() {
747 return brokerService.getTempDataStore();
748 }
749
750 @Override
751 public URI getVmConnectorURI() {
752 return brokerService.getVmConnectorURI();
753 }
754
755 @Override
756 public void brokerServiceStarted() {
757 }
758
759 @Override
760 public BrokerService getBrokerService() {
761 return brokerService;
762 }
763
764 @Override
765 public boolean isExpired(MessageReference messageReference) {
766 boolean expired = false;
767 if (messageReference.isExpired()) {
768 try {
769 // prevent duplicate expiry processing
770 Message message = messageReference.getMessage();
771 synchronized (message) {
772 expired = stampAsExpired(message);
773 }
774 } catch (IOException e) {
775 LOG.warn("unexpected exception on message expiry determination for: " + messageReference, e);
776 }
777 }
778 return expired;
779 }
780
781 private boolean stampAsExpired(Message message) throws IOException {
782 boolean stamped=false;
783 if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
784 long expiration=message.getExpiration();
785 message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration));
786 stamped = true;
787 }
788 return stamped;
789 }
790
791
792 @Override
793 public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) {
794 if (LOG.isDebugEnabled()) {
795 LOG.debug("Message expired " + node);
796 }
797 getRoot().sendToDeadLetterQueue(context, node, subscription);
798 }
799
800 @Override
801 public void sendToDeadLetterQueue(ConnectionContext context,
802 MessageReference node, Subscription subscription){
803 try{
804 if(node!=null){
805 Message message=node.getMessage();
806 if(message!=null && node.getRegionDestination()!=null){
807 DeadLetterStrategy deadLetterStrategy=node
808 .getRegionDestination().getDeadLetterStrategy();
809 if(deadLetterStrategy!=null){
810 if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
811 // message may be inflight to other subscriptions so do not modify
812 message = message.copy();
813 stampAsExpired(message);
814 message.setExpiration(0);
815 if(!message.isPersistent()){
816 message.setPersistent(true);
817 message.setProperty("originalDeliveryMode",
818 "NON_PERSISTENT");
819 }
820 // The original destination and transaction id do
821 // not get filled when the message is first sent,
822 // it is only populated if the message is routed to
823 // another destination like the DLQ
824 ActiveMQDestination deadLetterDestination=deadLetterStrategy
825 .getDeadLetterQueueFor(message, subscription);
826 if (context.getBroker()==null) {
827 context.setBroker(getRoot());
828 }
829 BrokerSupport.resendNoCopy(context,message,
830 deadLetterDestination);
831 }
832 } else {
833 if (LOG.isDebugEnabled()) {
834 LOG.debug("Dead Letter message with no DLQ strategy in place, message id: "
835 + message.getMessageId() + ", destination: " + message.getDestination());
836 }
837 }
838 }
839 }
840 }catch(Exception e){
841 LOG.warn("Caught an exception sending to DLQ: "+node,e);
842 }
843 }
844
845 @Override
846 public Broker getRoot() {
847 try {
848 return getBrokerService().getBroker();
849 } catch (Exception e) {
850 LOG.error("Trying to get Root Broker " + e);
851 throw new RuntimeException("The broker from the BrokerService should not throw an exception");
852 }
853 }
854
855 /**
856 * @return the broker sequence id
857 */
858 @Override
859 public long getBrokerSequenceId() {
860 synchronized(sequenceGenerator) {
861 return sequenceGenerator.getNextSequenceId();
862 }
863 }
864
865
866 @Override
867 public Scheduler getScheduler() {
868 return this.scheduler;
869 }
870
871 public ThreadPoolExecutor getExecutor() {
872 return this.executor;
873 }
874
875 @Override
876 public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
877 ActiveMQDestination destination = control.getDestination();
878 switch (destination.getDestinationType()) {
879 case ActiveMQDestination.QUEUE_TYPE:
880 queueRegion.processConsumerControl(consumerExchange, control);
881 break;
882
883 case ActiveMQDestination.TOPIC_TYPE:
884 topicRegion.processConsumerControl(consumerExchange, control);
885 break;
886
887 case ActiveMQDestination.TEMP_QUEUE_TYPE:
888 tempQueueRegion.processConsumerControl(consumerExchange, control);
889 break;
890
891 case ActiveMQDestination.TEMP_TOPIC_TYPE:
892 tempTopicRegion.processConsumerControl(consumerExchange, control);
893 break;
894
895 default:
896 LOG.warn("unmatched destination: " + destination + ", in consumerControl: " + control);
897 }
898 }
899
900 protected void addBrokerInClusterUpdate(BrokerInfo info) {
901 List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
902 for (TransportConnector connector : connectors) {
903 if (connector.isUpdateClusterClients()) {
904 connector.addPeerBroker(info);
905 connector.updateClientClusterInfo();
906 }
907 }
908 }
909
910 protected void removeBrokerInClusterUpdate(BrokerInfo info) {
911 List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
912 for (TransportConnector connector : connectors) {
913 if (connector.isUpdateClusterClients() && connector.isUpdateClusterClientsOnRemove()) {
914 connector.removePeerBroker(info);
915 connector.updateClientClusterInfo();
916 }
917 }
918 }
919
920 protected void purgeInactiveDestinations() {
921 inactiveDestinationsPurgeLock.writeLock().lock();
922 try {
923 List<Destination> list = new ArrayList<Destination>();
924 Map<ActiveMQDestination, Destination> map = getDestinationMap();
925 if (isAllowTempAutoCreationOnSend()) {
926 map.putAll(tempQueueRegion.getDestinationMap());
927 map.putAll(tempTopicRegion.getDestinationMap());
928 }
929 long maxPurgedDests = this.brokerService.getMaxPurgedDestinationsPerSweep();
930 long timeStamp = System.currentTimeMillis();
931 for (Destination d : map.values()) {
932 d.markForGC(timeStamp);
933 if (d.canGC()) {
934 list.add(d);
935 if (maxPurgedDests > 0 && list.size() == maxPurgedDests) {
936 break;
937 }
938 }
939 }
940
941 if (!list.isEmpty()) {
942 ConnectionContext context = BrokerSupport.getConnectionContext(this);
943 context.setBroker(this);
944
945 for (Destination dest : list) {
946 Logger log = LOG;
947 if (dest instanceof BaseDestination) {
948 log = ((BaseDestination) dest).getLog();
949 }
950 log.info(dest.getName() + " Inactive for longer than " +
951 dest.getInactiveTimoutBeforeGC() + " ms - removing ...");
952 try {
953 getRoot().removeDestination(context, dest.getActiveMQDestination(), isAllowTempAutoCreationOnSend() ? 1 : 0);
954 } catch (Exception e) {
955 LOG.error("Failed to remove inactive destination " + dest, e);
956 }
957 }
958 }
959 } finally {
960 inactiveDestinationsPurgeLock.writeLock().unlock();
961 }
962 }
963
964 public boolean isAllowTempAutoCreationOnSend() {
965 return allowTempAutoCreationOnSend;
966 }
967
968 public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
969 this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
970 }
971 }