001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.udp;
018
019 import java.io.IOException;
020 import java.net.InetSocketAddress;
021 import java.net.SocketAddress;
022 import java.net.URI;
023 import java.util.HashMap;
024 import java.util.Map;
025
026 import org.apache.activemq.command.BrokerInfo;
027 import org.apache.activemq.command.Command;
028 import org.apache.activemq.openwire.OpenWireFormat;
029 import org.apache.activemq.transport.CommandJoiner;
030 import org.apache.activemq.transport.InactivityMonitor;
031 import org.apache.activemq.transport.Transport;
032 import org.apache.activemq.transport.TransportListener;
033 import org.apache.activemq.transport.TransportServer;
034 import org.apache.activemq.transport.TransportServerSupport;
035 import org.apache.activemq.transport.reliable.ReliableTransport;
036 import org.apache.activemq.transport.reliable.ReplayStrategy;
037 import org.apache.activemq.transport.reliable.Replayer;
038 import org.apache.activemq.util.ServiceStopper;
039 import org.slf4j.Logger;
040 import org.slf4j.LoggerFactory;
041
042 /**
043 * A UDP based implementation of {@link TransportServer}
044 *
045 *
046 */
047
048 public class UdpTransportServer extends TransportServerSupport {
049 private static final Logger LOG = LoggerFactory.getLogger(UdpTransportServer.class);
050
051 private UdpTransport serverTransport;
052 private ReplayStrategy replayStrategy;
053 private Transport configuredTransport;
054 private boolean usingWireFormatNegotiation;
055 private Map<DatagramEndpoint, Transport> transports = new HashMap<DatagramEndpoint, Transport>();
056
057 public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport, ReplayStrategy replayStrategy) {
058 super(connectURI);
059 this.serverTransport = serverTransport;
060 this.configuredTransport = configuredTransport;
061 this.replayStrategy = replayStrategy;
062 }
063
064 public String toString() {
065 return "UdpTransportServer@" + serverTransport;
066 }
067
068 public void run() {
069 }
070
071 public UdpTransport getServerTransport() {
072 return serverTransport;
073 }
074
075 public void setBrokerInfo(BrokerInfo brokerInfo) {
076 }
077
078 protected void doStart() throws Exception {
079 LOG.info("Starting " + this);
080
081 configuredTransport.setTransportListener(new TransportListener() {
082 public void onCommand(Object o) {
083 final Command command = (Command)o;
084 processInboundConnection(command);
085 }
086
087 public void onException(IOException error) {
088 LOG.error("Caught: " + error, error);
089 }
090
091 public void transportInterupted() {
092 }
093
094 public void transportResumed() {
095 }
096 });
097 configuredTransport.start();
098 }
099
100 protected void doStop(ServiceStopper stopper) throws Exception {
101 configuredTransport.stop();
102 }
103
104 protected void processInboundConnection(Command command) {
105 DatagramEndpoint endpoint = (DatagramEndpoint)command.getFrom();
106 if (LOG.isDebugEnabled()) {
107 LOG.debug("Received command on: " + this + " from address: " + endpoint + " command: " + command);
108 }
109 Transport transport = null;
110 synchronized (transports) {
111 transport = transports.get(endpoint);
112 if (transport == null) {
113 if (usingWireFormatNegotiation && !command.isWireFormatInfo()) {
114 LOG.error("Received inbound server communication from: " + command.getFrom() + " expecting WireFormatInfo but was command: " + command);
115 } else {
116 if (LOG.isDebugEnabled()) {
117 LOG.debug("Creating a new UDP server connection");
118 }
119 try {
120 transport = createTransport(command, endpoint);
121 transport = configureTransport(transport);
122 transports.put(endpoint, transport);
123 } catch (IOException e) {
124 LOG.error("Caught: " + e, e);
125 getAcceptListener().onAcceptError(e);
126 }
127 }
128 } else {
129 LOG.warn("Discarding duplicate command to server from: " + endpoint + " command: " + command);
130 }
131 }
132 }
133
134 protected Transport configureTransport(Transport transport) {
135 transport = new InactivityMonitor(transport, serverTransport.getWireFormat());
136 getAcceptListener().onAccept(transport);
137 return transport;
138 }
139
140 protected Transport createTransport(final Command command, DatagramEndpoint endpoint) throws IOException {
141 if (endpoint == null) {
142 throw new IOException("No endpoint available for command: " + command);
143 }
144 final SocketAddress address = endpoint.getAddress();
145 final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy();
146 final UdpTransport transport = new UdpTransport(connectionWireFormat, address);
147
148 final ReliableTransport reliableTransport = new ReliableTransport(transport, transport);
149 reliableTransport.getReplayer();
150 reliableTransport.setReplayStrategy(replayStrategy);
151
152 // Joiner must be on outside as the inbound messages must be processed
153 // by the reliable transport first
154 return new CommandJoiner(reliableTransport, connectionWireFormat) {
155 public void start() throws Exception {
156 super.start();
157 reliableTransport.onCommand(command);
158 }
159 };
160
161 /**
162 * final WireFormatNegotiator wireFormatNegotiator = new
163 * WireFormatNegotiator(configuredTransport, transport.getWireFormat(),
164 * serverTransport .getMinmumWireFormatVersion()) { public void start()
165 * throws Exception { super.start(); log.debug("Starting a new server
166 * transport: " + this + " with command: " + command);
167 * onCommand(command); } // lets use the specific addressing of wire
168 * format protected void sendWireFormat(WireFormatInfo info) throws
169 * IOException { log.debug("#### we have negotiated the wireformat;
170 * sending a wireformat to: " + address); transport.oneway(info,
171 * address); } }; return wireFormatNegotiator;
172 */
173 }
174
175 public InetSocketAddress getSocketAddress() {
176 return serverTransport.getLocalSocketAddress();
177 }
178 }