001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network;
018
019 import java.io.IOException;
020 import java.util.concurrent.atomic.AtomicLong;
021
022 import org.apache.activemq.Service;
023 import org.apache.activemq.command.ActiveMQQueue;
024 import org.apache.activemq.command.ActiveMQTopic;
025 import org.apache.activemq.command.BrokerId;
026 import org.apache.activemq.command.BrokerInfo;
027 import org.apache.activemq.command.Command;
028 import org.apache.activemq.command.ConnectionId;
029 import org.apache.activemq.command.ConnectionInfo;
030 import org.apache.activemq.command.ConsumerInfo;
031 import org.apache.activemq.command.ExceptionResponse;
032 import org.apache.activemq.command.Message;
033 import org.apache.activemq.command.MessageAck;
034 import org.apache.activemq.command.MessageDispatch;
035 import org.apache.activemq.command.ProducerInfo;
036 import org.apache.activemq.command.Response;
037 import org.apache.activemq.command.SessionInfo;
038 import org.apache.activemq.command.ShutdownInfo;
039 import org.apache.activemq.transport.DefaultTransportListener;
040 import org.apache.activemq.transport.FutureResponse;
041 import org.apache.activemq.transport.ResponseCallback;
042 import org.apache.activemq.transport.Transport;
043 import org.apache.activemq.util.IdGenerator;
044 import org.apache.activemq.util.ServiceStopper;
045 import org.apache.activemq.util.ServiceSupport;
046 import org.slf4j.Logger;
047 import org.slf4j.LoggerFactory;
048
049 /**
050 * Forwards all messages from the local broker to the remote broker.
051 *
052 * @org.apache.xbean.XBean
053 *
054 *
055 */
056 public class ForwardingBridge implements Service {
057
058 private static final IdGenerator ID_GENERATOR = new IdGenerator();
059 private static final Logger LOG = LoggerFactory.getLogger(ForwardingBridge.class);
060
061 final AtomicLong enqueueCounter = new AtomicLong();
062 final AtomicLong dequeueCounter = new AtomicLong();
063 ConnectionInfo connectionInfo;
064 SessionInfo sessionInfo;
065 ProducerInfo producerInfo;
066 ConsumerInfo queueConsumerInfo;
067 ConsumerInfo topicConsumerInfo;
068 BrokerId localBrokerId;
069 BrokerId remoteBrokerId;
070 BrokerInfo localBrokerInfo;
071 BrokerInfo remoteBrokerInfo;
072
073 private final Transport localBroker;
074 private final Transport remoteBroker;
075 private String clientId;
076 private int prefetchSize = 1000;
077 private boolean dispatchAsync;
078 private String destinationFilter = ">";
079 private NetworkBridgeListener bridgeFailedListener;
080
081 public ForwardingBridge(Transport localBroker, Transport remoteBroker) {
082 this.localBroker = localBroker;
083 this.remoteBroker = remoteBroker;
084 }
085
086 public void start() throws Exception {
087 LOG.info("Starting a network connection between " + localBroker + " and " + remoteBroker
088 + " has been established.");
089
090 localBroker.setTransportListener(new DefaultTransportListener() {
091 public void onCommand(Object o) {
092 Command command = (Command)o;
093 serviceLocalCommand(command);
094 }
095
096 public void onException(IOException error) {
097 serviceLocalException(error);
098 }
099 });
100
101 remoteBroker.setTransportListener(new DefaultTransportListener() {
102 public void onCommand(Object o) {
103 Command command = (Command)o;
104 serviceRemoteCommand(command);
105 }
106
107 public void onException(IOException error) {
108 serviceRemoteException(error);
109 }
110 });
111
112 localBroker.start();
113 remoteBroker.start();
114 }
115
116 protected void triggerStartBridge() throws IOException {
117 Thread thead = new Thread() {
118 public void run() {
119 try {
120 startBridge();
121 } catch (IOException e) {
122 LOG.error("Failed to start network bridge: " + e, e);
123 }
124 }
125 };
126 thead.start();
127 }
128
129 /**
130 * @throws IOException
131 */
132 final void startBridge() throws IOException {
133 connectionInfo = new ConnectionInfo();
134 connectionInfo.setConnectionId(new ConnectionId(ID_GENERATOR.generateId()));
135 connectionInfo.setClientId(clientId);
136 localBroker.oneway(connectionInfo);
137 remoteBroker.oneway(connectionInfo);
138
139 sessionInfo = new SessionInfo(connectionInfo, 1);
140 localBroker.oneway(sessionInfo);
141 remoteBroker.oneway(sessionInfo);
142
143 queueConsumerInfo = new ConsumerInfo(sessionInfo, 1);
144 queueConsumerInfo.setDispatchAsync(dispatchAsync);
145 queueConsumerInfo.setDestination(new ActiveMQQueue(destinationFilter));
146 queueConsumerInfo.setPrefetchSize(prefetchSize);
147 queueConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
148 localBroker.oneway(queueConsumerInfo);
149
150 producerInfo = new ProducerInfo(sessionInfo, 1);
151 producerInfo.setResponseRequired(false);
152 remoteBroker.oneway(producerInfo);
153
154 if (connectionInfo.getClientId() != null) {
155 topicConsumerInfo = new ConsumerInfo(sessionInfo, 2);
156 topicConsumerInfo.setDispatchAsync(dispatchAsync);
157 topicConsumerInfo.setSubscriptionName("topic-bridge");
158 topicConsumerInfo.setRetroactive(true);
159 topicConsumerInfo.setDestination(new ActiveMQTopic(destinationFilter));
160 topicConsumerInfo.setPrefetchSize(prefetchSize);
161 topicConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
162 localBroker.oneway(topicConsumerInfo);
163 }
164 LOG.info("Network connection between " + localBroker + " and " + remoteBroker
165 + " has been established.");
166 }
167
168 public void stop() throws Exception {
169 try {
170 if (connectionInfo != null) {
171 localBroker.request(connectionInfo.createRemoveCommand());
172 remoteBroker.request(connectionInfo.createRemoveCommand());
173 }
174 localBroker.setTransportListener(null);
175 remoteBroker.setTransportListener(null);
176 localBroker.oneway(new ShutdownInfo());
177 remoteBroker.oneway(new ShutdownInfo());
178 } finally {
179 ServiceStopper ss = new ServiceStopper();
180 ss.stop(localBroker);
181 ss.stop(remoteBroker);
182 ss.throwFirstException();
183 }
184 }
185
186 public void serviceRemoteException(Throwable error) {
187 LOG.info("Unexpected remote exception: " + error);
188 LOG.debug("Exception trace: ", error);
189 }
190
191 protected void serviceRemoteCommand(Command command) {
192 try {
193 if (command.isBrokerInfo()) {
194 synchronized (this) {
195 remoteBrokerInfo = (BrokerInfo)command;
196 remoteBrokerId = remoteBrokerInfo.getBrokerId();
197 if (localBrokerId != null) {
198 if (localBrokerId.equals(remoteBrokerId)) {
199 LOG.info("Disconnecting loop back connection.");
200 ServiceSupport.dispose(this);
201 } else {
202 triggerStartBridge();
203 }
204 }
205 }
206 } else {
207 LOG.warn("Unexpected remote command: " + command);
208 }
209 } catch (IOException e) {
210 serviceLocalException(e);
211 }
212 }
213
214 public void serviceLocalException(Throwable error) {
215 LOG.info("Unexpected local exception: " + error);
216 LOG.debug("Exception trace: ", error);
217 fireBridgeFailed();
218 }
219
220 protected void serviceLocalCommand(Command command) {
221 try {
222 if (command.isMessageDispatch()) {
223
224 enqueueCounter.incrementAndGet();
225
226 final MessageDispatch md = (MessageDispatch)command;
227 Message message = md.getMessage();
228 message.setProducerId(producerInfo.getProducerId());
229
230 if (message.getOriginalTransactionId() == null) {
231 message.setOriginalTransactionId(message.getTransactionId());
232 }
233 message.setTransactionId(null);
234
235 if (!message.isResponseRequired()) {
236 // If the message was originally sent using async send, we
237 // will preserve that QOS
238 // by bridging it using an async send (small chance of
239 // message loss).
240 remoteBroker.oneway(message);
241 dequeueCounter.incrementAndGet();
242 localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
243
244 } else {
245
246 // The message was not sent using async send, so we should
247 // only ack the local
248 // broker when we get confirmation that the remote broker
249 // has received the message.
250 ResponseCallback callback = new ResponseCallback() {
251 public void onCompletion(FutureResponse future) {
252 try {
253 Response response = future.getResult();
254 if (response.isException()) {
255 ExceptionResponse er = (ExceptionResponse)response;
256 serviceLocalException(er.getException());
257 } else {
258 dequeueCounter.incrementAndGet();
259 localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
260 }
261 } catch (IOException e) {
262 serviceLocalException(e);
263 }
264 }
265 };
266
267 remoteBroker.asyncRequest(message, callback);
268 }
269
270 // Ack on every message since we don't know if the broker is
271 // blocked due to memory
272 // usage and is waiting for an Ack to un-block him.
273
274 // Acking a range is more efficient, but also more prone to
275 // locking up a server
276 // Perhaps doing something like the following should be policy
277 // based.
278 // if(
279 // md.getConsumerId().equals(queueConsumerInfo.getConsumerId())
280 // ) {
281 // queueDispatched++;
282 // if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2)
283 // ) {
284 // localBroker.oneway(new MessageAck(md,
285 // MessageAck.STANDARD_ACK_TYPE, queueDispatched));
286 // queueDispatched=0;
287 // }
288 // } else {
289 // topicDispatched++;
290 // if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2)
291 // ) {
292 // localBroker.oneway(new MessageAck(md,
293 // MessageAck.STANDARD_ACK_TYPE, topicDispatched));
294 // topicDispatched=0;
295 // }
296 // }
297 } else if (command.isBrokerInfo()) {
298 synchronized (this) {
299 localBrokerInfo = (BrokerInfo)command;
300 localBrokerId = localBrokerInfo.getBrokerId();
301 if (remoteBrokerId != null) {
302 if (remoteBrokerId.equals(localBrokerId)) {
303 LOG.info("Disconnecting loop back connection.");
304 ServiceSupport.dispose(this);
305 } else {
306 triggerStartBridge();
307 }
308 }
309 }
310 } else {
311 LOG.debug("Unexpected local command: " + command);
312 }
313 } catch (IOException e) {
314 serviceLocalException(e);
315 }
316 }
317
318 public String getClientId() {
319 return clientId;
320 }
321
322 public void setClientId(String clientId) {
323 this.clientId = clientId;
324 }
325
326 public int getPrefetchSize() {
327 return prefetchSize;
328 }
329
330 public void setPrefetchSize(int prefetchSize) {
331 this.prefetchSize = prefetchSize;
332 }
333
334 public boolean isDispatchAsync() {
335 return dispatchAsync;
336 }
337
338 public void setDispatchAsync(boolean dispatchAsync) {
339 this.dispatchAsync = dispatchAsync;
340 }
341
342 public String getDestinationFilter() {
343 return destinationFilter;
344 }
345
346 public void setDestinationFilter(String destinationFilter) {
347 this.destinationFilter = destinationFilter;
348 }
349
350 public void setNetworkBridgeFailedListener(NetworkBridgeListener listener) {
351 this.bridgeFailedListener = listener;
352 }
353
354 private void fireBridgeFailed() {
355 NetworkBridgeListener l = this.bridgeFailedListener;
356 if (l != null) {
357 l.bridgeFailed();
358 }
359 }
360
361 public String getRemoteAddress() {
362 return remoteBroker.getRemoteAddress();
363 }
364
365 public String getLocalAddress() {
366 return localBroker.getRemoteAddress();
367 }
368
369 public String getLocalBrokerName() {
370 return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
371 }
372
373 public String getRemoteBrokerName() {
374 return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
375 }
376
377 public long getDequeueCounter() {
378 return dequeueCounter.get();
379 }
380
381 public long getEnqueueCounter() {
382 return enqueueCounter.get();
383 }
384
385 }