001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.tcp;
018
019 import java.io.IOException;
020 import java.net.URI;
021 import java.net.URISyntaxException;
022 import java.net.UnknownHostException;
023 import java.util.HashMap;
024 import java.util.Map;
025
026 import javax.net.ServerSocketFactory;
027 import javax.net.SocketFactory;
028
029 import org.apache.activemq.openwire.OpenWireFormat;
030 import org.apache.activemq.transport.InactivityMonitor;
031 import org.apache.activemq.transport.Transport;
032 import org.apache.activemq.transport.TransportFactory;
033 import org.apache.activemq.transport.TransportLoggerFactory;
034 import org.apache.activemq.transport.TransportServer;
035 import org.apache.activemq.transport.WireFormatNegotiator;
036 import org.apache.activemq.util.IOExceptionSupport;
037 import org.apache.activemq.util.IntrospectionSupport;
038 import org.apache.activemq.util.URISupport;
039 import org.apache.activemq.wireformat.WireFormat;
040 import org.slf4j.Logger;
041 import org.slf4j.LoggerFactory;
042
043 /**
044 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
045 *
046 */
047 public class TcpTransportFactory extends TransportFactory {
048 private static final Logger LOG = LoggerFactory.getLogger(TcpTransportFactory.class);
049
050 public TransportServer doBind(final URI location) throws IOException {
051 try {
052 Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
053
054 ServerSocketFactory serverSocketFactory = createServerSocketFactory();
055 TcpTransportServer server = createTcpTransportServer(location, serverSocketFactory);
056 server.setWireFormatFactory(createWireFormatFactory(options));
057 IntrospectionSupport.setProperties(server, options);
058 Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
059 server.setTransportOption(transportOptions);
060 server.bind();
061
062 return server;
063 } catch (URISyntaxException e) {
064 throw IOExceptionSupport.create(e);
065 }
066 }
067
068 /**
069 * Allows subclasses of TcpTransportFactory to create custom instances of
070 * TcpTransportServer.
071 *
072 * @param location
073 * @param serverSocketFactory
074 * @return
075 * @throws IOException
076 * @throws URISyntaxException
077 */
078 protected TcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
079 return new TcpTransportServer(this, location, serverSocketFactory);
080 }
081
082 @SuppressWarnings("rawtypes")
083 public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
084
085 TcpTransport tcpTransport = (TcpTransport)transport.narrow(TcpTransport.class);
086 IntrospectionSupport.setProperties(tcpTransport, options);
087
088 Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
089 tcpTransport.setSocketOptions(socketOptions);
090
091 if (tcpTransport.isTrace()) {
092 try {
093 transport = TransportLoggerFactory.getInstance().createTransportLogger(transport, tcpTransport.getLogWriterName(),
094 tcpTransport.isDynamicManagement(), tcpTransport.isStartLogging(), tcpTransport.getJmxPort());
095 } catch (Throwable e) {
096 LOG.error("Could not create TransportLogger object for: " + tcpTransport.getLogWriterName() + ", reason: " + e, e);
097 }
098 }
099
100 boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true"));
101 if (useInactivityMonitor && isUseInactivityMonitor(transport)) {
102 transport = createInactivityMonitor(transport, format);
103 IntrospectionSupport.setProperties(transport, options);
104 }
105
106 // Only need the WireFormatNegotiator if using openwire
107 if (format instanceof OpenWireFormat) {
108 transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
109 }
110
111 return super.compositeConfigure(transport, format, options);
112 }
113
114 /**
115 * Returns true if the inactivity monitor should be used on the transport
116 */
117 protected boolean isUseInactivityMonitor(Transport transport) {
118 return true;
119 }
120
121 protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
122 URI localLocation = null;
123 String path = location.getPath();
124 // see if the path is a local URI location
125 if (path != null && path.length() > 0) {
126 int localPortIndex = path.indexOf(':');
127 try {
128 Integer.parseInt(path.substring(localPortIndex + 1, path.length()));
129 String localString = location.getScheme() + ":/" + path;
130 localLocation = new URI(localString);
131 } catch (Exception e) {
132 LOG.warn("path isn't a valid local location for TcpTransport to use", e.getMessage());
133 if(LOG.isDebugEnabled()) {
134 LOG.debug("Failure detail", e);
135 }
136 }
137 }
138 SocketFactory socketFactory = createSocketFactory();
139 return createTcpTransport(wf, socketFactory, location, localLocation);
140 }
141
142 /**
143 * Allows subclasses of TcpTransportFactory to provide a create custom
144 * TcpTransport intances.
145 *
146 * @param location
147 * @param wf
148 * @param socketFactory
149 * @param localLocation
150 * @return
151 * @throws UnknownHostException
152 * @throws IOException
153 */
154 protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
155 return new TcpTransport(wf, socketFactory, location, localLocation);
156 }
157
158 protected ServerSocketFactory createServerSocketFactory() throws IOException {
159 return ServerSocketFactory.getDefault();
160 }
161
162 protected SocketFactory createSocketFactory() throws IOException {
163 return SocketFactory.getDefault();
164 }
165
166 protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
167 return new InactivityMonitor(transport, format);
168 }
169 }