001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network;
018
019 import java.net.URI;
020 import java.net.URISyntaxException;
021 import java.util.Collection;
022 import java.util.HashMap;
023 import java.util.HashSet;
024 import java.util.List;
025 import java.util.Map;
026 import java.util.Set;
027 import java.util.concurrent.ConcurrentHashMap;
028 import java.util.concurrent.CopyOnWriteArrayList;
029
030 import javax.management.MalformedObjectNameException;
031 import javax.management.ObjectName;
032
033 import org.apache.activemq.Service;
034 import org.apache.activemq.broker.BrokerService;
035 import org.apache.activemq.broker.jmx.AnnotatedMBean;
036 import org.apache.activemq.broker.jmx.NetworkBridgeView;
037 import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
038 import org.apache.activemq.command.ActiveMQDestination;
039 import org.apache.activemq.command.ConsumerId;
040 import org.apache.activemq.transport.Transport;
041 import org.apache.activemq.transport.TransportFactory;
042 import org.apache.activemq.util.JMXSupport;
043 import org.apache.activemq.util.ServiceStopper;
044 import org.apache.activemq.util.ServiceSupport;
045 import org.slf4j.Logger;
046 import org.slf4j.LoggerFactory;
047
048 /**
049 *
050 */
051 public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service {
052
053 private static final Logger LOG = LoggerFactory.getLogger(NetworkConnector.class);
054 protected URI localURI;
055 protected ConnectionFilter connectionFilter;
056 protected ConcurrentHashMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>();
057
058 protected ServiceSupport serviceSupport = new ServiceSupport() {
059
060 protected void doStart() throws Exception {
061 handleStart();
062 }
063
064 protected void doStop(ServiceStopper stopper) throws Exception {
065 handleStop(stopper);
066 }
067 };
068
069 private Set<ActiveMQDestination> durableDestinations;
070 private List<ActiveMQDestination> excludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
071 private List<ActiveMQDestination> dynamicallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
072 private List<ActiveMQDestination> staticallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
073 private BrokerService brokerService;
074 private ObjectName objectName;
075
076 public NetworkConnector() {
077 }
078
079 public NetworkConnector(URI localURI) {
080 this.localURI = localURI;
081 }
082
083 public URI getLocalUri() throws URISyntaxException {
084 return localURI;
085 }
086
087 public void setLocalUri(URI localURI) {
088 this.localURI = localURI;
089 }
090
091 /**
092 * @return Returns the durableDestinations.
093 */
094 public Set getDurableDestinations() {
095 return durableDestinations;
096 }
097
098 /**
099 * @param durableDestinations The durableDestinations to set.
100 */
101 public void setDurableDestinations(Set<ActiveMQDestination> durableDestinations) {
102 this.durableDestinations = durableDestinations;
103 }
104
105 /**
106 * @return Returns the excludedDestinations.
107 */
108 public List<ActiveMQDestination> getExcludedDestinations() {
109 return excludedDestinations;
110 }
111
112 /**
113 * @param excludedDestinations The excludedDestinations to set.
114 */
115 public void setExcludedDestinations(List<ActiveMQDestination> excludedDestinations) {
116 this.excludedDestinations = excludedDestinations;
117 }
118
119 public void addExcludedDestination(ActiveMQDestination destiantion) {
120 this.excludedDestinations.add(destiantion);
121 }
122
123 /**
124 * @return Returns the staticallyIncludedDestinations.
125 */
126 public List<ActiveMQDestination> getStaticallyIncludedDestinations() {
127 return staticallyIncludedDestinations;
128 }
129
130 /**
131 * @param staticallyIncludedDestinations The staticallyIncludedDestinations
132 * to set.
133 */
134 public void setStaticallyIncludedDestinations(List<ActiveMQDestination> staticallyIncludedDestinations) {
135 this.staticallyIncludedDestinations = staticallyIncludedDestinations;
136 }
137
138 public void addStaticallyIncludedDestination(ActiveMQDestination destiantion) {
139 this.staticallyIncludedDestinations.add(destiantion);
140 }
141
142 /**
143 * @return Returns the dynamicallyIncludedDestinations.
144 */
145 public List<ActiveMQDestination> getDynamicallyIncludedDestinations() {
146 return dynamicallyIncludedDestinations;
147 }
148
149 /**
150 * @param dynamicallyIncludedDestinations The
151 * dynamicallyIncludedDestinations to set.
152 */
153 public void setDynamicallyIncludedDestinations(List<ActiveMQDestination> dynamicallyIncludedDestinations) {
154 this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
155 }
156
157 public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion) {
158 this.dynamicallyIncludedDestinations.add(destiantion);
159 }
160
161 public ConnectionFilter getConnectionFilter() {
162 return connectionFilter;
163 }
164
165 public void setConnectionFilter(ConnectionFilter connectionFilter) {
166 this.connectionFilter = connectionFilter;
167 }
168
169 // Implementation methods
170 // -------------------------------------------------------------------------
171 protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result) {
172 List<ActiveMQDestination> destsList = getDynamicallyIncludedDestinations();
173 ActiveMQDestination dests[] = destsList.toArray(new ActiveMQDestination[destsList.size()]);
174 result.setDynamicallyIncludedDestinations(dests);
175 destsList = getExcludedDestinations();
176 dests = destsList.toArray(new ActiveMQDestination[destsList.size()]);
177 result.setExcludedDestinations(dests);
178 destsList = getStaticallyIncludedDestinations();
179 dests = destsList.toArray(new ActiveMQDestination[destsList.size()]);
180 result.setStaticallyIncludedDestinations(dests);
181 if (durableDestinations != null) {
182
183 HashSet<ActiveMQDestination> topics = new HashSet<ActiveMQDestination>();
184 for (ActiveMQDestination d : durableDestinations) {
185 if( d.isTopic() ) {
186 topics.add(d);
187 }
188 }
189
190 ActiveMQDestination[] dest = new ActiveMQDestination[topics.size()];
191 dest = (ActiveMQDestination[])topics.toArray(dest);
192 result.setDurableDestinations(dest);
193 }
194 return result;
195 }
196
197 protected Transport createLocalTransport() throws Exception {
198 return TransportFactory.connect(localURI);
199 }
200
201 public void start() throws Exception {
202 serviceSupport.start();
203 }
204
205 public void stop() throws Exception {
206 serviceSupport.stop();
207 }
208
209 protected void handleStart() throws Exception {
210 if (localURI == null) {
211 throw new IllegalStateException("You must configure the 'localURI' property");
212 }
213 LOG.info("Network Connector " + this + " Started");
214 }
215
216 protected void handleStop(ServiceStopper stopper) throws Exception {
217 LOG.info("Network Connector " + this + " Stopped");
218 }
219
220 public boolean isStarted() {
221 return serviceSupport.isStarted();
222 }
223
224 public boolean isStopped() {
225 return serviceSupport.isStopped();
226 }
227
228 public boolean isStopping() {
229 return serviceSupport.isStopping();
230 }
231
232 public ObjectName getObjectName() {
233 return objectName;
234 }
235
236 public void setObjectName(ObjectName objectName) {
237 this.objectName = objectName;
238 }
239
240 public BrokerService getBrokerService() {
241 return brokerService;
242 }
243
244 public void setBrokerService(BrokerService brokerService) {
245 this.brokerService = brokerService;
246 }
247
248 protected void registerNetworkBridgeMBean(NetworkBridge bridge) {
249 if (!getBrokerService().isUseJmx()) {
250 return;
251 }
252 NetworkBridgeViewMBean view = new NetworkBridgeView(bridge);
253 try {
254 ObjectName objectName = createNetworkBridgeObjectName(bridge);
255 AnnotatedMBean.registerMBean(getBrokerService().getManagementContext(), view, objectName);
256 } catch (Throwable e) {
257 LOG.debug("Network bridge could not be registered in JMX: " + e.getMessage(), e);
258 }
259 }
260
261 protected void unregisterNetworkBridgeMBean(NetworkBridge bridge) {
262 if (!getBrokerService().isUseJmx()) {
263 return;
264 }
265 try {
266 ObjectName objectName = createNetworkBridgeObjectName(bridge);
267 getBrokerService().getManagementContext().unregisterMBean(objectName);
268 } catch (Throwable e) {
269 LOG.debug("Network bridge could not be unregistered in JMX: " + e.getMessage(), e);
270 }
271 }
272
273
274 @SuppressWarnings("unchecked")
275 protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException {
276 ObjectName connectorName = getObjectName();
277 Map<String, String> map = new HashMap<String, String>(connectorName.getKeyPropertyList());
278 return new ObjectName(connectorName.getDomain() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart((String)map.get("BrokerName")) + "," + "Type=NetworkBridge,"
279 + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart((String)map.get("NetworkConnectorName")) + "," + "Name="
280 + JMXSupport.encodeObjectNamePart(JMXSupport.encodeObjectNamePart(bridge.getRemoteAddress())));
281 }
282
283 // ask all the bridges as we can't know to which this consumer is tied
284 public boolean removeDemandSubscription(ConsumerId consumerId) {
285 boolean removeSucceeded = false;
286 for (NetworkBridge bridge : bridges.values()) {
287 if (bridge instanceof DemandForwardingBridgeSupport) {
288 DemandForwardingBridgeSupport demandBridge = (DemandForwardingBridgeSupport) bridge;
289 if (demandBridge.removeDemandSubscriptionByLocalId(consumerId)) {
290 removeSucceeded = true;
291 break;
292 }
293 }
294 }
295 return removeSucceeded;
296 }
297
298 public Collection<NetworkBridge> activeBridges() {
299 return bridges.values();
300 }
301
302 }