001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.ft;
018
019 import java.util.Map;
020 import java.util.concurrent.ConcurrentHashMap;
021 import java.util.concurrent.atomic.AtomicBoolean;
022
023 import org.apache.activemq.broker.Connection;
024 import org.apache.activemq.broker.ConnectionContext;
025 import org.apache.activemq.broker.ConsumerBrokerExchange;
026 import org.apache.activemq.broker.InsertableMutableBrokerFilter;
027 import org.apache.activemq.broker.MutableBrokerFilter;
028 import org.apache.activemq.broker.ProducerBrokerExchange;
029 import org.apache.activemq.broker.region.Subscription;
030 import org.apache.activemq.command.Command;
031 import org.apache.activemq.command.ConnectionControl;
032 import org.apache.activemq.command.ConnectionInfo;
033 import org.apache.activemq.command.ConsumerId;
034 import org.apache.activemq.command.ConsumerInfo;
035 import org.apache.activemq.command.DestinationInfo;
036 import org.apache.activemq.command.ExceptionResponse;
037 import org.apache.activemq.command.Message;
038 import org.apache.activemq.command.MessageAck;
039 import org.apache.activemq.command.MessageDispatch;
040 import org.apache.activemq.command.MessageDispatchNotification;
041 import org.apache.activemq.command.ProducerInfo;
042 import org.apache.activemq.command.RemoveInfo;
043 import org.apache.activemq.command.RemoveSubscriptionInfo;
044 import org.apache.activemq.command.Response;
045 import org.apache.activemq.command.SessionInfo;
046 import org.apache.activemq.command.TransactionId;
047 import org.apache.activemq.command.TransactionInfo;
048 import org.apache.activemq.transport.MutexTransport;
049 import org.apache.activemq.transport.ResponseCorrelator;
050 import org.apache.activemq.transport.Transport;
051 import org.slf4j.Logger;
052 import org.slf4j.LoggerFactory;
053
054 /**
055 * The Message Broker which passes messages to a slave
056 *
057 *
058 */
059 public class MasterBroker extends InsertableMutableBrokerFilter {
060
061 private static final Logger LOG = LoggerFactory.getLogger(MasterBroker.class);
062 private Transport slave;
063 private AtomicBoolean started = new AtomicBoolean(false);
064
065 private Map<ConsumerId, ConsumerId> consumers = new ConcurrentHashMap<ConsumerId, ConsumerId>();
066
067 /**
068 * Constructor
069 *
070 * @param parent
071 * @param transport
072 */
073 public MasterBroker(MutableBrokerFilter parent, Transport transport) {
074 super(parent);
075 this.slave = transport;
076 this.slave = new MutexTransport(slave);
077 this.slave = new ResponseCorrelator(slave);
078 this.slave.setTransportListener(transport.getTransportListener());
079 }
080
081 /**
082 * start processing this broker
083 */
084 public void startProcessing() {
085 started.set(true);
086 try {
087 Connection[] connections = getClients();
088 ConnectionControl command = new ConnectionControl();
089 command.setFaultTolerant(true);
090 if (connections != null) {
091 for (int i = 0; i < connections.length; i++) {
092 if (connections[i].isActive() && connections[i].isManageable()) {
093 connections[i].dispatchAsync(command);
094 }
095 }
096 }
097 } catch (Exception e) {
098 LOG.error("Failed to get Connections", e);
099 }
100 }
101
102 /**
103 * stop the broker
104 *
105 * @throws Exception
106 */
107 public void stop() throws Exception {
108 stopProcessing();
109 }
110
111 /**
112 * stop processing this broker
113 */
114 public void stopProcessing() {
115 if (started.compareAndSet(true, false)) {
116 remove();
117 }
118 }
119
120 /**
121 * A client is establishing a connection with the broker.
122 *
123 * @param context
124 * @param info
125 * @throws Exception
126 */
127 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
128 super.addConnection(context, info);
129 sendAsyncToSlave(info);
130 }
131
132 /**
133 * A client is disconnecting from the broker.
134 *
135 * @param context the environment the operation is being executed under.
136 * @param info
137 * @param error null if the client requested the disconnect or the error
138 * that caused the client to disconnect.
139 * @throws Exception
140 */
141 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
142 super.removeConnection(context, info, error);
143 sendAsyncToSlave(new RemoveInfo(info.getConnectionId()));
144 }
145
146 /**
147 * Adds a session.
148 *
149 * @param context
150 * @param info
151 * @throws Exception
152 */
153 public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
154 super.addSession(context, info);
155 sendAsyncToSlave(info);
156 }
157
158 /**
159 * Removes a session.
160 *
161 * @param context
162 * @param info
163 * @throws Exception
164 */
165 public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
166 super.removeSession(context, info);
167 sendAsyncToSlave(new RemoveInfo(info.getSessionId()));
168 }
169
170 /**
171 * Adds a producer.
172 *
173 * @param context the enviorment the operation is being executed under.
174 * @param info
175 * @throws Exception
176 */
177 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
178 super.addProducer(context, info);
179 sendAsyncToSlave(info);
180 }
181
182 /**
183 * Removes a producer.
184 *
185 * @param context the environment the operation is being executed under.
186 * @param info
187 * @throws Exception
188 */
189 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
190 super.removeProducer(context, info);
191 sendAsyncToSlave(new RemoveInfo(info.getProducerId()));
192 }
193
194 /**
195 * add a consumer
196 *
197 * @param context
198 * @param info
199 * @return the associated subscription
200 * @throws Exception
201 */
202 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
203 sendSyncToSlave(info);
204 consumers.put(info.getConsumerId(), info.getConsumerId());
205 return super.addConsumer(context, info);
206 }
207
208 @Override
209 public void removeConsumer(ConnectionContext context, ConsumerInfo info)
210 throws Exception {
211 super.removeConsumer(context, info);
212 consumers.remove(info.getConsumerId());
213 sendSyncToSlave(new RemoveInfo(info.getConsumerId()));
214 }
215
216 /**
217 * remove a subscription
218 *
219 * @param context
220 * @param info
221 * @throws Exception
222 */
223 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
224 super.removeSubscription(context, info);
225 sendAsyncToSlave(info);
226 }
227
228 @Override
229 public void addDestinationInfo(ConnectionContext context,
230 DestinationInfo info) throws Exception {
231 super.addDestinationInfo(context, info);
232 if (info.getDestination().isTemporary()) {
233 sendAsyncToSlave(info);
234 }
235 }
236
237 @Override
238 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
239 super.removeDestinationInfo(context, info);
240 if (info.getDestination().isTemporary()) {
241 sendAsyncToSlave(info);
242 }
243 }
244
245 /**
246 * begin a transaction
247 *
248 * @param context
249 * @param xid
250 * @throws Exception
251 */
252 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
253 TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.BEGIN);
254 sendAsyncToSlave(info);
255 super.beginTransaction(context, xid);
256 }
257
258 /**
259 * Prepares a transaction. Only valid for xa transactions.
260 *
261 * @param context
262 * @param xid
263 * @return the state
264 * @throws Exception
265 */
266 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
267 TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.PREPARE);
268 sendSyncToSlave(info);
269 int result = super.prepareTransaction(context, xid);
270 return result;
271 }
272
273 /**
274 * Rollsback a transaction.
275 *
276 * @param context
277 * @param xid
278 * @throws Exception
279 */
280 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
281 TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.ROLLBACK);
282 sendAsyncToSlave(info);
283 super.rollbackTransaction(context, xid);
284 }
285
286 /**
287 * Commits a transaction.
288 *
289 * @param context
290 * @param xid
291 * @param onePhase
292 * @throws Exception
293 */
294 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
295 TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.COMMIT_ONE_PHASE);
296 sendSyncToSlave(info);
297 super.commitTransaction(context, xid, onePhase);
298 }
299
300 /**
301 * Forgets a transaction.
302 *
303 * @param context
304 * @param xid
305 * @throws Exception
306 */
307 public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
308 TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.FORGET);
309 sendAsyncToSlave(info);
310 super.forgetTransaction(context, xid);
311 }
312
313 /**
314 * Notifiy the Broker that a dispatch will happen
315 * Do in 'pre' so that slave will avoid getting ack before dispatch
316 * similar logic to send() below.
317 * @param messageDispatch
318 */
319 public void preProcessDispatch(MessageDispatch messageDispatch) {
320 super.preProcessDispatch(messageDispatch);
321 MessageDispatchNotification mdn = new MessageDispatchNotification();
322 mdn.setConsumerId(messageDispatch.getConsumerId());
323 mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
324 mdn.setDestination(messageDispatch.getDestination());
325 if (messageDispatch.getMessage() != null) {
326 Message msg = messageDispatch.getMessage();
327 mdn.setMessageId(msg.getMessageId());
328 if (consumers.containsKey(messageDispatch.getConsumerId())) {
329 sendSyncToSlave(mdn);
330 }
331 }
332 }
333
334 /**
335 * @param context
336 * @param message
337 * @throws Exception
338 */
339 public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
340 /**
341 * A message can be dispatched before the super.send() method returns so -
342 * here the order is switched to avoid problems on the slave with
343 * receiving acks for messages not received yet
344 * copy ensures we don't mess with the correlator and command ids
345 */
346 sendSyncToSlave(message.copy());
347 super.send(producerExchange, message);
348 }
349
350 /**
351 * @param context
352 * @param ack
353 * @throws Exception
354 */
355 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
356 sendToSlave(ack);
357 super.acknowledge(consumerExchange, ack);
358 }
359
360 public boolean isFaultTolerantConfiguration() {
361 return true;
362 }
363
364 protected void sendToSlave(Message message) {
365 if (message.isResponseRequired()) {
366 sendSyncToSlave(message);
367 } else {
368 sendAsyncToSlave(message);
369 }
370 }
371
372 protected void sendToSlave(MessageAck ack) {
373 if (ack.isResponseRequired()) {
374 sendAsyncToSlave(ack);
375 } else {
376 sendSyncToSlave(ack);
377 }
378 }
379
380 protected void sendAsyncToSlave(Command command) {
381 try {
382 slave.oneway(command);
383 } catch (Throwable e) {
384 LOG.error("Slave Failed", e);
385 stopProcessing();
386 }
387 }
388
389 protected void sendSyncToSlave(Command command) {
390 try {
391 Response response = (Response)slave.request(command);
392 if (response.isException()) {
393 ExceptionResponse er = (ExceptionResponse)response;
394 LOG.error("Slave Failed", er.getException());
395 }
396 } catch (Throwable e) {
397 LOG.error("Slave Failed", e);
398 }
399 }
400 }