001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.Arrays;
022 import java.util.Iterator;
023 import java.util.List;
024 import org.apache.activemq.command.BrokerId;
025 import org.apache.activemq.command.ConsumerId;
026 import org.apache.activemq.command.ConsumerInfo;
027 import org.apache.activemq.filter.DestinationFilter;
028 import org.apache.activemq.transport.Transport;
029 import org.slf4j.Logger;
030 import org.slf4j.LoggerFactory;
031
032 /**
033 * Consolidates subscriptions
034 *
035 *
036 */
037 public class ConduitBridge extends DemandForwardingBridge {
038 private static final Logger LOG = LoggerFactory.getLogger(ConduitBridge.class);
039
040 /**
041 * Constructor
042 *
043 * @param localBroker
044 * @param remoteBroker
045 */
046 public ConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
047 super(configuration, localBroker, remoteBroker);
048 }
049
050 @Override
051 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
052 if (addToAlreadyInterestedConsumers(info)) {
053 return null; // don't want this subscription added
054 }
055 //add our original id to ourselves
056 info.addNetworkConsumerId(info.getConsumerId());
057 info.setSelector(null);
058 return doCreateDemandSubscription(info);
059 }
060
061 protected boolean checkPaths(BrokerId[] first, BrokerId[] second) {
062 if (first == null || second == null)
063 return true;
064 if (Arrays.equals(first, second))
065 return true;
066 if (first[0].equals(second[0])
067 && first[first.length - 1].equals(second[second.length - 1]))
068 return false;
069 else
070 return true;
071 }
072
073 protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) {
074 // search through existing subscriptions and see if we have a match
075 boolean matched = false;
076 for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
077 DemandSubscription ds = (DemandSubscription)i.next();
078 DestinationFilter filter = DestinationFilter.parseFilter(ds.getLocalInfo().getDestination());
079 if (filter.matches(info.getDestination())) {
080 if (LOG.isDebugEnabled()) {
081 LOG.debug(configuration.getBrokerName() + " matched (add interest) to exsting sub for: " + ds.getRemoteInfo()
082 + " with sub: " + info.getConsumerId());
083 }
084 // add the interest in the subscription
085 // ds.add(ds.getRemoteInfo().getConsumerId());
086 if (checkPaths(info.getBrokerPath(), ds.getRemoteInfo().getBrokerPath())) {
087 ds.add(info.getConsumerId());
088 }
089 matched = true;
090 // continue - we want interest to any existing
091 // DemandSubscriptions
092 }
093 }
094 return matched;
095 }
096
097 @Override
098 protected void removeDemandSubscription(ConsumerId id) throws IOException {
099 List<DemandSubscription> tmpList = new ArrayList<DemandSubscription>();
100
101 for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
102 DemandSubscription ds = (DemandSubscription)i.next();
103 if (ds.remove(id)) {
104 if (LOG.isDebugEnabled()) {
105 LOG.debug(configuration.getBrokerName() + " removing interest in sub on " + localBroker + " from " + remoteBrokerName + " : sub: " + id + " existing matched sub: " + ds.getRemoteInfo());
106 }
107 }
108 if (ds.isEmpty()) {
109 tmpList.add(ds);
110 }
111 }
112 for (Iterator<DemandSubscription> i = tmpList.iterator(); i.hasNext();) {
113 DemandSubscription ds = i.next();
114 removeSubscription(ds);
115 if (LOG.isDebugEnabled()) {
116 LOG.debug(configuration.getBrokerName() + " removing sub on " + localBroker + " from " + remoteBrokerName + " : " + ds.getRemoteInfo());
117 }
118 }
119
120 }
121
122 }