001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.vm;
018
019 import java.io.IOException;
020 import java.net.URI;
021 import java.net.URISyntaxException;
022 import java.util.HashMap;
023 import java.util.Map;
024 import java.util.concurrent.ConcurrentHashMap;
025
026 import org.apache.activemq.broker.BrokerFactory;
027 import org.apache.activemq.broker.BrokerFactoryHandler;
028 import org.apache.activemq.broker.BrokerRegistry;
029 import org.apache.activemq.broker.BrokerService;
030 import org.apache.activemq.broker.TransportConnector;
031 import org.apache.activemq.transport.MarshallingTransportFilter;
032 import org.apache.activemq.transport.Transport;
033 import org.apache.activemq.transport.TransportFactory;
034 import org.apache.activemq.transport.TransportServer;
035 import org.apache.activemq.util.IOExceptionSupport;
036 import org.apache.activemq.util.IntrospectionSupport;
037 import org.apache.activemq.util.ServiceSupport;
038 import org.apache.activemq.util.URISupport;
039 import org.apache.activemq.util.URISupport.CompositeData;
040 import org.slf4j.Logger;
041 import org.slf4j.LoggerFactory;
042 import org.slf4j.MDC;
043
044 public class VMTransportFactory extends TransportFactory {
045
046 public static final ConcurrentHashMap<String, BrokerService> BROKERS = new ConcurrentHashMap<String, BrokerService>();
047 public static final ConcurrentHashMap<String, TransportConnector> CONNECTORS = new ConcurrentHashMap<String, TransportConnector>();
048 public static final ConcurrentHashMap<String, VMTransportServer> SERVERS = new ConcurrentHashMap<String, VMTransportServer>();
049 private static final Logger LOG = LoggerFactory.getLogger(VMTransportFactory.class);
050
051 BrokerFactoryHandler brokerFactoryHandler;
052
053 public Transport doConnect(URI location) throws Exception {
054 return VMTransportServer.configure(doCompositeConnect(location));
055 }
056
057 public Transport doCompositeConnect(URI location) throws Exception {
058 URI brokerURI;
059 String host;
060 Map<String, String> options;
061 boolean create = true;
062 int waitForStart = -1;
063 CompositeData data = URISupport.parseComposite(location);
064 if (data.getComponents().length == 1 && "broker".equals(data.getComponents()[0].getScheme())) {
065 brokerURI = data.getComponents()[0];
066 CompositeData brokerData = URISupport.parseComposite(brokerURI);
067 host = (String)brokerData.getParameters().get("brokerName");
068 if (host == null) {
069 host = "localhost";
070 }
071 if (brokerData.getPath() != null) {
072 host = brokerData.getPath();
073 }
074 options = data.getParameters();
075 location = new URI("vm://" + host);
076 } else {
077 // If using the less complex vm://localhost?broker.persistent=true
078 // form
079 try {
080 host = extractHost(location);
081 options = URISupport.parseParameters(location);
082 String config = (String)options.remove("brokerConfig");
083 if (config != null) {
084 brokerURI = new URI(config);
085 } else {
086 Map brokerOptions = IntrospectionSupport.extractProperties(options, "broker.");
087 brokerURI = new URI("broker://()/" + host + "?"
088 + URISupport.createQueryString(brokerOptions));
089 }
090 if ("false".equals(options.remove("create"))) {
091 create = false;
092 }
093 String waitForStartString = options.remove("waitForStart");
094 if (waitForStartString != null) {
095 waitForStart = Integer.parseInt(waitForStartString);
096 }
097 } catch (URISyntaxException e1) {
098 throw IOExceptionSupport.create(e1);
099 }
100 location = new URI("vm://" + host);
101 }
102 if (host == null) {
103 host = "localhost";
104 }
105 VMTransportServer server = SERVERS.get(host);
106 // validate the broker is still active
107 if (!validateBroker(host) || server == null) {
108 BrokerService broker = null;
109 // Synchronize on the registry so that multiple concurrent threads
110 // doing this do not think that the broker has not been created and
111 // cause multiple brokers to be started.
112 synchronized (BrokerRegistry.getInstance().getRegistryMutext()) {
113 broker = lookupBroker(BrokerRegistry.getInstance(), host, waitForStart);
114 if (broker == null) {
115 if (!create) {
116 throw new IOException("Broker named '" + host + "' does not exist.");
117 }
118 try {
119 if (brokerFactoryHandler != null) {
120 broker = brokerFactoryHandler.createBroker(brokerURI);
121 } else {
122 broker = BrokerFactory.createBroker(brokerURI);
123 }
124 broker.start();
125 MDC.put("activemq.broker", broker.getBrokerName());
126 } catch (URISyntaxException e) {
127 throw IOExceptionSupport.create(e);
128 }
129 BROKERS.put(host, broker);
130 BrokerRegistry.getInstance().getRegistryMutext().notifyAll();
131 }
132
133 server = SERVERS.get(host);
134 if (server == null) {
135 server = (VMTransportServer)bind(location, true);
136 TransportConnector connector = new TransportConnector(server);
137 connector.setBrokerService(broker);
138 connector.setUri(location);
139 connector.setTaskRunnerFactory(broker.getTaskRunnerFactory());
140 connector.start();
141 CONNECTORS.put(host, connector);
142 }
143
144 }
145 }
146
147 VMTransport vmtransport = server.connect();
148 IntrospectionSupport.setProperties(vmtransport.peer, new HashMap<String,String>(options));
149 IntrospectionSupport.setProperties(vmtransport, options);
150 Transport transport = vmtransport;
151 if (vmtransport.isMarshal()) {
152 Map<String, String> optionsCopy = new HashMap<String, String>(options);
153 transport = new MarshallingTransportFilter(transport, createWireFormat(options),
154 createWireFormat(optionsCopy));
155 }
156 if (!options.isEmpty()) {
157 throw new IllegalArgumentException("Invalid connect parameters: " + options);
158 }
159 return transport;
160 }
161
162 private static String extractHost(URI location) {
163 String host = location.getHost();
164 if (host == null || host.length() == 0) {
165 host = location.getAuthority();
166 if (host == null || host.length() == 0) {
167 host = "localhost";
168 }
169 }
170 return host;
171 }
172
173 /**
174 * @param registry
175 * @param brokerName
176 * @param waitForStart - time in milliseconds to wait for a broker to appear
177 * @return
178 */
179 private BrokerService lookupBroker(final BrokerRegistry registry, final String brokerName, int waitForStart) {
180 BrokerService broker = null;
181 synchronized(registry.getRegistryMutext()) {
182 broker = registry.lookup(brokerName);
183 if (broker == null && waitForStart > 0) {
184 final long expiry = System.currentTimeMillis() + waitForStart;
185 while (broker == null && expiry > System.currentTimeMillis()) {
186 long timeout = Math.max(0, expiry - System.currentTimeMillis());
187 try {
188 LOG.debug("waiting for broker named: " + brokerName + " to start");
189 registry.getRegistryMutext().wait(timeout);
190 } catch (InterruptedException ignored) {
191 }
192 broker = registry.lookup(brokerName);
193 }
194 }
195 }
196 return broker;
197 }
198
199 public TransportServer doBind(URI location) throws IOException {
200 return bind(location, false);
201 }
202
203 /**
204 * @param location
205 * @return the TransportServer
206 * @throws IOException
207 */
208 private TransportServer bind(URI location, boolean dispose) throws IOException {
209 String host = extractHost(location);
210 LOG.debug("binding to broker: " + host);
211 VMTransportServer server = new VMTransportServer(location, dispose);
212 Object currentBoundValue = SERVERS.get(host);
213 if (currentBoundValue != null) {
214 throw new IOException("VMTransportServer already bound at: " + location);
215 }
216 SERVERS.put(host, server);
217 return server;
218 }
219
220 public static void stopped(VMTransportServer server) {
221 String host = extractHost(server.getBindURI());
222 stopped(host);
223 }
224
225 public static void stopped(String host) {
226 SERVERS.remove(host);
227 TransportConnector connector = CONNECTORS.remove(host);
228 if (connector != null) {
229 LOG.debug("Shutting down VM connectors for broker: " + host);
230 ServiceSupport.dispose(connector);
231 BrokerService broker = BROKERS.remove(host);
232 if (broker != null) {
233 ServiceSupport.dispose(broker);
234 }
235 MDC.remove("activemq.broker");
236 }
237 }
238
239 public BrokerFactoryHandler getBrokerFactoryHandler() {
240 return brokerFactoryHandler;
241 }
242
243 public void setBrokerFactoryHandler(BrokerFactoryHandler brokerFactoryHandler) {
244 this.brokerFactoryHandler = brokerFactoryHandler;
245 }
246
247 private boolean validateBroker(String host) {
248 boolean result = true;
249 if (BROKERS.containsKey(host) || SERVERS.containsKey(host) || CONNECTORS.containsKey(host)) {
250 // check the broker is still in the BrokerRegistry
251 TransportConnector connector = CONNECTORS.get(host);
252 if (BrokerRegistry.getInstance().lookup(host) == null
253 || (connector != null && connector.getBroker().isStopped())) {
254 result = false;
255 // clean-up
256 BROKERS.remove(host);
257 SERVERS.remove(host);
258 if (connector != null) {
259 CONNECTORS.remove(host);
260 if (connector != null) {
261 ServiceSupport.dispose(connector);
262 }
263 }
264 }
265 }
266 return result;
267 }
268 }