001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.discovery;
018
019 import java.net.URI;
020 import java.net.URISyntaxException;
021 import java.util.Map;
022 import java.util.concurrent.ConcurrentHashMap;
023 import org.apache.activemq.command.DiscoveryEvent;
024 import org.apache.activemq.transport.CompositeTransport;
025 import org.apache.activemq.transport.TransportFilter;
026 import org.apache.activemq.util.ServiceStopper;
027 import org.apache.activemq.util.URISupport;
028 import org.slf4j.Logger;
029 import org.slf4j.LoggerFactory;
030
031 /**
032 * A {@link ReliableTransportChannel} which uses a {@link DiscoveryAgent} to
033 * discover remote broker instances and dynamically connect to them.
034 *
035 *
036 */
037 public class DiscoveryTransport extends TransportFilter implements DiscoveryListener {
038
039 private static final Logger LOG = LoggerFactory.getLogger(DiscoveryTransport.class);
040
041 private final CompositeTransport next;
042 private DiscoveryAgent discoveryAgent;
043 private final ConcurrentHashMap<String, URI> serviceURIs = new ConcurrentHashMap<String, URI>();
044
045 private Map<String, String> parameters;
046
047 public DiscoveryTransport(CompositeTransport next) {
048 super(next);
049 this.next = next;
050 }
051
052 @Override
053 public void start() throws Exception {
054 if (discoveryAgent == null) {
055 throw new IllegalStateException("discoveryAgent not configured");
056 }
057
058 // lets pass into the agent the broker name and connection details
059 discoveryAgent.setDiscoveryListener(this);
060 discoveryAgent.start();
061 next.start();
062 }
063
064 @Override
065 public void stop() throws Exception {
066 ServiceStopper ss = new ServiceStopper();
067 ss.stop(discoveryAgent);
068 ss.stop(next);
069 ss.throwFirstException();
070 }
071
072 public void onServiceAdd(DiscoveryEvent event) {
073 String url = event.getServiceName();
074 if (url != null) {
075 try {
076 URI uri = new URI(url);
077 LOG.info("Adding new broker connection URL: " + uri);
078 uri = URISupport.applyParameters(uri, parameters, DISCOVERED_OPTION_PREFIX);
079 serviceURIs.put(event.getServiceName(), uri);
080 next.add(false,new URI[] {uri});
081 } catch (URISyntaxException e) {
082 LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
083 }
084 }
085 }
086
087 public void onServiceRemove(DiscoveryEvent event) {
088 URI uri = serviceURIs.get(event.getServiceName());
089 if (uri != null) {
090 next.remove(false,new URI[] {uri});
091 }
092 }
093
094 public DiscoveryAgent getDiscoveryAgent() {
095 return discoveryAgent;
096 }
097
098 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
099 this.discoveryAgent = discoveryAgent;
100 }
101
102 public void setParameters(Map<String, String> parameters) {
103 this.parameters = parameters;
104 }
105
106 }