001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.udp;
018
019 import java.io.EOFException;
020 import java.io.IOException;
021 import java.net.BindException;
022 import java.net.DatagramSocket;
023 import java.net.InetAddress;
024 import java.net.InetSocketAddress;
025 import java.net.SocketAddress;
026 import java.net.SocketException;
027 import java.net.URI;
028 import java.net.UnknownHostException;
029 import java.nio.channels.AsynchronousCloseException;
030 import java.nio.channels.DatagramChannel;
031
032 import org.apache.activemq.Service;
033 import org.apache.activemq.command.Command;
034 import org.apache.activemq.command.Endpoint;
035 import org.apache.activemq.openwire.OpenWireFormat;
036 import org.apache.activemq.transport.Transport;
037 import org.apache.activemq.transport.TransportThreadSupport;
038 import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
039 import org.apache.activemq.transport.reliable.ReplayBuffer;
040 import org.apache.activemq.transport.reliable.ReplayStrategy;
041 import org.apache.activemq.transport.reliable.Replayer;
042 import org.apache.activemq.util.InetAddressUtil;
043 import org.apache.activemq.util.IntSequenceGenerator;
044 import org.apache.activemq.util.ServiceStopper;
045 import org.slf4j.Logger;
046 import org.slf4j.LoggerFactory;
047
048 /**
049 * An implementation of the {@link Transport} interface using raw UDP
050 *
051 *
052 */
053 public class UdpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
054 private static final Logger LOG = LoggerFactory.getLogger(UdpTransport.class);
055
056 private static final int MAX_BIND_ATTEMPTS = 50;
057 private static final long BIND_ATTEMPT_DELAY = 100;
058
059 private CommandChannel commandChannel;
060 private OpenWireFormat wireFormat;
061 private ByteBufferPool bufferPool;
062 private ReplayStrategy replayStrategy = new ExceptionIfDroppedReplayStrategy();
063 private ReplayBuffer replayBuffer;
064 private int datagramSize = 4 * 1024;
065 private SocketAddress targetAddress;
066 private SocketAddress originalTargetAddress;
067 private DatagramChannel channel;
068 private boolean trace;
069 private boolean useLocalHost = false;
070 private int port;
071 private int minmumWireFormatVersion;
072 private String description;
073 private IntSequenceGenerator sequenceGenerator;
074 private boolean replayEnabled = true;
075
076 protected UdpTransport(OpenWireFormat wireFormat) throws IOException {
077 this.wireFormat = wireFormat;
078 }
079
080 public UdpTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException {
081 this(wireFormat);
082 this.targetAddress = createAddress(remoteLocation);
083 description = remoteLocation.toString() + "@";
084 }
085
086 public UdpTransport(OpenWireFormat wireFormat, SocketAddress socketAddress) throws IOException {
087 this(wireFormat);
088 this.targetAddress = socketAddress;
089 this.description = getProtocolName() + "ServerConnection@";
090 }
091
092 /**
093 * Used by the server transport
094 */
095 public UdpTransport(OpenWireFormat wireFormat, int port) throws UnknownHostException, IOException {
096 this(wireFormat);
097 this.port = port;
098 this.targetAddress = null;
099 this.description = getProtocolName() + "Server@";
100 }
101
102 /**
103 * Creates a replayer for working with the reliable transport
104 */
105 public Replayer createReplayer() throws IOException {
106 if (replayEnabled) {
107 return getCommandChannel();
108 }
109 return null;
110 }
111
112 /**
113 * A one way asynchronous send
114 */
115 public void oneway(Object command) throws IOException {
116 oneway(command, targetAddress);
117 }
118
119 /**
120 * A one way asynchronous send to a given address
121 */
122 public void oneway(Object command, SocketAddress address) throws IOException {
123 if (LOG.isDebugEnabled()) {
124 LOG.debug("Sending oneway from: " + this + " to target: " + targetAddress + " command: " + command);
125 }
126 checkStarted();
127 commandChannel.write((Command)command, address);
128 }
129
130 /**
131 * @return pretty print of 'this'
132 */
133 public String toString() {
134 if (description != null) {
135 return description + port;
136 } else {
137 return getProtocolUriScheme() + targetAddress + "@" + port;
138 }
139 }
140
141 /**
142 * reads packets from a Socket
143 */
144 public void run() {
145 LOG.trace("Consumer thread starting for: " + toString());
146 while (!isStopped()) {
147 try {
148 Command command = commandChannel.read();
149 doConsume(command);
150 } catch (AsynchronousCloseException e) {
151 // DatagramChannel closed
152 try {
153 stop();
154 } catch (Exception e2) {
155 LOG.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
156 }
157 } catch (SocketException e) {
158 // DatagramSocket closed
159 LOG.debug("Socket closed: " + e, e);
160 try {
161 stop();
162 } catch (Exception e2) {
163 LOG.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
164 }
165 } catch (EOFException e) {
166 // DataInputStream closed
167 LOG.debug("Socket closed: " + e, e);
168 try {
169 stop();
170 } catch (Exception e2) {
171 LOG.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
172 }
173 } catch (Exception e) {
174 try {
175 stop();
176 } catch (Exception e2) {
177 LOG.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
178 }
179 if (e instanceof IOException) {
180 onException((IOException)e);
181 } else {
182 LOG.error("Caught: " + e, e);
183 e.printStackTrace();
184 }
185 }
186 }
187 }
188
189 /**
190 * We have received the WireFormatInfo from the server on the actual channel
191 * we should use for all future communication with the server, so lets set
192 * the target to be the actual channel that the server has chosen for us to
193 * talk on.
194 */
195 public void setTargetEndpoint(Endpoint newTarget) {
196 if (newTarget instanceof DatagramEndpoint) {
197 DatagramEndpoint endpoint = (DatagramEndpoint)newTarget;
198 SocketAddress address = endpoint.getAddress();
199 if (address != null) {
200 if (originalTargetAddress == null) {
201 originalTargetAddress = targetAddress;
202 }
203 targetAddress = address;
204 commandChannel.setTargetAddress(address);
205 }
206 }
207 }
208
209 // Properties
210 // -------------------------------------------------------------------------
211 public boolean isTrace() {
212 return trace;
213 }
214
215 public void setTrace(boolean trace) {
216 this.trace = trace;
217 }
218
219 public int getDatagramSize() {
220 return datagramSize;
221 }
222
223 public void setDatagramSize(int datagramSize) {
224 this.datagramSize = datagramSize;
225 }
226
227 public boolean isUseLocalHost() {
228 return useLocalHost;
229 }
230
231 /**
232 * Sets whether 'localhost' or the actual local host name should be used to
233 * make local connections. On some operating systems such as Macs its not
234 * possible to connect as the local host name so localhost is better.
235 */
236 public void setUseLocalHost(boolean useLocalHost) {
237 this.useLocalHost = useLocalHost;
238 }
239
240 public CommandChannel getCommandChannel() throws IOException {
241 if (commandChannel == null) {
242 commandChannel = createCommandChannel();
243 }
244 return commandChannel;
245 }
246
247 /**
248 * Sets the implementation of the command channel to use.
249 */
250 public void setCommandChannel(CommandDatagramChannel commandChannel) {
251 this.commandChannel = commandChannel;
252 }
253
254 public ReplayStrategy getReplayStrategy() {
255 return replayStrategy;
256 }
257
258 /**
259 * Sets the strategy used to replay missed datagrams
260 */
261 public void setReplayStrategy(ReplayStrategy replayStrategy) {
262 this.replayStrategy = replayStrategy;
263 }
264
265 public int getPort() {
266 return port;
267 }
268
269 /**
270 * Sets the port to connect on
271 */
272 public void setPort(int port) {
273 this.port = port;
274 }
275
276 public int getMinmumWireFormatVersion() {
277 return minmumWireFormatVersion;
278 }
279
280 public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
281 this.minmumWireFormatVersion = minmumWireFormatVersion;
282 }
283
284 public OpenWireFormat getWireFormat() {
285 return wireFormat;
286 }
287
288 public IntSequenceGenerator getSequenceGenerator() {
289 if (sequenceGenerator == null) {
290 sequenceGenerator = new IntSequenceGenerator();
291 }
292 return sequenceGenerator;
293 }
294
295 public void setSequenceGenerator(IntSequenceGenerator sequenceGenerator) {
296 this.sequenceGenerator = sequenceGenerator;
297 }
298
299 public boolean isReplayEnabled() {
300 return replayEnabled;
301 }
302
303 /**
304 * Sets whether or not replay should be enabled when using the reliable
305 * transport. i.e. should we maintain a buffer of messages that can be
306 * replayed?
307 */
308 public void setReplayEnabled(boolean replayEnabled) {
309 this.replayEnabled = replayEnabled;
310 }
311
312 public ByteBufferPool getBufferPool() {
313 if (bufferPool == null) {
314 bufferPool = new DefaultBufferPool();
315 }
316 return bufferPool;
317 }
318
319 public void setBufferPool(ByteBufferPool bufferPool) {
320 this.bufferPool = bufferPool;
321 }
322
323 public ReplayBuffer getReplayBuffer() {
324 return replayBuffer;
325 }
326
327 public void setReplayBuffer(ReplayBuffer replayBuffer) throws IOException {
328 this.replayBuffer = replayBuffer;
329 getCommandChannel().setReplayBuffer(replayBuffer);
330 }
331
332 // Implementation methods
333 // -------------------------------------------------------------------------
334
335 /**
336 * Creates an address from the given URI
337 */
338 protected InetSocketAddress createAddress(URI remoteLocation) throws UnknownHostException, IOException {
339 String host = resolveHostName(remoteLocation.getHost());
340 return new InetSocketAddress(host, remoteLocation.getPort());
341 }
342
343 protected String resolveHostName(String host) throws UnknownHostException {
344 String localName = InetAddressUtil.getLocalHostName();
345 if (localName != null && isUseLocalHost()) {
346 if (localName.equals(host)) {
347 return "localhost";
348 }
349 }
350 return host;
351 }
352
353 protected void doStart() throws Exception {
354 getCommandChannel().start();
355
356 super.doStart();
357 }
358
359 protected CommandChannel createCommandChannel() throws IOException {
360 SocketAddress localAddress = createLocalAddress();
361 channel = DatagramChannel.open();
362
363 channel = connect(channel, targetAddress);
364
365 DatagramSocket socket = channel.socket();
366 bind(socket, localAddress);
367 if (port == 0) {
368 port = socket.getLocalPort();
369 }
370
371 return createCommandDatagramChannel();
372 }
373
374 protected CommandChannel createCommandDatagramChannel() {
375 return new CommandDatagramChannel(this, getWireFormat(), getDatagramSize(), getTargetAddress(), createDatagramHeaderMarshaller(), getChannel(), getBufferPool());
376 }
377
378 protected void bind(DatagramSocket socket, SocketAddress localAddress) throws IOException {
379 channel.configureBlocking(true);
380
381 if (LOG.isDebugEnabled()) {
382 LOG.debug("Binding to address: " + localAddress);
383 }
384
385 //
386 // We have noticed that on some platfoms like linux, after you close
387 // down
388 // a previously bound socket, it can take a little while before we can
389 // bind it again.
390 //
391 for (int i = 0; i < MAX_BIND_ATTEMPTS; i++) {
392 try {
393 socket.bind(localAddress);
394 return;
395 } catch (BindException e) {
396 if (i + 1 == MAX_BIND_ATTEMPTS) {
397 throw e;
398 }
399 try {
400 Thread.sleep(BIND_ATTEMPT_DELAY);
401 } catch (InterruptedException e1) {
402 Thread.currentThread().interrupt();
403 throw e;
404 }
405 }
406 }
407
408 }
409
410 protected DatagramChannel connect(DatagramChannel channel, SocketAddress targetAddress2) throws IOException {
411 // TODO
412 // connect to default target address to avoid security checks each time
413 // channel = channel.connect(targetAddress);
414
415 return channel;
416 }
417
418 protected SocketAddress createLocalAddress() {
419 return new InetSocketAddress(port);
420 }
421
422 protected void doStop(ServiceStopper stopper) throws Exception {
423 if (channel != null) {
424 channel.close();
425 }
426 }
427
428 protected DatagramHeaderMarshaller createDatagramHeaderMarshaller() {
429 return new DatagramHeaderMarshaller();
430 }
431
432 protected String getProtocolName() {
433 return "Udp";
434 }
435
436 protected String getProtocolUriScheme() {
437 return "udp://";
438 }
439
440 protected SocketAddress getTargetAddress() {
441 return targetAddress;
442 }
443
444 protected DatagramChannel getChannel() {
445 return channel;
446 }
447
448 protected void setChannel(DatagramChannel channel) {
449 this.channel = channel;
450 }
451
452 public InetSocketAddress getLocalSocketAddress() {
453 if (channel == null) {
454 return null;
455 } else {
456 return (InetSocketAddress)channel.socket().getLocalSocketAddress();
457 }
458 }
459
460 public String getRemoteAddress() {
461 if (targetAddress != null) {
462 return "" + targetAddress;
463 }
464 return null;
465 }
466
467 public int getReceiveCounter() {
468 if (commandChannel == null) {
469 return 0;
470 }
471 return commandChannel.getReceiveCounter();
472 }
473 }