001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.udp;
018
019 import java.io.IOException;
020 import java.net.URI;
021 import java.net.URISyntaxException;
022 import java.net.UnknownHostException;
023 import java.util.HashMap;
024 import java.util.Map;
025
026 import org.apache.activemq.openwire.OpenWireFormat;
027 import org.apache.activemq.transport.CommandJoiner;
028 import org.apache.activemq.transport.InactivityMonitor;
029 import org.apache.activemq.transport.Transport;
030 import org.apache.activemq.transport.TransportFactory;
031 import org.apache.activemq.transport.TransportLoggerFactory;
032 import org.apache.activemq.transport.TransportServer;
033 import org.apache.activemq.transport.reliable.DefaultReplayStrategy;
034 import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
035 import org.apache.activemq.transport.reliable.ReliableTransport;
036 import org.apache.activemq.transport.reliable.ReplayStrategy;
037 import org.apache.activemq.transport.reliable.Replayer;
038 import org.apache.activemq.transport.tcp.TcpTransportFactory;
039 import org.apache.activemq.util.IOExceptionSupport;
040 import org.apache.activemq.util.IntSequenceGenerator;
041 import org.apache.activemq.util.IntrospectionSupport;
042 import org.apache.activemq.util.URISupport;
043 import org.apache.activemq.wireformat.WireFormat;
044 import org.slf4j.Logger;
045 import org.slf4j.LoggerFactory;
046
047 /**
048 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
049 *
050 */
051 public class UdpTransportFactory extends TransportFactory {
052
053 private static final Logger log = LoggerFactory.getLogger(TcpTransportFactory.class);
054
055 public TransportServer doBind(final URI location) throws IOException {
056 try {
057 Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
058 if (options.containsKey("port")) {
059 throw new IllegalArgumentException("The port property cannot be specified on a UDP server transport - please use the port in the URI syntax");
060 }
061 WireFormat wf = createWireFormat(options);
062 int port = location.getPort();
063 OpenWireFormat openWireFormat = asOpenWireFormat(wf);
064 UdpTransport transport = (UdpTransport) createTransport(location.getPort(), wf);
065
066 Transport configuredTransport = configure(transport, wf, options, true);
067 UdpTransportServer server = new UdpTransportServer(location, transport, configuredTransport, createReplayStrategy());
068 return server;
069 } catch (URISyntaxException e) {
070 throw IOExceptionSupport.create(e);
071 } catch (Exception e) {
072 throw IOExceptionSupport.create(e);
073 }
074 }
075
076 public Transport configure(Transport transport, WireFormat format, Map options) throws Exception {
077 return configure(transport, format, options, false);
078 }
079
080 public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
081 IntrospectionSupport.setProperties(transport, options);
082 final UdpTransport udpTransport = (UdpTransport)transport;
083
084 // deal with fragmentation
085 transport = new CommandJoiner(transport, asOpenWireFormat(format));
086
087 if (udpTransport.isTrace()) {
088 try {
089 transport = TransportLoggerFactory.getInstance().createTransportLogger(transport);
090 } catch (Throwable e) {
091 log.error("Could not create TransportLogger object for: " + TransportLoggerFactory.defaultLogWriterName + ", reason: " + e, e);
092 }
093 }
094
095 transport = new InactivityMonitor(transport, format);
096
097 if (format instanceof OpenWireFormat) {
098 transport = configureClientSideNegotiator(transport, format, udpTransport);
099 }
100
101 return transport;
102 }
103
104 protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
105 OpenWireFormat wireFormat = asOpenWireFormat(wf);
106 return new UdpTransport(wireFormat, location);
107 }
108
109 protected Transport createTransport(int port, WireFormat wf) throws UnknownHostException, IOException {
110 OpenWireFormat wireFormat = asOpenWireFormat(wf);
111 return new UdpTransport(wireFormat, port);
112 }
113
114 /**
115 * Configures the transport
116 *
117 * @param acceptServer true if this transport is used purely as an 'accept'
118 * transport for new connections which work like TCP
119 * SocketServers where new connections spin up a new separate
120 * UDP transport
121 */
122 protected Transport configure(Transport transport, WireFormat format, Map options, boolean acceptServer) throws Exception {
123 IntrospectionSupport.setProperties(transport, options);
124 UdpTransport udpTransport = (UdpTransport)transport;
125
126 OpenWireFormat openWireFormat = asOpenWireFormat(format);
127
128 if (udpTransport.isTrace()) {
129 transport = TransportLoggerFactory.getInstance().createTransportLogger(transport);
130 }
131
132 transport = new InactivityMonitor(transport, format);
133
134 if (!acceptServer && format instanceof OpenWireFormat) {
135 transport = configureClientSideNegotiator(transport, format, udpTransport);
136 }
137
138 // deal with fragmentation
139
140 if (acceptServer) {
141 // lets not support a buffer of messages to enable reliable
142 // messaging on the 'accept server' transport
143 udpTransport.setReplayEnabled(false);
144
145 // we don't want to do reliable checks on this transport as we
146 // delegate to one that does
147 transport = new CommandJoiner(transport, openWireFormat);
148 return transport;
149 } else {
150 ReliableTransport reliableTransport = new ReliableTransport(transport, udpTransport);
151 Replayer replayer = reliableTransport.getReplayer();
152 reliableTransport.setReplayStrategy(createReplayStrategy(replayer));
153
154 // Joiner must be on outside as the inbound messages must be
155 // processed by the reliable transport first
156 return new CommandJoiner(reliableTransport, openWireFormat);
157 }
158 }
159
160 protected ReplayStrategy createReplayStrategy(Replayer replayer) {
161 if (replayer != null) {
162 return new DefaultReplayStrategy(5);
163 }
164 return new ExceptionIfDroppedReplayStrategy(1);
165 }
166
167 protected ReplayStrategy createReplayStrategy() {
168 return new DefaultReplayStrategy(5);
169 }
170
171 protected Transport configureClientSideNegotiator(Transport transport, WireFormat format, final UdpTransport udpTransport) {
172 return new ResponseRedirectInterceptor(transport, udpTransport);
173 }
174
175 protected OpenWireFormat asOpenWireFormat(WireFormat wf) {
176 OpenWireFormat answer = (OpenWireFormat)wf;
177 return answer;
178 }
179 }