001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport;
018
019 import java.io.IOException;
020 import java.net.MalformedURLException;
021 import java.net.URI;
022 import java.net.URISyntaxException;
023 import java.net.UnknownHostException;
024 import java.util.HashMap;
025 import java.util.Map;
026 import java.util.concurrent.ConcurrentHashMap;
027 import java.util.concurrent.Executor;
028
029 import org.apache.activemq.broker.BrokerService;
030 import org.apache.activemq.broker.BrokerServiceAware;
031 import org.apache.activemq.broker.SslContext;
032 import org.apache.activemq.util.FactoryFinder;
033 import org.apache.activemq.util.IOExceptionSupport;
034 import org.apache.activemq.util.IntrospectionSupport;
035 import org.apache.activemq.util.URISupport;
036 import org.apache.activemq.wireformat.WireFormat;
037 import org.apache.activemq.wireformat.WireFormatFactory;
038
039 public abstract class TransportFactory {
040
041 private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
042 private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
043 private static final ConcurrentHashMap<String, TransportFactory> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactory>();
044
045 private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
046 private static final String THREAD_NAME_FILTER = "threadName";
047
048 public abstract TransportServer doBind(URI location) throws IOException;
049
050 public Transport doConnect(URI location, Executor ex) throws Exception {
051 return doConnect(location);
052 }
053
054 public Transport doCompositeConnect(URI location, Executor ex) throws Exception {
055 return doCompositeConnect(location);
056 }
057
058 /**
059 * Creates a normal transport.
060 *
061 * @param location
062 * @return the transport
063 * @throws Exception
064 */
065 public static Transport connect(URI location) throws Exception {
066 TransportFactory tf = findTransportFactory(location);
067 return tf.doConnect(location);
068 }
069
070 /**
071 * Creates a normal transport.
072 *
073 * @param location
074 * @param ex
075 * @return the transport
076 * @throws Exception
077 */
078 public static Transport connect(URI location, Executor ex) throws Exception {
079 TransportFactory tf = findTransportFactory(location);
080 return tf.doConnect(location, ex);
081 }
082
083 /**
084 * Creates a slimmed down transport that is more efficient so that it can be
085 * used by composite transports like reliable and HA.
086 *
087 * @param location
088 * @return the Transport
089 * @throws Exception
090 */
091 public static Transport compositeConnect(URI location) throws Exception {
092 TransportFactory tf = findTransportFactory(location);
093 return tf.doCompositeConnect(location);
094 }
095
096 /**
097 * Creates a slimmed down transport that is more efficient so that it can be
098 * used by composite transports like reliable and HA.
099 *
100 * @param location
101 * @param ex
102 * @return the Transport
103 * @throws Exception
104 */
105 public static Transport compositeConnect(URI location, Executor ex) throws Exception {
106 TransportFactory tf = findTransportFactory(location);
107 return tf.doCompositeConnect(location, ex);
108 }
109
110 public static TransportServer bind(URI location) throws IOException {
111 TransportFactory tf = findTransportFactory(location);
112 return tf.doBind(location);
113 }
114
115 /**
116 * @deprecated
117 */
118 public static TransportServer bind(String brokerId, URI location) throws IOException {
119 return bind(location);
120 }
121
122 public static TransportServer bind(BrokerService brokerService, URI location) throws IOException {
123 TransportFactory tf = findTransportFactory(location);
124 if( brokerService!=null && tf instanceof BrokerServiceAware ) {
125 ((BrokerServiceAware)tf).setBrokerService(brokerService);
126 }
127 try {
128 if( brokerService!=null ) {
129 SslContext.setCurrentSslContext(brokerService.getSslContext());
130 }
131 return tf.doBind(location);
132 } finally {
133 SslContext.setCurrentSslContext(null);
134 }
135 }
136
137 public Transport doConnect(URI location) throws Exception {
138 try {
139 Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
140 WireFormat wf = createWireFormat(options);
141 Transport transport = createTransport(location, wf);
142 Transport rc = configure(transport, wf, options);
143 if (!options.isEmpty()) {
144 throw new IllegalArgumentException("Invalid connect parameters: " + options);
145 }
146 return rc;
147 } catch (URISyntaxException e) {
148 throw IOExceptionSupport.create(e);
149 }
150 }
151
152 public Transport doCompositeConnect(URI location) throws Exception {
153 try {
154 Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
155 WireFormat wf = createWireFormat(options);
156 Transport transport = createTransport(location, wf);
157 Transport rc = compositeConfigure(transport, wf, options);
158 if (!options.isEmpty()) {
159 throw new IllegalArgumentException("Invalid connect parameters: " + options);
160 }
161 return rc;
162
163 } catch (URISyntaxException e) {
164 throw IOExceptionSupport.create(e);
165 }
166 }
167
168 /**
169 * Allow registration of a transport factory without wiring via META-INF classes
170 * @param scheme
171 * @param tf
172 */
173 public static void registerTransportFactory(String scheme, TransportFactory tf) {
174 TRANSPORT_FACTORYS.put(scheme, tf);
175 }
176
177 /**
178 * Factory method to create a new transport
179 *
180 * @throws IOException
181 * @throws UnknownHostException
182 */
183 protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException, UnknownHostException, IOException {
184 throw new IOException("createTransport() method not implemented!");
185 }
186
187 /**
188 * @param location
189 * @return
190 * @throws IOException
191 */
192 private static TransportFactory findTransportFactory(URI location) throws IOException {
193 String scheme = location.getScheme();
194 if (scheme == null) {
195 throw new IOException("Transport not scheme specified: [" + location + "]");
196 }
197 TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
198 if (tf == null) {
199 // Try to load if from a META-INF property.
200 try {
201 tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
202 TRANSPORT_FACTORYS.put(scheme, tf);
203 } catch (Throwable e) {
204 throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
205 }
206 }
207 return tf;
208 }
209
210 protected WireFormat createWireFormat(Map<String, String> options) throws IOException {
211 WireFormatFactory factory = createWireFormatFactory(options);
212 WireFormat format = factory.createWireFormat();
213 return format;
214 }
215
216 protected WireFormatFactory createWireFormatFactory(Map<String, String> options) throws IOException {
217 String wireFormat = (String)options.remove("wireFormat");
218 if (wireFormat == null) {
219 wireFormat = getDefaultWireFormatType();
220 }
221
222 try {
223 WireFormatFactory wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat);
224 IntrospectionSupport.setProperties(wff, options, "wireFormat.");
225 return wff;
226 } catch (Throwable e) {
227 throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e);
228 }
229 }
230
231 protected String getDefaultWireFormatType() {
232 return "default";
233 }
234
235 /**
236 * Fully configures and adds all need transport filters so that the
237 * transport can be used by the JMS client.
238 *
239 * @param transport
240 * @param wf
241 * @param options
242 * @return
243 * @throws Exception
244 */
245 @SuppressWarnings("rawtypes")
246 public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
247 transport = compositeConfigure(transport, wf, options);
248
249 transport = new MutexTransport(transport);
250 transport = new ResponseCorrelator(transport);
251
252 return transport;
253 }
254
255 /**
256 * Fully configures and adds all need transport filters so that the
257 * transport can be used by the ActiveMQ message broker. The main difference
258 * between this and the configure() method is that the broker does not issue
259 * requests to the client so the ResponseCorrelator is not needed.
260 *
261 * @param transport
262 * @param format
263 * @param options
264 * @return
265 * @throws Exception
266 */
267 @SuppressWarnings("rawtypes")
268 public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
269 if (options.containsKey(THREAD_NAME_FILTER)) {
270 transport = new ThreadNameFilter(transport);
271 }
272 transport = compositeConfigure(transport, format, options);
273 transport = new MutexTransport(transport);
274 return transport;
275 }
276
277 /**
278 * Similar to configure(...) but this avoid adding in the MutexTransport and
279 * ResponseCorrelator transport layers so that the resulting transport can
280 * more efficiently be used as part of a composite transport.
281 *
282 * @param transport
283 * @param format
284 * @param options
285 * @return
286 */
287 @SuppressWarnings("rawtypes")
288 public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
289 if (options.containsKey(WRITE_TIMEOUT_FILTER)) {
290 transport = new WriteTimeoutFilter(transport);
291 String soWriteTimeout = (String)options.remove(WRITE_TIMEOUT_FILTER);
292 if (soWriteTimeout!=null) {
293 ((WriteTimeoutFilter)transport).setWriteTimeout(Long.parseLong(soWriteTimeout));
294 }
295 }
296 IntrospectionSupport.setProperties(transport, options);
297 return transport;
298 }
299
300 @SuppressWarnings("rawtypes")
301 protected String getOption(Map options, String key, String def) {
302 String rc = (String) options.remove(key);
303 if( rc == null ) {
304 rc = def;
305 }
306 return rc;
307 }
308 }