001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker;
018
019 import java.util.ArrayList;
020 import java.util.List;
021 import org.apache.activemq.broker.region.Destination;
022 import org.apache.activemq.broker.region.Subscription;
023 import org.apache.activemq.command.ActiveMQDestination;
024 import org.apache.activemq.command.BrokerInfo;
025 import org.apache.activemq.command.ConnectionInfo;
026 import org.apache.activemq.command.ConsumerInfo;
027 import org.apache.activemq.command.Message;
028 import org.apache.activemq.command.MessageAck;
029 import org.apache.activemq.command.ProducerInfo;
030 import org.apache.activemq.command.RemoveSubscriptionInfo;
031 import org.apache.activemq.command.SessionInfo;
032 import org.apache.activemq.command.TransactionId;
033
034 /**
035 * Used to add listeners for Broker actions
036 *
037 *
038 */
039 public class BrokerBroadcaster extends BrokerFilter {
040 protected volatile Broker[] listeners = new Broker[0];
041
042 public BrokerBroadcaster(Broker next) {
043 super(next);
044 }
045
046 @Override
047 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
048 next.acknowledge(consumerExchange, ack);
049 Broker brokers[] = getListeners();
050 for (int i = 0; i < brokers.length; i++) {
051 brokers[i].acknowledge(consumerExchange, ack);
052 }
053 }
054
055 @Override
056 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
057 next.addConnection(context, info);
058 Broker brokers[] = getListeners();
059 for (int i = 0; i < brokers.length; i++) {
060 brokers[i].addConnection(context, info);
061 }
062 }
063
064 @Override
065 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
066 Subscription answer = next.addConsumer(context, info);
067 Broker brokers[] = getListeners();
068 for (int i = 0; i < brokers.length; i++) {
069 brokers[i].addConsumer(context, info);
070 }
071 return answer;
072 }
073
074 @Override
075 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
076 next.addProducer(context, info);
077 Broker brokers[] = getListeners();
078 for (int i = 0; i < brokers.length; i++) {
079 brokers[i].addProducer(context, info);
080 }
081 }
082
083 @Override
084 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
085 next.commitTransaction(context, xid, onePhase);
086 Broker brokers[] = getListeners();
087 for (int i = 0; i < brokers.length; i++) {
088 brokers[i].commitTransaction(context, xid, onePhase);
089 }
090 }
091
092 @Override
093 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
094 next.removeSubscription(context, info);
095 Broker brokers[] = getListeners();
096 for (int i = 0; i < brokers.length; i++) {
097 brokers[i].removeSubscription(context, info);
098 }
099 }
100
101 @Override
102 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
103 int result = next.prepareTransaction(context, xid);
104 Broker brokers[] = getListeners();
105 for (int i = 0; i < brokers.length; i++) {
106 // TODO decide what to do with return values
107 brokers[i].prepareTransaction(context, xid);
108 }
109 return result;
110 }
111
112 @Override
113 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
114 next.removeConnection(context, info, error);
115 Broker brokers[] = getListeners();
116 for (int i = 0; i < brokers.length; i++) {
117 brokers[i].removeConnection(context, info, error);
118 }
119 }
120
121 @Override
122 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
123 next.removeConsumer(context, info);
124 Broker brokers[] = getListeners();
125 for (int i = 0; i < brokers.length; i++) {
126 brokers[i].removeConsumer(context, info);
127 }
128 }
129
130 @Override
131 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
132 next.removeProducer(context, info);
133 Broker brokers[] = getListeners();
134 for (int i = 0; i < brokers.length; i++) {
135 brokers[i].removeProducer(context, info);
136 }
137 }
138
139 @Override
140 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
141 next.rollbackTransaction(context, xid);
142 Broker brokers[] = getListeners();
143 for (int i = 0; i < brokers.length; i++) {
144 brokers[i].rollbackTransaction(context, xid);
145 }
146 }
147
148 @Override
149 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
150 next.send(producerExchange, messageSend);
151 Broker brokers[] = getListeners();
152 for (int i = 0; i < brokers.length; i++) {
153 brokers[i].send(producerExchange, messageSend);
154 }
155 }
156
157 @Override
158 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
159 next.beginTransaction(context, xid);
160 Broker brokers[] = getListeners();
161 for (int i = 0; i < brokers.length; i++) {
162 brokers[i].beginTransaction(context, xid);
163 }
164 }
165
166 @Override
167 public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
168 next.forgetTransaction(context, transactionId);
169 Broker brokers[] = getListeners();
170 for (int i = 0; i < brokers.length; i++) {
171 brokers[i].forgetTransaction(context, transactionId);
172 }
173 }
174
175 @Override
176 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception {
177 Destination result = next.addDestination(context, destination,createIfTemporary);
178 Broker brokers[] = getListeners();
179 for (int i = 0; i < brokers.length; i++) {
180 brokers[i].addDestination(context, destination,createIfTemporary);
181 }
182 return result;
183 }
184
185 @Override
186 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
187 next.removeDestination(context, destination, timeout);
188 Broker brokers[] = getListeners();
189 for (int i = 0; i < brokers.length; i++) {
190 brokers[i].removeDestination(context, destination, timeout);
191 }
192 }
193
194 @Override
195 public void start() throws Exception {
196 next.start();
197 Broker brokers[] = getListeners();
198 for (int i = 0; i < brokers.length; i++) {
199 brokers[i].start();
200 }
201 }
202
203 @Override
204 public void stop() throws Exception {
205 next.stop();
206 Broker brokers[] = getListeners();
207 for (int i = 0; i < brokers.length; i++) {
208 brokers[i].stop();
209 }
210 }
211
212 @Override
213 public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
214 next.addSession(context, info);
215 Broker brokers[] = getListeners();
216 for (int i = 0; i < brokers.length; i++) {
217 brokers[i].addSession(context, info);
218 }
219 }
220
221 @Override
222 public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
223 next.removeSession(context, info);
224 Broker brokers[] = getListeners();
225 for (int i = 0; i < brokers.length; i++) {
226 brokers[i].removeSession(context, info);
227 }
228 }
229
230 @Override
231 public void gc() {
232 next.gc();
233 Broker brokers[] = getListeners();
234 for (int i = 0; i < brokers.length; i++) {
235 brokers[i].gc();
236 }
237 }
238
239 @Override
240 public void addBroker(Connection connection, BrokerInfo info) {
241 next.addBroker(connection, info);
242 Broker brokers[] = getListeners();
243 for (int i = 0; i < brokers.length; i++) {
244 brokers[i].addBroker(connection, info);
245 }
246 }
247
248 protected Broker[] getListeners() {
249 return listeners;
250 }
251
252 public synchronized void addListener(Broker broker) {
253 List<Broker> tmp = getListenersAsList();
254 tmp.add(broker);
255 listeners = tmp.toArray(new Broker[tmp.size()]);
256 }
257
258 public synchronized void removeListener(Broker broker) {
259 List<Broker> tmp = getListenersAsList();
260 tmp.remove(broker);
261 listeners = tmp.toArray(new Broker[tmp.size()]);
262 }
263
264 protected List<Broker> getListenersAsList() {
265 List<Broker> tmp = new ArrayList<Broker>();
266 Broker brokers[] = getListeners();
267 for (int i = 0; i < brokers.length; i++) {
268 tmp.add(brokers[i]);
269 }
270 return tmp;
271 }
272 }