001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network;
018
019 import java.io.IOException;
020 import java.security.GeneralSecurityException;
021 import java.security.cert.X509Certificate;
022 import java.util.Arrays;
023 import java.util.Collection;
024 import java.util.List;
025 import java.util.Properties;
026 import java.util.concurrent.ConcurrentHashMap;
027 import java.util.concurrent.CountDownLatch;
028 import java.util.concurrent.TimeUnit;
029 import java.util.concurrent.atomic.AtomicBoolean;
030 import java.util.concurrent.atomic.AtomicLong;
031
032 import javax.management.ObjectName;
033 import org.apache.activemq.Service;
034 import org.apache.activemq.advisory.AdvisorySupport;
035 import org.apache.activemq.broker.BrokerService;
036 import org.apache.activemq.broker.BrokerServiceAware;
037 import org.apache.activemq.broker.TransportConnection;
038 import org.apache.activemq.broker.region.AbstractRegion;
039 import org.apache.activemq.broker.region.DurableTopicSubscription;
040 import org.apache.activemq.broker.region.Region;
041 import org.apache.activemq.broker.region.RegionBroker;
042 import org.apache.activemq.broker.region.Subscription;
043 import org.apache.activemq.broker.region.policy.PolicyEntry;
044 import org.apache.activemq.command.*;
045 import org.apache.activemq.filter.DestinationFilter;
046 import org.apache.activemq.filter.MessageEvaluationContext;
047 import org.apache.activemq.thread.DefaultThreadPools;
048 import org.apache.activemq.thread.TaskRunnerFactory;
049 import org.apache.activemq.transport.DefaultTransportListener;
050 import org.apache.activemq.transport.FutureResponse;
051 import org.apache.activemq.transport.ResponseCallback;
052 import org.apache.activemq.transport.Transport;
053 import org.apache.activemq.transport.TransportDisposedIOException;
054 import org.apache.activemq.transport.TransportFilter;
055 import org.apache.activemq.transport.tcp.SslTransport;
056 import org.apache.activemq.util.IdGenerator;
057 import org.apache.activemq.util.IntrospectionSupport;
058 import org.apache.activemq.util.LongSequenceGenerator;
059 import org.apache.activemq.util.MarshallingSupport;
060 import org.apache.activemq.util.ServiceStopper;
061 import org.apache.activemq.util.ServiceSupport;
062 import org.slf4j.Logger;
063 import org.slf4j.LoggerFactory;
064
065 /**
066 * A useful base class for implementing demand forwarding bridges.
067 */
068 public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
// Executes the bridge start-up, shutdown, dispose and subscription-removal tasks submitted by this class.
private final TaskRunnerFactory asyncTaskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory();
protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
// Transports to the two bridged brokers; both are supplied to the constructor.
protected final Transport localBroker;
protected final Transport remoteBroker;
// Source of the connection ids used for the local and remote ConnectionInfo objects.
protected final IdGenerator idGenerator = new IdGenerator();
protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
// Connection/session state created by startLocalBridge() and startRemoteBridge().
protected ConnectionInfo localConnectionInfo;
protected ConnectionInfo remoteConnectionInfo;
protected SessionInfo localSessionInfo;
// Producer registered on the remote broker; its id is stamped on every forwarded message.
protected ProducerInfo producerInfo;
protected String remoteBrokerName = "Unknown";
protected String localClientId;
// Advisory consumer on the remote broker; only created when the bridge is not static.
protected ConsumerInfo demandConsumerInfo;
// Number of advisory messages received since the last ack was sent (see ackAdvisory).
protected int demandConsumerDispatched;
// Guards ensuring each side of the bridge is started at most once.
protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
// Set by stop(); once true, incoming commands and exceptions are ignored.
protected AtomicBoolean disposed = new AtomicBoolean();
protected BrokerId localBrokerId;
// Destination lists; refreshed from the configuration when the remote BrokerInfo is received.
protected ActiveMQDestination[] excludedDestinations;
protected ActiveMQDestination[] dynamicallyIncludedDestinations;
protected ActiveMQDestination[] staticallyIncludedDestinations;
protected ActiveMQDestination[] durableDestinations;
// Demand subscriptions indexed by the local consumer id (message dispatch lookup) ...
protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
// ... and by the remote consumer id (duplex ack lookup).
protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
// Single-element path holding the local broker id; the element is null until it is assigned elsewhere.
protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
// Counted down once by startLocalBridge() and once by startRemoteBridge(); stop() releases both counts.
protected CountDownLatch startedLatch = new CountDownLatch(2);
// Counted down by startLocalBridge(); awaited before a duplex ConsumerInfo is processed.
protected CountDownLatch localStartedLatch = new CountDownLatch(1);
// Set to true when a BrokerInfo is received from the remote broker.
protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
protected NetworkBridgeConfiguration configuration;
protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();

protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
protected Object brokerInfoMutex = new Object();
protected BrokerId remoteBrokerId;

// Incremented for every message dispatch received from the local broker.
final AtomicLong enqueueCounter = new AtomicLong();
// Incremented once a dispatched message has been forwarded and acknowledged locally.
final AtomicLong dequeueCounter = new AtomicLong();

// Optional listener notified from startLocalBridge() (onStart) and stop() (onStop).
private NetworkBridgeListener networkBridgeListener;
private boolean createdByDuplex;
private BrokerInfo localBrokerInfo;
private BrokerInfo remoteBrokerInfo;

// Lifecycle flag toggled by start() and stop().
private final AtomicBoolean started = new AtomicBoolean();
// Set by duplexStart(); when non-null it is the service disposed on failure instead of this bridge.
private TransportConnection duplexInitiatingConnection;
private BrokerService brokerService = null;
private ObjectName mbeanObjectName;
117
/**
 * Creates a bridge that forwards between the two given broker transports.
 *
 * @param configuration the network bridge configuration to use
 * @param localBroker   transport connected to the local broker
 * @param remoteBroker  transport connected to the remote broker
 */
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
    this.localBroker = localBroker;
    this.remoteBroker = remoteBroker;
    this.configuration = configuration;
}
123
124 public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
125 this.localBrokerInfo = localBrokerInfo;
126 this.remoteBrokerInfo = remoteBrokerInfo;
127 this.duplexInitiatingConnection = connection;
128 start();
129 serviceRemoteCommand(remoteBrokerInfo);
130 }
131
132 public void start() throws Exception {
133 if (started.compareAndSet(false, true)) {
134 localBroker.setTransportListener(new DefaultTransportListener() {
135
136 @Override
137 public void onCommand(Object o) {
138 Command command = (Command) o;
139 serviceLocalCommand(command);
140 }
141
142 @Override
143 public void onException(IOException error) {
144 serviceLocalException(error);
145 }
146 });
147 remoteBroker.setTransportListener(new DefaultTransportListener() {
148
149 public void onCommand(Object o) {
150 Command command = (Command) o;
151 serviceRemoteCommand(command);
152 }
153
154 public void onException(IOException error) {
155 serviceRemoteException(error);
156 }
157
158 });
159
160 localBroker.start();
161 remoteBroker.start();
162 if (!disposed.get()) {
163 try {
164 triggerRemoteStartBridge();
165 } catch (IOException e) {
166 LOG.warn("Caught exception from remote start", e);
167 }
168 } else {
169 LOG.warn ("Bridge was disposed before the start() method was fully executed.");
170 throw new TransportDisposedIOException();
171 }
172 }
173 }
174
175 protected void triggerLocalStartBridge() throws IOException {
176 asyncTaskRunner.execute(new Runnable() {
177 public void run() {
178 final String originalName = Thread.currentThread().getName();
179 Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
180 try {
181 startLocalBridge();
182 } catch (Throwable e) {
183 serviceLocalException(e);
184 } finally {
185 Thread.currentThread().setName(originalName);
186 }
187 }
188 });
189 }
190
191 protected void triggerRemoteStartBridge() throws IOException {
192 asyncTaskRunner.execute(new Runnable() {
193 public void run() {
194 final String originalName = Thread.currentThread().getName();
195 Thread.currentThread().setName("StartRemoteBridge: remoteBroker=" + remoteBroker);
196 try {
197 startRemoteBridge();
198 } catch (Exception e) {
199 serviceRemoteException(e);
200 } finally {
201 Thread.currentThread().setName(originalName);
202 }
203 }
204 });
205 }
206
/**
 * Starts the local side of the bridge: registers a connection and a session
 * with the local broker, notifies the broker and the optional listener that
 * the bridge has started, releases the start latches and finally sets up the
 * static destinations. The body runs at most once per bridge (guarded by
 * {@code localBridgeStarted}) and is skipped in part when the bridge has
 * already been disposed.
 *
 * @throws Throwable the exception carried by an {@link ExceptionResponse}
 *         returned for the connection request, or any other failure
 */
private void startLocalBridge() throws Throwable {
    // Only the first caller performs the start-up.
    if (localBridgeStarted.compareAndSet(false, true)) {
        synchronized (this) {
            if (LOG.isTraceEnabled()) {
                LOG.trace(configuration.getBrokerName() + " starting local Bridge, localBroker=" + localBroker);
            }
            if (!disposed.get()) {
                localConnectionInfo = new ConnectionInfo();
                localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
                localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
                localConnectionInfo.setClientId(localClientId);
                localConnectionInfo.setUserName(configuration.getUserName());
                localConnectionInfo.setPassword(configuration.getPassword());
                // Unwrap the filter chain to reach the innermost remote transport.
                Transport originalTransport = remoteBroker;
                while (originalTransport instanceof TransportFilter) {
                    originalTransport = ((TransportFilter) originalTransport).getNext();
                }
                // For SSL transports, hand the peer certificates to the local broker
                // as the transport context of the connection.
                if (originalTransport instanceof SslTransport) {
                    X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
                    localConnectionInfo.setTransportContext(peerCerts);
                }
                // sync requests that may fail
                Object resp = localBroker.request(localConnectionInfo);
                if (resp instanceof ExceptionResponse) {
                    // Propagate the broker-side failure to the caller.
                    throw ((ExceptionResponse)resp).getException();
                }
                localSessionInfo = new SessionInfo(localConnectionInfo, 1);
                localBroker.oneway(localSessionInfo);

                brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString());
                // Copy the field to a local so the null check and the call see the same reference.
                NetworkBridgeListener l = this.networkBridgeListener;
                if (l != null) {
                    l.onStart(this);
                }
                LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");

            } else {
                LOG.warn ("Bridge was disposed before the startLocalBridge() method was fully executed.");
            }
            // Release waiters whether or not the bridge was disposed.
            startedLatch.countDown();
            localStartedLatch.countDown();
            // Re-check: the bridge may have been disposed while the connection was set up.
            if (!disposed.get()) {
                setupStaticDestinations();
            } else {
                LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") was interrupted during establishment.");
            }
        }
    }
}
256
/**
 * Starts the remote side of the bridge. Unless the bridge was created by a
 * duplex connection, a {@link BrokerInfo} describing this broker is sent
 * first. A connection, session and producer are then registered on the remote
 * broker and, for non-static bridges, a consumer on the advisory topics is
 * created. The body runs at most once until {@code remoteBridgeStarted} is
 * reset.
 *
 * @throws Exception if sending any of the commands to the remote broker fails
 */
protected void startRemoteBridge() throws Exception {
    // Only the first caller performs the start-up.
    if (remoteBridgeStarted.compareAndSet(false, true)) {
        if (LOG.isTraceEnabled()) {
            LOG.trace(configuration.getBrokerName() + " starting remote Bridge, remoteBroker=" + remoteBroker);
        }
        synchronized (this) {
            if (!isCreatedByDuplex()) {
                BrokerInfo brokerInfo = new BrokerInfo();
                brokerInfo.setBrokerName(configuration.getBrokerName());
                brokerInfo.setBrokerURL(configuration.getBrokerURL());
                brokerInfo.setNetworkConnection(true);
                brokerInfo.setDuplexConnection(configuration.isDuplex());
                // set our properties
                // The configuration is copied into props and shipped as a string.
                Properties props = new Properties();
                IntrospectionSupport.getProperties(configuration, props, null);
                String str = MarshallingSupport.propertiesToString(props);
                brokerInfo.setNetworkProperties(str);
                brokerInfo.setBrokerId(this.localBrokerId);
                remoteBroker.oneway(brokerInfo);
            }
            // Remove any connection left over from a previous start before creating a new one.
            if (remoteConnectionInfo != null) {
                remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
            }
            remoteConnectionInfo = new ConnectionInfo();
            remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
            remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
            remoteConnectionInfo.setUserName(configuration.getUserName());
            remoteConnectionInfo.setPassword(configuration.getPassword());
            remoteBroker.oneway(remoteConnectionInfo);

            SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
            remoteBroker.oneway(remoteSessionInfo);
            // Producer whose id is stamped on forwarded messages (see configureMessage).
            producerInfo = new ProducerInfo(remoteSessionInfo, 1);
            producerInfo.setResponseRequired(false);
            remoteBroker.oneway(producerInfo);
            // Listen to consumer advisory messages on the remote broker to
            // determine demand.
            if (!configuration.isStaticBridge()) {
                demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
                demandConsumerInfo.setDispatchAsync(configuration.isDispatchAsync());
                String advisoryTopic = configuration.getDestinationFilter();
                // Also listen for temp destination advisories when temp destinations are bridged.
                if (configuration.isBridgeTempDestinations()) {
                    advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
                }
                demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
                demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
                remoteBroker.oneway(demandConsumerInfo);
            }
            // One of the two counts of startedLatch; startLocalBridge() supplies the other.
            startedLatch.countDown();
        }
    }
}
309
/**
 * Stops the bridge. On the first call after a start, the bridge is marked
 * disposed, the optional listener is notified, a {@link ShutdownInfo} is sent
 * to both brokers from a task on the async task runner (waiting up to 10
 * seconds for the local one to be sent), both transports are stopped and any
 * thread waiting on the start latches is released. If a remote broker info is
 * known, the local broker is finally told that the peer broker and the
 * bridge are gone.
 *
 * @throws Exception if interrupted while waiting for the shutdown task, or
 *         from {@code ServiceStopper.throwFirstException()} after the transports
 *         were stopped (in which case the trailing broker notifications are skipped)
 */
public void stop() throws Exception {
    // Only act when the bridge is currently started.
    if (started.compareAndSet(true, false)) {
        // Only the first disposer performs the shutdown sequence.
        if (disposed.compareAndSet(false, true)) {
            LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
            NetworkBridgeListener l = this.networkBridgeListener;
            if (l != null) {
                l.onStop(this);
            }
            try {
                // Allow the remote bridge to be started again later.
                remoteBridgeStarted.set(false);
                final CountDownLatch sendShutdown = new CountDownLatch(1);
                asyncTaskRunner.execute(new Runnable() {
                    public void run() {
                        try {
                            localBroker.oneway(new ShutdownInfo());
                            // Release the waiting stop() as soon as the local shutdown is sent;
                            // the remote send below is not waited for.
                            sendShutdown.countDown();
                            remoteBroker.oneway(new ShutdownInfo());
                        } catch (Throwable e) {
                            LOG.debug("Caught exception sending shutdown", e);
                        } finally {
                            // Also release the waiter when the local send failed.
                            sendShutdown.countDown();
                        }

                    }
                });
                if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
                    LOG.info("Network Could not shutdown in a timely manner");
                }
            } finally {
                ServiceStopper ss = new ServiceStopper();
                ss.stop(remoteBroker);
                ss.stop(localBroker);
                // Release the started Latch since another thread could be
                // stuck waiting for it to start up.
                startedLatch.countDown();
                startedLatch.countDown();
                localStartedLatch.countDown();
                ss.throwFirstException();
            }
        }
        // Runs even when another caller had already disposed the bridge.
        if (remoteBrokerInfo != null) {
            brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
            brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
            LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
        }
    }
}
357
358 public void serviceRemoteException(Throwable error) {
359 if (!disposed.get()) {
360 if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
361 LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
362 } else {
363 LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
364 }
365 LOG.debug("The remote Exception was: " + error, error);
366 asyncTaskRunner.execute(new Runnable() {
367 public void run() {
368 ServiceSupport.dispose(getControllingService());
369 }
370 });
371 fireBridgeFailed();
372 }
373 }
374
/**
 * Processes a command received from the remote broker. Nothing is done once
 * the bridge is disposed. Otherwise the command is handled by kind:
 * <ul>
 * <li>message dispatches are treated as advisories and acknowledged;</li>
 * <li>a {@link BrokerInfo} records the remote broker, refreshes the destination
 * arrays from the configuration and is passed on to the local broker;</li>
 * <li>a {@link ConnectionError} is routed to {@link #serviceRemoteException(Throwable)};</li>
 * <li>anything else is processed only when {@code isDuplex()} is true, and is
 * otherwise ignored or logged as unexpected.</li>
 * </ul>
 * Any throwable raised while processing is passed to
 * {@link #serviceRemoteException(Throwable)}.
 *
 * @param command the command received from the remote transport
 */
protected void serviceRemoteCommand(Command command) {
    if (!disposed.get()) {
        try {
            if (command.isMessageDispatch()) {
                // Advisory delivered to the demand consumer; wait for the bridge to start first.
                waitStarted();
                MessageDispatch md = (MessageDispatch) command;
                serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
                ackAdvisory(md.getMessage());
            } else if (command.isBrokerInfo()) {
                lastConnectSucceeded.set(true);
                remoteBrokerInfo = (BrokerInfo) command;
                Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
                try {
                    // NOTE(review): startRemoteBridge() uses this same call to copy the configuration
                    // INTO props; confirm that it is meant to apply the remote properties here.
                    IntrospectionSupport.getProperties(configuration, props, null);
                    // Snapshot the configured destination lists into the array fields.
                    if (configuration.getExcludedDestinations() != null) {
                        excludedDestinations = configuration.getExcludedDestinations().toArray(
                                new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
                    }
                    if (configuration.getStaticallyIncludedDestinations() != null) {
                        staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
                                new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
                    }
                    if (configuration.getDynamicallyIncludedDestinations() != null) {
                        dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations()
                                .toArray(
                                        new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations()
                                                .size()]);
                    }
                } catch (Throwable t) {
                    // A mapping failure is logged but does not abort BrokerInfo processing.
                    LOG.error("Error mapping remote destinations", t);
                }
                serviceRemoteBrokerInfo(command);
                // Let the local broker know the remote broker's ID.
                localBroker.oneway(command);
                // new peer broker (a consumer can work with remote broker also)
                brokerService.getBroker().addBroker(null, remoteBrokerInfo);
            } else if (command.getClass() == ConnectionError.class) {
                ConnectionError ce = (ConnectionError) command;
                serviceRemoteException(ce.getException());
            } else {
                if (isDuplex()) {
                    if (command.isMessage()) {
                        ActiveMQMessage message = (ActiveMQMessage) command;
                        if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
                            || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
                            // Advisory messages drive demand, exactly like dispatched advisories above.
                            serviceRemoteConsumerAdvisory(message.getDataStructure());
                            ackAdvisory(message);
                        } else {
                            // Ordinary message: drop it silently if the destination is not permitted.
                            if (!isPermissableDestination(message.getDestination(), true)) {
                                return;
                            }
                            if (message.isResponseRequired()) {
                                // Forward locally, then send the remote side a Response
                                // correlated to the original command id.
                                Response reply = new Response();
                                reply.setCorrelationId(message.getCommandId());
                                localBroker.oneway(message);
                                remoteBroker.oneway(reply);
                            } else {
                                localBroker.oneway(message);
                            }
                        }
                    } else {
                        switch (command.getDataStructureType()) {
                        case ConnectionInfo.DATA_STRUCTURE_TYPE:
                        case SessionInfo.DATA_STRUCTURE_TYPE:
                        case ProducerInfo.DATA_STRUCTURE_TYPE:
                            // Passed through to the local broker unchanged.
                            localBroker.oneway(command);
                            break;
                        case MessageAck.DATA_STRUCTURE_TYPE:
                            // Translate the remote consumer id to the matching local one before forwarding.
                            MessageAck ack = (MessageAck) command;
                            DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
                            if (localSub != null) {
                                ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
                                localBroker.oneway(ack);
                            } else {
                                LOG.warn("Matching local subscription not found for ack: " + ack);
                            }
                            break;
                        case ConsumerInfo.DATA_STRUCTURE_TYPE:
                            // Block until the local side has been started (or stop() released the latch).
                            localStartedLatch.await();
                            if (started.get()) {
                                if (!addConsumerInfo((ConsumerInfo) command)) {
                                    if (LOG.isDebugEnabled()) {
                                        LOG.debug("Ignoring ConsumerInfo: " + command);
                                    }
                                } else {
                                    if (LOG.isTraceEnabled()) {
                                        LOG.trace("Adding ConsumerInfo: " + command);
                                    }
                                }
                            } else {
                                // received a subscription whilst stopping
                                LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
                            }
                            break;
                        case ShutdownInfo.DATA_STRUCTURE_TYPE:
                            // initiator is shutting down, controlled case
                            // abortive close dealt with by inactivity monitor
                            LOG.info("Stopping network bridge on shutdown of remote broker");
                            serviceRemoteException(new IOException(command.toString()));
                            break;
                        default:
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Ignoring remote command: " + command);
                            }
                        }
                    }
                } else {
                    // Non-duplex bridge: only these command types are expected and they are ignored.
                    switch (command.getDataStructureType()) {
                    case KeepAliveInfo.DATA_STRUCTURE_TYPE:
                    case WireFormatInfo.DATA_STRUCTURE_TYPE:
                    case ShutdownInfo.DATA_STRUCTURE_TYPE:
                        break;
                    default:
                        LOG.warn("Unexpected remote command: " + command);
                    }
                }
            }
        } catch (Throwable e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Exception processing remote command: " + command, e);
            }
            serviceRemoteException(e);
        }
    }
}
500
501 private void ackAdvisory(Message message) throws IOException {
502 demandConsumerDispatched++;
503 if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
504 MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
505 ack.setConsumerId(demandConsumerInfo.getConsumerId());
506 remoteBroker.oneway(ack);
507 demandConsumerDispatched = 0;
508 }
509 }
510
511 private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
512 final int networkTTL = configuration.getNetworkTTL();
513 if (data.getClass() == ConsumerInfo.class) {
514 // Create a new local subscription
515 ConsumerInfo info = (ConsumerInfo) data;
516 BrokerId[] path = info.getBrokerPath();
517
518 if (info.isBrowser()) {
519 if (LOG.isDebugEnabled()) {
520 LOG.info(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", browsers explicitly suppressed");
521 }
522 return;
523 }
524
525 if (path != null && path.length >= networkTTL) {
526 if (LOG.isDebugEnabled()) {
527 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops only : " + info);
528 }
529 return;
530 }
531 if (contains(path, localBrokerPath[0])) {
532 // Ignore this consumer as it's a consumer we locally sent to the broker.
533 if (LOG.isDebugEnabled()) {
534 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", already routed through this broker once : " + info);
535 }
536 return;
537 }
538 if (!isPermissableDestination(info.getDestination())) {
539 // ignore if not in the permitted or in the excluded list
540 if (LOG.isDebugEnabled()) {
541 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination() + " is not permiited :" + info);
542 }
543 return;
544 }
545
546 // in a cyclic network there can be multiple bridges per broker that can propagate
547 // a network subscription so there is a need to synchronise on a shared entity
548 synchronized (brokerService.getVmConnectorURI()) {
549 if (addConsumerInfo(info)) {
550 if (LOG.isDebugEnabled()) {
551 LOG.debug(configuration.getBrokerName() + " bridged sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
552 }
553 } else {
554 if (LOG.isDebugEnabled()) {
555 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + " as already subscribed to matching destination : " + info);
556 }
557 }
558 }
559 } else if (data.getClass() == DestinationInfo.class) {
560 // It's a destination info - we want to pass up
561 // information about temporary destinations
562 DestinationInfo destInfo = (DestinationInfo) data;
563 BrokerId[] path = destInfo.getBrokerPath();
564 if (path != null && path.length >= networkTTL) {
565 if (LOG.isDebugEnabled()) {
566 LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " restricted to " + networkTTL + " network hops only");
567 }
568 return;
569 }
570 if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
571 // Ignore this consumer as it's a consumer we locally sent to
572 // the broker.
573 if (LOG.isDebugEnabled()) {
574 LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " already routed through this broker once");
575 }
576 return;
577 }
578 destInfo.setConnectionId(localConnectionInfo.getConnectionId());
579 if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
580 // re-set connection id so comes from here
581 ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
582 tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
583 }
584 destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
585 if (LOG.isTraceEnabled()) {
586 LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation() ? "add" : "remove") + " destination on " + localBroker + " from " + remoteBrokerName + ", destination: " + destInfo);
587 }
588 localBroker.oneway(destInfo);
589 } else if (data.getClass() == RemoveInfo.class) {
590 ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
591 removeDemandSubscription(id);
592 }
593 }
594
595 public void serviceLocalException(Throwable error) {
596 if (!disposed.get()) {
597 LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
598 LOG.debug("The local Exception was:" + error, error);
599 asyncTaskRunner.execute(new Runnable() {
600 public void run() {
601 ServiceSupport.dispose(getControllingService());
602 }
603 });
604 fireBridgeFailed();
605 }
606 }
607
608 protected Service getControllingService() {
609 return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
610 }
611
612 protected void addSubscription(DemandSubscription sub) throws IOException {
613 if (sub != null) {
614 localBroker.oneway(sub.getLocalInfo());
615 }
616 }
617
618 protected void removeSubscription(final DemandSubscription sub) throws IOException {
619 if (sub != null) {
620 if (LOG.isDebugEnabled()) {
621 LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId());
622 }
623 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
624 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
625
626 // continue removal in separate thread to free up this thread for outstanding responses
627 asyncTaskRunner.execute(new Runnable() {
628 public void run() {
629 sub.waitForCompletion();
630 try {
631 localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
632 } catch (IOException e) {
633 LOG.warn("failed to deliver remove command for local subscription, for remote " + sub.getRemoteInfo().getConsumerId(), e);
634 }
635 }
636 });
637 }
638 }
639
640 protected Message configureMessage(MessageDispatch md) {
641 Message message = md.getMessage().copy();
642 // Update the packet to show where it came from.
643 message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
644 message.setProducerId(producerInfo.getProducerId());
645 message.setDestination(md.getDestination());
646 if (message.getOriginalTransactionId() == null) {
647 message.setOriginalTransactionId(message.getTransactionId());
648 }
649 message.setTransactionId(null);
650 return message;
651 }
652
/**
 * Processes a command received from the local broker. Nothing is done once
 * the bridge is disposed. Message dispatches for a known demand subscription
 * are forwarded to the remote broker and acknowledged locally; broker info,
 * shutdown and connection error commands are handled individually; other
 * commands are logged as unexpected (except wire format info). Any throwable
 * raised while processing is passed to {@link #serviceLocalException(Throwable)}.
 *
 * @param command the command received from the local transport
 */
protected void serviceLocalCommand(Command command) {
    if (!disposed.get()) {
        try {
            if (command.isMessageDispatch()) {
                enqueueCounter.incrementAndGet();
                final MessageDispatch md = (MessageDispatch) command;
                final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
                // incrementOutstandingResponses() is evaluated last; every path below that
                // passes this check calls decrementOutstandingResponses() exactly once.
                if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {

                    if (suppressMessageDispatch(md, sub)) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " + md.getMessage());
                        }
                        // still ack as it may be durable
                        try {
                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                        } finally {
                            sub.decrementOutstandingResponses();
                        }
                        return;
                    }

                    Message message = configureMessage(md);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ") " + (LOG.isTraceEnabled() ? message : message.getMessageId()) + ", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
                    }

                    if (!configuration.isAlwaysSyncSend() && !message.isPersistent()) {

                        // If the message was originally sent using async
                        // send, we will preserve that QOS
                        // by bridging it using an async send (small chance
                        // of message loss).
                        try {
                            remoteBroker.oneway(message);
                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                            dequeueCounter.incrementAndGet();
                        } finally {
                            sub.decrementOutstandingResponses();
                        }

                    } else {

                        // The message was not sent using async send, so we
                        // should only ack the local
                        // broker when we get confirmation that the remote
                        // broker has received the message.
                        ResponseCallback callback = new ResponseCallback() {
                            public void onCompletion(FutureResponse future) {
                                try {
                                    Response response = future.getResult();
                                    if (response.isException()) {
                                        // The remote broker rejected the message: fail the bridge without acking.
                                        ExceptionResponse er = (ExceptionResponse) response;
                                        serviceLocalException(er.getException());
                                    } else {
                                        localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                                        dequeueCounter.incrementAndGet();
                                    }
                                } catch (IOException e) {
                                    serviceLocalException(e);
                                } finally {
                                    sub.decrementOutstandingResponses();
                                }
                            }
                        };

                        remoteBroker.asyncRequest(message, callback);

                    }
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
                    }
                }
            } else if (command.isBrokerInfo()) {
                localBrokerInfo = (BrokerInfo) command;
                serviceLocalBrokerInfo(command);
            } else if (command.isShutdownInfo()) {
                // The local broker is shutting down, so stop the bridge as well.
                LOG.info(configuration.getBrokerName() + " Shutting down");
                stop();
            } else if (command.getClass() == ConnectionError.class) {
                ConnectionError ce = (ConnectionError) command;
                serviceLocalException(ce.getException());
            } else {
                switch (command.getDataStructureType()) {
                case WireFormatInfo.DATA_STRUCTURE_TYPE:
                    // Silently ignored.
                    break;
                default:
                    LOG.warn("Unexpected local command: " + command);
                }
            }
        } catch (Throwable e) {
            LOG.warn("Caught an exception processing local command", e);
            serviceLocalException(e);
        }
    }
}
750
751 private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
752 boolean suppress = false;
753 // for durable subs, suppression via filter leaves dangling acks so we need to
754 // check here and allow the ack irrespective
755 if (sub.getLocalInfo().isDurable()) {
756 MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
757 messageEvalContext.setMessageReference(md.getMessage());
758 messageEvalContext.setDestination(md.getDestination());
759 suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
760 }
761 return suppress;
762 }
763
764 /**
765 * @return Returns the dynamicallyIncludedDestinations.
766 */
767 public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
768 return dynamicallyIncludedDestinations;
769 }
770
771 /**
772 * @param dynamicallyIncludedDestinations The
773 * dynamicallyIncludedDestinations to set.
774 */
775 public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
776 this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
777 }
778
779 /**
780 * @return Returns the excludedDestinations.
781 */
782 public ActiveMQDestination[] getExcludedDestinations() {
783 return excludedDestinations;
784 }
785
786 /**
787 * @param excludedDestinations The excludedDestinations to set.
788 */
789 public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
790 this.excludedDestinations = excludedDestinations;
791 }
792
793 /**
794 * @return Returns the staticallyIncludedDestinations.
795 */
796 public ActiveMQDestination[] getStaticallyIncludedDestinations() {
797 return staticallyIncludedDestinations;
798 }
799
800 /**
801 * @param staticallyIncludedDestinations The staticallyIncludedDestinations
802 * to set.
803 */
804 public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
805 this.staticallyIncludedDestinations = staticallyIncludedDestinations;
806 }
807
808 /**
809 * @return Returns the durableDestinations.
810 */
811 public ActiveMQDestination[] getDurableDestinations() {
812 return durableDestinations;
813 }
814
815 /**
816 * @param durableDestinations The durableDestinations to set.
817 */
818 public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
819 this.durableDestinations = durableDestinations;
820 }
821
822 /**
823 * @return Returns the localBroker.
824 */
825 public Transport getLocalBroker() {
826 return localBroker;
827 }
828
829 /**
830 * @return Returns the remoteBroker.
831 */
832 public Transport getRemoteBroker() {
833 return remoteBroker;
834 }
835
836 /**
837 * @return the createdByDuplex
838 */
839 public boolean isCreatedByDuplex() {
840 return this.createdByDuplex;
841 }
842
843 /**
844 * @param createdByDuplex the createdByDuplex to set
845 */
846 public void setCreatedByDuplex(boolean createdByDuplex) {
847 this.createdByDuplex = createdByDuplex;
848 }
849
850 public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
851 if (brokerPath != null) {
852 for (int i = 0; i < brokerPath.length; i++) {
853 if (brokerId.equals(brokerPath[i])) {
854 return true;
855 }
856 }
857 }
858 return false;
859 }
860
861 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
862 if (brokerPath == null || brokerPath.length == 0) {
863 return pathsToAppend;
864 }
865 BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
866 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
867 System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
868 return rc;
869 }
870
871 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
872 if (brokerPath == null || brokerPath.length == 0) {
873 return new BrokerId[] { idToAppend };
874 }
875 BrokerId rc[] = new BrokerId[brokerPath.length + 1];
876 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
877 rc[brokerPath.length] = idToAppend;
878 return rc;
879 }
880
881 protected boolean isPermissableDestination(ActiveMQDestination destination) {
882 return isPermissableDestination(destination, false);
883 }
884
885 protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
886 // Are we not bridging temp destinations?
887 if (destination.isTemporary()) {
888 if (allowTemporary) {
889 return true;
890 } else {
891 return configuration.isBridgeTempDestinations();
892 }
893 }
894
895 ActiveMQDestination[] dests = staticallyIncludedDestinations;
896 if (dests != null && dests.length > 0) {
897 for (int i = 0; i < dests.length; i++) {
898 ActiveMQDestination match = dests[i];
899 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match);
900 if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
901 return true;
902 }
903 }
904 }
905
906 dests = excludedDestinations;
907 if (dests != null && dests.length > 0) {
908 for (int i = 0; i < dests.length; i++) {
909 ActiveMQDestination match = dests[i];
910 DestinationFilter exclusionFilter = DestinationFilter.parseFilter(match);
911 if (match != null && exclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
912 return false;
913 }
914 }
915 }
916
917 dests = dynamicallyIncludedDestinations;
918 if (dests != null && dests.length > 0) {
919 for (int i = 0; i < dests.length; i++) {
920 ActiveMQDestination match = dests[i];
921 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match);
922 if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
923 return true;
924 }
925 }
926
927 return false;
928 }
929 return true;
930 }
931
932 /**
933 * Subscriptions for these destinations are always created
934 */
935 protected void setupStaticDestinations() {
936 ActiveMQDestination[] dests = staticallyIncludedDestinations;
937 if (dests != null) {
938 for (int i = 0; i < dests.length; i++) {
939 ActiveMQDestination dest = dests[i];
940 DemandSubscription sub = createDemandSubscription(dest);
941 try {
942 addSubscription(sub);
943 } catch (IOException e) {
944 LOG.error("Failed to add static destination " + dest, e);
945 }
946 if (LOG.isTraceEnabled()) {
947 LOG.trace("bridging messages for static destination: " + dest);
948 }
949 }
950 }
951 }
952
953 protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
954 boolean consumerAdded = false;
955 ConsumerInfo info = consumerInfo.copy();
956 addRemoteBrokerToBrokerPath(info);
957 DemandSubscription sub = createDemandSubscription(info);
958 if (sub != null) {
959 if (duplicateSuppressionIsRequired(sub)) {
960 undoMapRegistration(sub);
961 } else {
962 addSubscription(sub);
963 consumerAdded = true;
964 }
965 }
966 return consumerAdded;
967 }
968
969 private void undoMapRegistration(DemandSubscription sub) {
970 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
971 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
972 }
973
974 /*
975 * check our existing subs networkConsumerIds against the list of network ids in this subscription
976 * A match means a duplicate which we suppress for topics and maybe for queues
977 */
978 private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
979 final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
980 boolean suppress = false;
981
982 if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() ||
983 consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions()) {
984 return suppress;
985 }
986
987 List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
988 Collection<Subscription> currentSubs =
989 getRegionSubscriptions(consumerInfo.getDestination());
990 for (Subscription sub : currentSubs) {
991 List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
992 if (!networkConsumers.isEmpty()) {
993 if (matchFound(candidateConsumers, networkConsumers)) {
994 if (isInActiveDurableSub(sub)) {
995 suppress = false;
996 } else {
997 suppress = hasLowerPriority(sub, candidate.getLocalInfo());
998 }
999 break;
1000 }
1001 }
1002 }
1003 return suppress;
1004 }
1005
1006 private boolean isInActiveDurableSub(Subscription sub) {
1007 return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription)sub).isActive());
1008 }
1009
1010 private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1011 boolean suppress = false;
1012
1013 if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1014 if (LOG.isDebugEnabled()) {
1015 LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName
1016 + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: "
1017 + existingSub + ", networkConsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds());
1018 }
1019 suppress = true;
1020 } else {
1021 // remove the existing lower priority duplicate and allow this candidate
1022 try {
1023 removeDuplicateSubscription(existingSub);
1024
1025 if (LOG.isDebugEnabled()) {
1026 LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo()
1027 + " with sub from " + remoteBrokerName
1028 + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: "
1029 + candidateInfo.getNetworkConsumerIds());
1030 }
1031 } catch (IOException e) {
1032 LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: " + existingSub, e);
1033 }
1034 }
1035 return suppress;
1036 }
1037
1038 private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1039 for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1040 if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1041 break;
1042 }
1043 }
1044 }
1045
1046 private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1047 boolean found = false;
1048 for (ConsumerId aliasConsumer : networkConsumers) {
1049 if (candidateConsumers.contains(aliasConsumer)) {
1050 found = true;
1051 break;
1052 }
1053 }
1054 return found;
1055 }
1056
1057 private final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
1058 RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
1059 Region region;
1060 Collection<Subscription> subs;
1061
1062 region = null;
1063 switch ( dest.getDestinationType() )
1064 {
1065 case ActiveMQDestination.QUEUE_TYPE:
1066 region = region_broker.getQueueRegion();
1067 break;
1068
1069 case ActiveMQDestination.TOPIC_TYPE:
1070 region = region_broker.getTopicRegion();
1071 break;
1072
1073 case ActiveMQDestination.TEMP_QUEUE_TYPE:
1074 region = region_broker.getTempQueueRegion();
1075 break;
1076
1077 case ActiveMQDestination.TEMP_TOPIC_TYPE:
1078 region = region_broker.getTempTopicRegion();
1079 break;
1080 }
1081
1082 if ( region instanceof AbstractRegion )
1083 subs = ((AbstractRegion) region).getSubscriptions().values();
1084 else
1085 subs = null;
1086
1087 return subs;
1088 }
1089
1090 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1091 //add our original id to ourselves
1092 info.addNetworkConsumerId(info.getConsumerId());
1093 return doCreateDemandSubscription(info);
1094 }
1095
1096 protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1097 DemandSubscription result = new DemandSubscription(info);
1098 result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1099 if (info.getDestination().isTemporary()) {
1100 // reset the local connection Id
1101
1102 ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1103 dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1104 }
1105
1106 if (configuration.isDecreaseNetworkConsumerPriority()) {
1107 byte priority = (byte) configuration.getConsumerPriorityBase();
1108 if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1109 // The longer the path to the consumer, the less it's consumer priority.
1110 priority -= info.getBrokerPath().length + 1;
1111 }
1112 result.getLocalInfo().setPriority(priority);
1113 if (LOG.isDebugEnabled()) {
1114 LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info);
1115 }
1116 }
1117 configureDemandSubscription(info, result);
1118 return result;
1119 }
1120
1121 final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
1122 ConsumerInfo info = new ConsumerInfo();
1123 info.setDestination(destination);
1124 // the remote info held by the DemandSubscription holds the original
1125 // consumerId,
1126 // the local info get's overwritten
1127
1128 info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1129 DemandSubscription result = null;
1130 try {
1131 result = createDemandSubscription(info);
1132 } catch (IOException e) {
1133 LOG.error("Failed to create DemandSubscription ", e);
1134 }
1135 return result;
1136 }
1137
1138 protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1139 sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1140 sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
1141 subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1142 subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1143
1144 sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info));
1145 if (!info.isDurable()) {
1146 // This works for now since we use a VM connection to the local broker.
1147 // may need to change if we ever subscribe to a remote broker.
1148 sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
1149 } else {
1150 // need to ack this message if it is ignored as it is durable so
1151 // we check before we send. see: suppressMessageDispatch()
1152 }
1153 }
1154
1155 protected void removeDemandSubscription(ConsumerId id) throws IOException {
1156 DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1157 if (LOG.isDebugEnabled()) {
1158 LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id + ", matching sub: " + sub);
1159 }
1160 if (sub != null) {
1161 removeSubscription(sub);
1162 if (LOG.isDebugEnabled()) {
1163 LOG.debug(configuration.getBrokerName() + " removed sub on " + localBroker + " from " + remoteBrokerName + " : " + sub.getRemoteInfo());
1164 }
1165 }
1166 }
1167
1168 protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1169 boolean removeDone = false;
1170 DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1171 if (sub != null) {
1172 try {
1173 removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1174 removeDone = true;
1175 } catch (IOException e) {
1176 LOG.debug("removeDemandSubscriptionByLocalId failed for localId: " + consumerId, e);
1177 }
1178 }
1179 return removeDone;
1180 }
1181
1182 protected void waitStarted() throws InterruptedException {
1183 startedLatch.await();
1184 }
1185
1186 protected void clearDownSubscriptions() {
1187 subscriptionMapByLocalId.clear();
1188 subscriptionMapByRemoteId.clear();
1189 }
1190
1191 protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
1192 NetworkBridgeFilterFactory filterFactory = defaultFilterFactory;
1193 if (brokerService != null && brokerService.getDestinationPolicy() != null) {
1194 PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination());
1195 if (entry != null && entry.getNetworkBridgeFilterFactory() != null) {
1196 filterFactory = entry.getNetworkBridgeFilterFactory();
1197 }
1198 }
1199 return filterFactory.create(info, getRemoteBrokerPath(), configuration.getNetworkTTL());
1200 }
1201
1202 protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
1203 synchronized (brokerInfoMutex) {
1204 if (remoteBrokerId != null) {
1205 if (remoteBrokerId.equals(localBrokerId)) {
1206 if (LOG.isTraceEnabled()) {
1207 LOG.trace(configuration.getBrokerName() + " disconnecting local loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
1208 }
1209 waitStarted();
1210 ServiceSupport.dispose(this);
1211 }
1212 }
1213 }
1214 }
1215
1216 protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
1217 info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
1218 }
1219
1220 protected void serviceRemoteBrokerInfo(Command command) throws IOException {
1221 synchronized (brokerInfoMutex) {
1222 BrokerInfo remoteBrokerInfo = (BrokerInfo)command;
1223 remoteBrokerId = remoteBrokerInfo.getBrokerId();
1224 remoteBrokerPath[0] = remoteBrokerId;
1225 remoteBrokerName = remoteBrokerInfo.getBrokerName();
1226 if (localBrokerId != null) {
1227 if (localBrokerId.equals(remoteBrokerId)) {
1228 if (LOG.isTraceEnabled()) {
1229 LOG.trace(configuration.getBrokerName() + " disconnecting remote loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
1230 }
1231 ServiceSupport.dispose(this);
1232 }
1233 }
1234 if (!disposed.get()) {
1235 triggerLocalStartBridge();
1236 }
1237 }
1238 }
1239
1240 protected BrokerId[] getRemoteBrokerPath() {
1241 return remoteBrokerPath;
1242 }
1243
1244 public void setNetworkBridgeListener(NetworkBridgeListener listener) {
1245 this.networkBridgeListener = listener;
1246 }
1247
1248 private void fireBridgeFailed() {
1249 NetworkBridgeListener l = this.networkBridgeListener;
1250 if (l != null) {
1251 l.bridgeFailed();
1252 }
1253 }
1254
1255 public String getRemoteAddress() {
1256 return remoteBroker.getRemoteAddress();
1257 }
1258
1259 public String getLocalAddress() {
1260 return localBroker.getRemoteAddress();
1261 }
1262
1263 public String getRemoteBrokerName() {
1264 return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1265 }
1266
1267 public String getLocalBrokerName() {
1268 return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1269 }
1270
1271 public long getDequeueCounter() {
1272 return dequeueCounter.get();
1273 }
1274
1275 public long getEnqueueCounter() {
1276 return enqueueCounter.get();
1277 }
1278
1279 protected boolean isDuplex() {
1280 return configuration.isDuplex() || createdByDuplex;
1281 }
1282
1283 public ConcurrentHashMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
1284 return subscriptionMapByRemoteId;
1285 }
1286
1287 public void setBrokerService(BrokerService brokerService) {
1288 this.brokerService = brokerService;
1289 this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
1290 localBrokerPath[0] = localBrokerId;
1291 }
1292
1293 public void setMbeanObjectName(ObjectName objectName) {
1294 this.mbeanObjectName = objectName;
1295 }
1296
1297 public ObjectName getMbeanObjectName() {
1298 return mbeanObjectName;
1299 }
1300 }