001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.util.ArrayList;
020 import java.util.HashMap;
021 import java.util.Iterator;
022 import java.util.List;
023 import java.util.Map;
024 import java.util.Set;
025 import java.util.concurrent.ConcurrentHashMap;
026 import java.util.concurrent.locks.ReentrantReadWriteLock;
027
028 import javax.jms.JMSException;
029 import org.apache.activemq.broker.ConnectionContext;
030 import org.apache.activemq.broker.ConsumerBrokerExchange;
031 import org.apache.activemq.broker.DestinationAlreadyExistsException;
032 import org.apache.activemq.broker.ProducerBrokerExchange;
033 import org.apache.activemq.broker.TransportConnection;
034 import org.apache.activemq.command.ActiveMQDestination;
035 import org.apache.activemq.command.ConsumerControl;
036 import org.apache.activemq.command.ConsumerId;
037 import org.apache.activemq.command.ConsumerInfo;
038 import org.apache.activemq.command.DestinationInfo;
039 import org.apache.activemq.command.Message;
040 import org.apache.activemq.command.MessageAck;
041 import org.apache.activemq.command.MessageDispatchNotification;
042 import org.apache.activemq.command.MessagePull;
043 import org.apache.activemq.command.ProducerInfo;
044 import org.apache.activemq.command.RemoveSubscriptionInfo;
045 import org.apache.activemq.command.Response;
046 import org.apache.activemq.filter.DestinationFilter;
047 import org.apache.activemq.filter.DestinationMap;
048 import org.apache.activemq.security.SecurityContext;
049 import org.apache.activemq.thread.TaskRunnerFactory;
050 import org.apache.activemq.usage.SystemUsage;
051 import org.slf4j.Logger;
052 import org.slf4j.LoggerFactory;
053
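/**
 * Base class for {@link Region} implementations. It keeps the registry of
 * destinations and consumer subscriptions for a region and routes producer,
 * consumer, acknowledgement and dispatch operations to the matching
 * {@link Destination} or {@link Subscription}.
 */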
057 public abstract class AbstractRegion implements Region {
058
// Logger shared by all instances of this class.
private static final Logger LOG = LoggerFactory.getLogger(AbstractRegion.class);

// Registry of active destinations keyed by name; structural changes are made
// while holding destinationsLock.
protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
// Secondary index over the same destinations, used for exact or wildcard lookups.
protected final DestinationMap destinationMap = new DestinationMap();
// Active subscriptions keyed by consumer id.
protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>();
// Usage manager handed to the constructor; not read by this class itself.
protected final SystemUsage usageManager;
// Creates and removes destinations and lists the ones known to storage.
protected final DestinationFactory destinationFactory;
// Statistics object passed to the factory whenever a destination is created.
protected final DestinationStatistics destinationStatistics;
// Owning broker; source of the broker name and the destination interceptor.
protected final RegionBroker broker;
// When true, lookup() asks the broker to create destinations that are missing.
protected boolean autoCreateDestinations = true;
// Task runner factory handed to the constructor; not read by this class itself.
protected final TaskRunnerFactory taskRunnerFactory;
// Guards the destinations registry and destinationMap.
protected final ReentrantReadWriteLock destinationsLock = new ReentrantReadWriteLock();
// Per-consumer guard objects used by addConsumer; every access synchronizes on the map itself.
protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>();
// Set to true by start() and to false by stop().
protected boolean started;
073
/**
 * Creates a region owned by the given broker.
 *
 * @param broker the owning broker; must not be null
 * @param destinationStatistics statistics object passed on when destinations are created
 * @param memoryManager usage manager stored as {@code usageManager}
 * @param taskRunnerFactory task runner factory made available to subclasses
 * @param destinationFactory factory used to create and remove destinations; must not be null
 * @throws IllegalArgumentException if {@code broker} or {@code destinationFactory} is null
 */
public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager,
        TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
    // Validate the mandatory collaborators first; the broker check keeps precedence.
    if (broker == null) {
        throw new IllegalArgumentException("null broker");
    }
    if (destinationFactory == null) {
        throw new IllegalArgumentException("null destinationFactory");
    }
    this.broker = broker;
    this.destinationFactory = destinationFactory;
    this.destinationStatistics = destinationStatistics;
    this.taskRunnerFactory = taskRunnerFactory;
    this.usageManager = memoryManager;
}
088
089 public final void start() throws Exception {
090 started = true;
091
092 Set<ActiveMQDestination> inactiveDests = getInactiveDestinations();
093 for (Iterator<ActiveMQDestination> iter = inactiveDests.iterator(); iter.hasNext();) {
094 ActiveMQDestination dest = iter.next();
095
096 ConnectionContext context = new ConnectionContext();
097 context.setBroker(broker.getBrokerService().getBroker());
098 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
099 context.getBroker().addDestination(context, dest, false);
100 }
101 destinationsLock.readLock().lock();
102 try{
103 for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
104 Destination dest = i.next();
105 dest.start();
106 }
107 } finally {
108 destinationsLock.readLock().unlock();
109 }
110 }
111
112 public void stop() throws Exception {
113 started = false;
114 destinationsLock.readLock().lock();
115 try{
116 for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
117 Destination dest = i.next();
118 dest.stop();
119 }
120 } finally {
121 destinationsLock.readLock().unlock();
122 }
123 destinations.clear();
124 }
125
126 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
127 boolean createIfTemporary) throws Exception {
128
129 destinationsLock.writeLock().lock();
130 try {
131 Destination dest = destinations.get(destination);
132 if (dest == null) {
133 if (destination.isTemporary() == false || createIfTemporary) {
134 if (LOG.isDebugEnabled()) {
135 LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
136 }
137 dest = createDestination(context, destination);
138 // intercept if there is a valid interceptor defined
139 DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
140 if (destinationInterceptor != null) {
141 dest = destinationInterceptor.intercept(dest);
142 }
143 dest.start();
144 destinations.put(destination, dest);
145 destinationMap.put(destination, dest);
146 addSubscriptionsForDestination(context, dest);
147 }
148 if (dest == null) {
149 throw new JMSException("The destination " + destination + " does not exist.");
150 }
151 }
152 return dest;
153 } finally {
154 destinationsLock.writeLock().unlock();
155 }
156 }
157
158 public Map<ConsumerId, Subscription> getSubscriptions() {
159 return subscriptions;
160 }
161
162 protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest)
163 throws Exception {
164
165 List<Subscription> rc = new ArrayList<Subscription>();
166 // Add all consumers that are interested in the destination.
167 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
168 Subscription sub = iter.next();
169 if (sub.matches(dest.getActiveMQDestination())) {
170 dest.addSubscription(context, sub);
171 rc.add(sub);
172 }
173 }
174 return rc;
175
176 }
177
178 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
179 throws Exception {
180
181 // No timeout.. then try to shut down right way, fails if there are
182 // current subscribers.
183 if (timeout == 0) {
184 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
185 Subscription sub = iter.next();
186 if (sub.matches(destination)) {
187 throw new JMSException("Destination still has an active subscription: " + destination);
188 }
189 }
190 }
191
192 if (timeout > 0) {
193 // TODO: implement a way to notify the subscribers that we want to
194 // take the down
195 // the destination and that they should un-subscribe.. Then wait up
196 // to timeout time before
197 // dropping the subscription.
198 }
199
200 if (LOG.isDebugEnabled()) {
201 LOG.debug(broker.getBrokerName() + " removing destination: " + destination);
202 }
203
204 destinationsLock.writeLock().lock();
205 try {
206 Destination dest = destinations.remove(destination);
207 if (dest != null) {
208 // timeout<0 or we timed out, we now force any remaining
209 // subscriptions to un-subscribe.
210 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
211 Subscription sub = iter.next();
212 if (sub.matches(destination)) {
213 dest.removeSubscription(context, sub, 0l);
214 }
215 }
216 destinationMap.removeAll(destination);
217 dispose(context, dest);
218 DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
219 if (destinationInterceptor != null) {
220 destinationInterceptor.remove(dest);
221 }
222
223 } else {
224 if (LOG.isDebugEnabled()) {
225 LOG.debug("Cannot remove a destination that doesn't exist: " + destination);
226 }
227 }
228 } finally {
229 destinationsLock.writeLock().unlock();
230 }
231 }
232
233 /**
234 * Provide an exact or wildcard lookup of destinations in the region
235 *
236 * @return a set of matching destination objects.
237 */
238 @SuppressWarnings("unchecked")
239 public Set<Destination> getDestinations(ActiveMQDestination destination) {
240 destinationsLock.readLock().lock();
241 try{
242 return destinationMap.get(destination);
243 } finally {
244 destinationsLock.readLock().unlock();
245 }
246 }
247
248 public Map<ActiveMQDestination, Destination> getDestinationMap() {
249 destinationsLock.readLock().lock();
250 try{
251 return destinations;
252 } finally {
253 destinationsLock.readLock().unlock();
254 }
255 }
256
257 @SuppressWarnings("unchecked")
258 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
259 if (LOG.isDebugEnabled()) {
260 LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: "
261 + info.getDestination());
262 }
263 ActiveMQDestination destination = info.getDestination();
264 if (destination != null && !destination.isPattern() && !destination.isComposite()) {
265 // lets auto-create the destination
266 lookup(context, destination,true);
267 }
268
269 Object addGuard;
270 synchronized (consumerChangeMutexMap) {
271 addGuard = consumerChangeMutexMap.get(info.getConsumerId());
272 if (addGuard == null) {
273 addGuard = new Object();
274 consumerChangeMutexMap.put(info.getConsumerId(), addGuard);
275 }
276 }
277 synchronized (addGuard) {
278 Subscription o = subscriptions.get(info.getConsumerId());
279 if (o != null) {
280 LOG.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
281 return o;
282 }
283
284 // We may need to add some destinations that are in persistent store
285 // but not active
286 // in the broker.
287 //
288 // TODO: think about this a little more. This is good cause
289 // destinations are not loaded into
290 // memory until a client needs to use the queue, but a management
291 // agent viewing the
292 // broker will not see a destination that exists in persistent
293 // store. We may want to
294 // eagerly load all destinations into the broker but have an
295 // inactive state for the
296 // destination which has reduced memory usage.
297 //
298 DestinationFilter.parseFilter(info.getDestination());
299
300 Subscription sub = createSubscription(context, info);
301
302 subscriptions.put(info.getConsumerId(), sub);
303
304 // At this point we're done directly manipulating subscriptions,
305 // but we need to retain the synchronized block here. Consider
306 // otherwise what would happen if at this point a second
307 // thread added, then removed, as would be allowed with
308 // no mutex held. Remove is only essentially run once
309 // so everything after this point would be leaked.
310
311 // Add the subscription to all the matching queues.
312 // But copy the matches first - to prevent deadlocks
313 List<Destination> addList = new ArrayList<Destination>();
314 destinationsLock.readLock().lock();
315 try {
316 for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
317 addList.add(dest);
318 }
319 } finally {
320 destinationsLock.readLock().unlock();
321 }
322
323 for (Destination dest : addList) {
324 dest.addSubscription(context, sub);
325 }
326
327 if (info.isBrowser()) {
328 ((QueueBrowserSubscription) sub).destinationsAdded();
329 }
330
331 return sub;
332 }
333 }
334
335 /**
336 * Get all the Destinations that are in storage
337 *
338 * @return Set of all stored destinations
339 */
340 @SuppressWarnings("rawtypes")
341 public Set getDurableDestinations() {
342 return destinationFactory.getDestinations();
343 }
344
345 /**
346 * @return all Destinations that don't have active consumers
347 */
348 protected Set<ActiveMQDestination> getInactiveDestinations() {
349 Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations();
350 destinationsLock.readLock().lock();
351 try {
352 inactiveDests.removeAll(destinations.keySet());
353 } finally {
354 destinationsLock.readLock().unlock();
355 }
356 return inactiveDests;
357 }
358
359 @SuppressWarnings("unchecked")
360 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
361 if (LOG.isDebugEnabled()) {
362 LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: "
363 + info.getDestination());
364 }
365
366 Subscription sub = subscriptions.remove(info.getConsumerId());
367 // The sub could be removed elsewhere - see ConnectionSplitBroker
368 if (sub != null) {
369
370 // remove the subscription from all the matching queues.
371 List<Destination> removeList = new ArrayList<Destination>();
372 destinationsLock.readLock().lock();
373 try {
374 for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
375 removeList.add(dest);
376 }
377 } finally {
378 destinationsLock.readLock().unlock();
379 }
380 for (Destination dest : removeList) {
381 dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
382 }
383
384 destroySubscription(sub);
385 }
386 synchronized (consumerChangeMutexMap) {
387 consumerChangeMutexMap.remove(info.getConsumerId());
388 }
389 }
390
391 protected void destroySubscription(Subscription sub) {
392 sub.destroy();
393 }
394
395 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
396 throw new JMSException("Invalid operation.");
397 }
398
399 public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
400 final ConnectionContext context = producerExchange.getConnectionContext();
401
402 if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) {
403 final Destination regionDestination = lookup(context, messageSend.getDestination(),false);
404 producerExchange.setRegionDestination(regionDestination);
405 }
406
407 producerExchange.getRegionDestination().send(producerExchange, messageSend);
408 }
409
410 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
411 Subscription sub = consumerExchange.getSubscription();
412 if (sub == null) {
413 sub = subscriptions.get(ack.getConsumerId());
414 if (sub == null) {
415 if (!consumerExchange.getConnectionContext().isInRecoveryMode()) {
416 LOG.warn("Ack for non existent subscription, ack:" + ack);
417 throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
418 } else {
419 if (LOG.isDebugEnabled()) {
420 LOG.debug("Ack for non existent subscription in recovery, ack:" + ack);
421 }
422 return;
423 }
424 }
425 consumerExchange.setSubscription(sub);
426 }
427 sub.acknowledge(consumerExchange.getConnectionContext(), ack);
428 }
429
430 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
431 Subscription sub = subscriptions.get(pull.getConsumerId());
432 if (sub == null) {
433 throw new IllegalArgumentException("The subscription does not exist: " + pull.getConsumerId());
434 }
435 return sub.pullMessage(context, pull);
436 }
437
438 protected Destination lookup(ConnectionContext context, ActiveMQDestination destination,boolean createTemporary) throws Exception {
439 Destination dest = null;
440
441 destinationsLock.readLock().lock();
442 try {
443 dest = destinations.get(destination);
444 } finally {
445 destinationsLock.readLock().unlock();
446 }
447
448 if (dest == null) {
449 if (isAutoCreateDestinations()) {
450 // Try to auto create the destination... re-invoke broker
451 // from the
452 // top so that the proper security checks are performed.
453 try {
454 context.getBroker().addDestination(context, destination, createTemporary);
455 dest = addDestination(context, destination, false);
456 } catch (DestinationAlreadyExistsException e) {
457 // if the destination already exists then lets ignore
458 // this error
459 }
460 // We should now have the dest created.
461 destinationsLock.readLock().lock();
462 try {
463 dest = destinations.get(destination);
464 } finally {
465 destinationsLock.readLock().unlock();
466 }
467 }
468
469 if (dest == null) {
470 throw new JMSException("The destination " + destination + " does not exist.");
471 }
472 }
473 return dest;
474 }
475
476 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
477 Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId());
478 if (sub != null) {
479 sub.processMessageDispatchNotification(messageDispatchNotification);
480 } else {
481 throw new JMSException("Slave broker out of sync with master - Subscription: "
482 + messageDispatchNotification.getConsumerId() + " on "
483 + messageDispatchNotification.getDestination() + " does not exist for dispatch of message: "
484 + messageDispatchNotification.getMessageId());
485 }
486 }
487
488 /*
489 * For a Queue/TempQueue, dispatch order is imperative to match acks, so the
490 * dispatch is deferred till the notification to ensure that the
491 * subscription chosen by the master is used. AMQ-2102
492 */
493 protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification)
494 throws Exception {
495 Destination dest = null;
496 destinationsLock.readLock().lock();
497 try {
498 dest = destinations.get(messageDispatchNotification.getDestination());
499 } finally {
500 destinationsLock.readLock().unlock();
501 }
502
503 if (dest != null) {
504 dest.processDispatchNotification(messageDispatchNotification);
505 } else {
506 throw new JMSException("Slave broker out of sync with master - Destination: "
507 + messageDispatchNotification.getDestination() + " does not exist for consumer "
508 + messageDispatchNotification.getConsumerId() + " with message: "
509 + messageDispatchNotification.getMessageId());
510 }
511 }
512
513 public void gc() {
514 for (Subscription sub : subscriptions.values()) {
515 sub.gc();
516 }
517
518 destinationsLock.readLock().lock();
519 try {
520 for (Destination dest : destinations.values()) {
521 dest.gc();
522 }
523 } finally {
524 destinationsLock.readLock().unlock();
525 }
526 }
527
/**
 * Creates the subscription object for a consumer. Called by addConsumer
 * while it holds the consumer's guard object, before the subscription is
 * registered.
 *
 * @param context the context the operation is executed under
 * @param info describes the consumer the subscription is created for
 * @return the new subscription
 * @throws Exception if the subscription cannot be created
 */
protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception;
529
530 protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination)
531 throws Exception {
532 return destinationFactory.createDestination(context, destination, destinationStatistics);
533 }
534
535 public boolean isAutoCreateDestinations() {
536 return autoCreateDestinations;
537 }
538
539 public void setAutoCreateDestinations(boolean autoCreateDestinations) {
540 this.autoCreateDestinations = autoCreateDestinations;
541 }
542
543 @SuppressWarnings("unchecked")
544 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
545 destinationsLock.readLock().lock();
546 try {
547 for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
548 dest.addProducer(context, info);
549 }
550 } finally {
551 destinationsLock.readLock().unlock();
552 }
553 }
554
555 /**
556 * Removes a Producer.
557 *
558 * @param context
559 * the environment the operation is being executed under.
560 * @throws Exception
561 * TODO
562 */
563 @SuppressWarnings("unchecked")
564 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
565 destinationsLock.readLock().lock();
566 try {
567 for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
568 dest.removeProducer(context, info);
569 }
570 } finally {
571 destinationsLock.readLock().unlock();
572 }
573 }
574
575 protected void dispose(ConnectionContext context, Destination dest) throws Exception {
576 dest.dispose(context);
577 dest.stop();
578 destinationFactory.removeDestination(dest);
579 }
580
581 public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
582 Subscription sub = subscriptions.get(control.getConsumerId());
583 if (sub != null && sub instanceof AbstractSubscription) {
584 ((AbstractSubscription) sub).setPrefetchSize(control.getPrefetch());
585 if (LOG.isDebugEnabled()) {
586 LOG.debug("setting prefetch: " + control.getPrefetch() + ", on subscription: "
587 + control.getConsumerId());
588 }
589 try {
590 lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup();
591 } catch (Exception e) {
592 LOG.warn("failed to deliver consumerControl to destination: " + control.getDestination(), e);
593 }
594 }
595 }
596 }