001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker;
018
019 import org.apache.activemq.command.ActiveMQDestination;
020 import org.apache.activemq.command.Message;
021 import org.apache.activemq.command.ProducerInfo;
022
023 /**
024 * This broker filter handles composite destinations. If a broker operation is
025 * invoked using a composite destination, this filter repeats the operation
* using each destination of the composite. HRC: I think this filter is
* dangerous to use with the consumer operations. Multiple Subscription
028 * objects will be associated with a single JMS consumer each having a different
029 * idea of what the current pre-fetch dispatch size is. If this is used, then
030 * the client has to expect many more messages to be dispatched than the
031 * pre-fetch setting allows.
032 *
033 *
034 */
035 public class CompositeDestinationBroker extends BrokerFilter {
036
037 public CompositeDestinationBroker(Broker next) {
038 super(next);
039 }
040
041 /**
042 * A producer may register to send to multiple destinations via a composite
043 * destination.
044 */
045 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
046 // The destination may be null.
047 ActiveMQDestination destination = info.getDestination();
048 if (destination != null && destination.isComposite()) {
049 ActiveMQDestination[] destinations = destination.getCompositeDestinations();
050 for (int i = 0; i < destinations.length; i++) {
051 ProducerInfo copy = info.copy();
052 copy.setDestination(destinations[i]);
053 next.addProducer(context, copy);
054 }
055 } else {
056 next.addProducer(context, info);
057 }
058 }
059
060 /**
061 * A producer may de-register from sending to multiple destinations via a
062 * composite destination.
063 */
064 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
065 // The destination may be null.
066 ActiveMQDestination destination = info.getDestination();
067 if (destination != null && destination.isComposite()) {
068 ActiveMQDestination[] destinations = destination.getCompositeDestinations();
069 for (int i = 0; i < destinations.length; i++) {
070 ProducerInfo copy = info.copy();
071 copy.setDestination(destinations[i]);
072 next.removeProducer(context, copy);
073 }
074 } else {
075 next.removeProducer(context, info);
076 }
077 }
078
079 /**
080 *
081 */
082 public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
083 ActiveMQDestination destination = message.getDestination();
084 if (destination.isComposite()) {
085 ActiveMQDestination[] destinations = destination.getCompositeDestinations();
086 for (int i = 0; i < destinations.length; i++) {
087 if (i != 0) {
088 message = message.copy();
089 message.setMemoryUsage(null);
090 }
091 message.setOriginalDestination(destination);
092 message.setDestination(destinations[i]);
093 next.send(producerExchange, message);
094 }
095 } else {
096 next.send(producerExchange, message);
097 }
098 }
099
100 }