001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.tcp;
018
019 import java.io.DataInputStream;
020 import java.io.DataOutputStream;
021 import java.io.IOException;
022 import java.io.InterruptedIOException;
023 import java.net.InetAddress;
024 import java.net.InetSocketAddress;
025 import java.net.Socket;
026 import java.net.SocketAddress;
027 import java.net.SocketException;
028 import java.net.SocketTimeoutException;
029 import java.net.URI;
030 import java.net.UnknownHostException;
031 import java.util.HashMap;
032 import java.util.Map;
033 import java.util.concurrent.CountDownLatch;
034 import java.util.concurrent.TimeUnit;
035 import java.util.concurrent.atomic.AtomicReference;
036
037 import javax.net.SocketFactory;
038
039 import org.apache.activemq.Service;
040 import org.apache.activemq.thread.DefaultThreadPools;
041 import org.apache.activemq.transport.Transport;
042 import org.apache.activemq.transport.TransportLoggerFactory;
043 import org.apache.activemq.transport.TransportThreadSupport;
044 import org.apache.activemq.util.InetAddressUtil;
045 import org.apache.activemq.util.IntrospectionSupport;
046 import org.apache.activemq.util.ServiceStopper;
047 import org.apache.activemq.wireformat.WireFormat;
048 import org.slf4j.Logger;
049 import org.slf4j.LoggerFactory;
050
051 /**
052 * An implementation of the {@link Transport} interface using raw tcp/ip
053 *
054 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
055 *
056 */
057 public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
058 private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
059 protected final URI remoteLocation;
060 protected final URI localLocation;
061 protected final WireFormat wireFormat;
062
063 protected int connectionTimeout = 30000;
064 protected int soTimeout;
065 protected int socketBufferSize = 64 * 1024;
066 protected int ioBufferSize = 8 * 1024;
067 protected boolean closeAsync=true;
068 protected Socket socket;
069 protected DataOutputStream dataOut;
070 protected DataInputStream dataIn;
071 protected TimeStampStream buffOut = null;
072
073 /**
074 * The Traffic Class to be set on the socket.
075 */
076 protected int trafficClass = 0;
077 /**
078 * Keeps track of attempts to set the Traffic Class on the socket.
079 */
080 private boolean trafficClassSet = false;
081 /**
082 * Prevents setting both the Differentiated Services and Type of Service
083 * transport options at the same time, since they share the same spot in
084 * the TCP/IP packet headers.
085 */
086 protected boolean diffServChosen = false;
087 protected boolean typeOfServiceChosen = false;
088 /**
089 * trace=true -> the Transport stack where this TcpTransport
090 * object will be, will have a TransportLogger layer
091 * trace=false -> the Transport stack where this TcpTransport
092 * object will be, will NOT have a TransportLogger layer, and therefore
093 * will never be able to print logging messages.
094 * This parameter is most probably set in Connection or TransportConnector URIs.
095 */
096 protected boolean trace = false;
097 /**
098 * Name of the LogWriter implementation to use.
099 * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
100 * This parameter is most probably set in Connection or TransportConnector URIs.
101 */
102 protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
103 /**
104 * Specifies if the TransportLogger will be manageable by JMX or not.
105 * Also, as long as there is at least 1 TransportLogger which is manageable,
106 * a TransportLoggerControl MBean will me created.
107 */
108 protected boolean dynamicManagement = false;
109 /**
110 * startLogging=true -> the TransportLogger object of the Transport stack
111 * will initially write messages to the log.
112 * startLogging=false -> the TransportLogger object of the Transport stack
113 * will initially NOT write messages to the log.
114 * This parameter only has an effect if trace == true.
115 * This parameter is most probably set in Connection or TransportConnector URIs.
116 */
117 protected boolean startLogging = true;
118 /**
119 * Specifies the port that will be used by the JMX server to manage
120 * the TransportLoggers.
121 * This should only be set in an URI by a client (producer or consumer) since
122 * a broker will already create a JMX server.
123 * It is useful for people who test a broker and clients in the same machine
124 * and want to control both via JMX; a different port will be needed.
125 */
126 protected int jmxPort = 1099;
127 protected boolean useLocalHost = false;
128 protected int minmumWireFormatVersion;
129 protected SocketFactory socketFactory;
130 protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
131
132 private Map<String, Object> socketOptions;
133 private int soLinger = Integer.MIN_VALUE;
134 private Boolean keepAlive;
135 private Boolean tcpNoDelay;
136 private Thread runnerThread;
137 private volatile int receiveCounter;
138
139 /**
140 * Connect to a remote Node - e.g. a Broker
141 *
142 * @param wireFormat
143 * @param socketFactory
144 * @param remoteLocation
145 * @param localLocation - e.g. local InetAddress and local port
146 * @throws IOException
147 * @throws UnknownHostException
148 */
149 public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
150 URI localLocation) throws UnknownHostException, IOException {
151 this.wireFormat = wireFormat;
152 this.socketFactory = socketFactory;
153 try {
154 this.socket = socketFactory.createSocket();
155 } catch (SocketException e) {
156 this.socket = null;
157 }
158 this.remoteLocation = remoteLocation;
159 this.localLocation = localLocation;
160 setDaemon(false);
161 }
162
163 /**
164 * Initialize from a server Socket
165 *
166 * @param wireFormat
167 * @param socket
168 * @throws IOException
169 */
170 public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
171 this.wireFormat = wireFormat;
172 this.socket = socket;
173 this.remoteLocation = null;
174 this.localLocation = null;
175 setDaemon(true);
176 }
177
178 /**
179 * A one way asynchronous send
180 */
181 public void oneway(Object command) throws IOException {
182 checkStarted();
183 wireFormat.marshal(command, dataOut);
184 dataOut.flush();
185 }
186
187 /**
188 * @return pretty print of 'this'
189 */
190 @Override
191 public String toString() {
192 return "" + (socket.isConnected() ? "tcp://" + socket.getInetAddress() + ":" + socket.getPort()
193 : (localLocation != null ? localLocation : remoteLocation)) ;
194 }
195
196 /**
197 * reads packets from a Socket
198 */
199 public void run() {
200 LOG.trace("TCP consumer thread for " + this + " starting");
201 this.runnerThread=Thread.currentThread();
202 try {
203 while (!isStopped()) {
204 doRun();
205 }
206 } catch (IOException e) {
207 stoppedLatch.get().countDown();
208 onException(e);
209 } catch (Throwable e){
210 stoppedLatch.get().countDown();
211 IOException ioe=new IOException("Unexpected error occured: " + e);
212 ioe.initCause(e);
213 onException(ioe);
214 }finally {
215 stoppedLatch.get().countDown();
216 }
217 }
218
219 protected void doRun() throws IOException {
220 try {
221 Object command = readCommand();
222 doConsume(command);
223 } catch (SocketTimeoutException e) {
224 } catch (InterruptedIOException e) {
225 }
226 }
227
228 protected Object readCommand() throws IOException {
229 return wireFormat.unmarshal(dataIn);
230 }
231
232 // Properties
233 // -------------------------------------------------------------------------
234 public String getDiffServ() {
235 // This is the value requested by the user by setting the Tcp Transport
236 // options. If the socket hasn't been created, then this value may not
237 // reflect the value returned by Socket.getTrafficClass().
238 return Integer.toString(this.trafficClass);
239 }
240
241 public void setDiffServ(String diffServ) throws IllegalArgumentException {
242 this.trafficClass = QualityOfServiceUtils.getDSCP(diffServ);
243 this.diffServChosen = true;
244 }
245
246 public int getTypeOfService() {
247 // This is the value requested by the user by setting the Tcp Transport
248 // options. If the socket hasn't been created, then this value may not
249 // reflect the value returned by Socket.getTrafficClass().
250 return this.trafficClass;
251 }
252
253 public void setTypeOfService(int typeOfService) {
254 this.trafficClass = QualityOfServiceUtils.getToS(typeOfService);
255 this.typeOfServiceChosen = true;
256 }
257
258 public boolean isTrace() {
259 return trace;
260 }
261
262 public void setTrace(boolean trace) {
263 this.trace = trace;
264 }
265
266 public String getLogWriterName() {
267 return logWriterName;
268 }
269
270 public void setLogWriterName(String logFormat) {
271 this.logWriterName = logFormat;
272 }
273
274 public boolean isDynamicManagement() {
275 return dynamicManagement;
276 }
277
278 public void setDynamicManagement(boolean useJmx) {
279 this.dynamicManagement = useJmx;
280 }
281
282 public boolean isStartLogging() {
283 return startLogging;
284 }
285
286 public void setStartLogging(boolean startLogging) {
287 this.startLogging = startLogging;
288 }
289
290 public int getJmxPort() {
291 return jmxPort;
292 }
293
294 public void setJmxPort(int jmxPort) {
295 this.jmxPort = jmxPort;
296 }
297
298 public int getMinmumWireFormatVersion() {
299 return minmumWireFormatVersion;
300 }
301
302 public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
303 this.minmumWireFormatVersion = minmumWireFormatVersion;
304 }
305
306 public boolean isUseLocalHost() {
307 return useLocalHost;
308 }
309
310 /**
311 * Sets whether 'localhost' or the actual local host name should be used to
312 * make local connections. On some operating systems such as Macs its not
313 * possible to connect as the local host name so localhost is better.
314 */
315 public void setUseLocalHost(boolean useLocalHost) {
316 this.useLocalHost = useLocalHost;
317 }
318
319 public int getSocketBufferSize() {
320 return socketBufferSize;
321 }
322
323 /**
324 * Sets the buffer size to use on the socket
325 */
326 public void setSocketBufferSize(int socketBufferSize) {
327 this.socketBufferSize = socketBufferSize;
328 }
329
330 public int getSoTimeout() {
331 return soTimeout;
332 }
333
334 /**
335 * Sets the socket timeout
336 */
337 public void setSoTimeout(int soTimeout) {
338 this.soTimeout = soTimeout;
339 }
340
341 public int getConnectionTimeout() {
342 return connectionTimeout;
343 }
344
345 /**
346 * Sets the timeout used to connect to the socket
347 */
348 public void setConnectionTimeout(int connectionTimeout) {
349 this.connectionTimeout = connectionTimeout;
350 }
351
352 public Boolean getKeepAlive() {
353 return keepAlive;
354 }
355
356 /**
357 * Enable/disable TCP KEEP_ALIVE mode
358 */
359 public void setKeepAlive(Boolean keepAlive) {
360 this.keepAlive = keepAlive;
361 }
362
363 /**
364 * Enable/disable soLinger
365 * @param soLinger enabled if > -1, disabled if == -1, system default otherwise
366 */
367 public void setSoLinger(int soLinger) {
368 this.soLinger = soLinger;
369 }
370
371 public int getSoLinger() {
372 return soLinger;
373 }
374
375 public Boolean getTcpNoDelay() {
376 return tcpNoDelay;
377 }
378
379 /**
380 * Enable/disable the TCP_NODELAY option on the socket
381 */
382 public void setTcpNoDelay(Boolean tcpNoDelay) {
383 this.tcpNoDelay = tcpNoDelay;
384 }
385
386 /**
387 * @return the ioBufferSize
388 */
389 public int getIoBufferSize() {
390 return this.ioBufferSize;
391 }
392
393 /**
394 * @param ioBufferSize the ioBufferSize to set
395 */
396 public void setIoBufferSize(int ioBufferSize) {
397 this.ioBufferSize = ioBufferSize;
398 }
399
400 /**
401 * @return the closeAsync
402 */
403 public boolean isCloseAsync() {
404 return closeAsync;
405 }
406
407 /**
408 * @param closeAsync the closeAsync to set
409 */
410 public void setCloseAsync(boolean closeAsync) {
411 this.closeAsync = closeAsync;
412 }
413
414 // Implementation methods
415 // -------------------------------------------------------------------------
416 protected String resolveHostName(String host) throws UnknownHostException {
417 if (isUseLocalHost()) {
418 String localName = InetAddressUtil.getLocalHostName();
419 if (localName != null && localName.equals(host)) {
420 return "localhost";
421 }
422 }
423 return host;
424 }
425
426 /**
427 * Configures the socket for use
428 *
429 * @param sock
430 * @throws SocketException, IllegalArgumentException if setting the options
431 * on the socket failed.
432 */
433 protected void initialiseSocket(Socket sock) throws SocketException,
434 IllegalArgumentException {
435 if (socketOptions != null) {
436 IntrospectionSupport.setProperties(socket, socketOptions);
437 }
438
439 try {
440 sock.setReceiveBufferSize(socketBufferSize);
441 sock.setSendBufferSize(socketBufferSize);
442 } catch (SocketException se) {
443 LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
444 LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
445 }
446 sock.setSoTimeout(soTimeout);
447
448 if (keepAlive != null) {
449 sock.setKeepAlive(keepAlive.booleanValue());
450 }
451
452 if (soLinger > -1) {
453 sock.setSoLinger(true, soLinger);
454 } else if (soLinger == -1) {
455 sock.setSoLinger(false, 0);
456 }
457 if (tcpNoDelay != null) {
458 sock.setTcpNoDelay(tcpNoDelay.booleanValue());
459 }
460 if (!this.trafficClassSet) {
461 this.trafficClassSet = setTrafficClass(sock);
462 }
463 }
464
465 @Override
466 protected void doStart() throws Exception {
467 connect();
468 stoppedLatch.set(new CountDownLatch(1));
469 super.doStart();
470 }
471
472 protected void connect() throws Exception {
473
474 if (socket == null && socketFactory == null) {
475 throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
476 }
477
478 InetSocketAddress localAddress = null;
479 InetSocketAddress remoteAddress = null;
480
481 if (localLocation != null) {
482 localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
483 localLocation.getPort());
484 }
485
486 if (remoteLocation != null) {
487 String host = resolveHostName(remoteLocation.getHost());
488 remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
489 }
490 // Set the traffic class before the socket is connected when possible so
491 // that the connection packets are given the correct traffic class.
492 this.trafficClassSet = setTrafficClass(socket);
493
494 if (socket != null) {
495
496 if (localAddress != null) {
497 socket.bind(localAddress);
498 }
499
500 // If it's a server accepted socket.. we don't need to connect it
501 // to a remote address.
502 if (remoteAddress != null) {
503 if (connectionTimeout >= 0) {
504 socket.connect(remoteAddress, connectionTimeout);
505 } else {
506 socket.connect(remoteAddress);
507 }
508 }
509
510 } else {
511 // For SSL sockets.. you can't create an unconnected socket :(
512 // This means the timout option are not supported either.
513 if (localAddress != null) {
514 socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
515 localAddress.getAddress(), localAddress.getPort());
516 } else {
517 socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
518 }
519 }
520
521 initialiseSocket(socket);
522 initializeStreams();
523 }
524
525 @Override
526 protected void doStop(ServiceStopper stopper) throws Exception {
527 if (LOG.isDebugEnabled()) {
528 LOG.debug("Stopping transport " + this);
529 }
530
531 // Closing the streams flush the sockets before closing.. if the socket
532 // is hung.. then this hangs the close.
533 // closeStreams();
534 if (socket != null) {
535 if (closeAsync) {
536 //closing the socket can hang also
537 final CountDownLatch latch = new CountDownLatch(1);
538
539 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
540 public void run() {
541 try {
542 socket.close();
543 } catch (IOException e) {
544 if (LOG.isDebugEnabled()) {
545 LOG.debug("Caught exception closing socket", e);
546 }
547 } finally {
548 latch.countDown();
549 }
550 }
551 });
552
553 try {
554 latch.await(1,TimeUnit.SECONDS);
555 } catch (InterruptedException e) {
556 Thread.currentThread().interrupt();
557 }
558
559 } else {
560
561 try {
562 socket.close();
563 } catch (IOException e) {
564 LOG.debug("Caught exception closing socket",e);
565 }
566 }
567 }
568 }
569
570 /**
571 * Override so that stop() blocks until the run thread is no longer running.
572 */
573 @Override
574 public void stop() throws Exception {
575 super.stop();
576 CountDownLatch countDownLatch = stoppedLatch.get();
577 if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
578 countDownLatch.await(1,TimeUnit.SECONDS);
579 }
580 }
581
582 protected void initializeStreams() throws Exception {
583 TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) {
584 @Override
585 public int read() throws IOException {
586 receiveCounter++;
587 return super.read();
588 }
589 @Override
590 public int read(byte[] b, int off, int len) throws IOException {
591 receiveCounter++;
592 return super.read(b, off, len);
593 }
594 @Override
595 public long skip(long n) throws IOException {
596 receiveCounter++;
597 return super.skip(n);
598 }
599 @Override
600 protected void fill() throws IOException {
601 receiveCounter++;
602 super.fill();
603 }
604 };
605 this.dataIn = new DataInputStream(buffIn);
606 TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
607 this.dataOut = new DataOutputStream(outputStream);
608 this.buffOut = outputStream;
609 }
610
611 protected void closeStreams() throws IOException {
612 if (dataOut != null) {
613 dataOut.close();
614 }
615 if (dataIn != null) {
616 dataIn.close();
617 }
618 }
619
620 public void setSocketOptions(Map<String, Object> socketOptions) {
621 this.socketOptions = new HashMap<String, Object>(socketOptions);
622 }
623
624 public String getRemoteAddress() {
625 if (socket != null) {
626 SocketAddress address = socket.getRemoteSocketAddress();
627 if (address instanceof InetSocketAddress) {
628 return "tcp://" + ((InetSocketAddress)address).getAddress().getHostAddress() + ":" + ((InetSocketAddress)address).getPort();
629 } else {
630 return "" + socket.getRemoteSocketAddress();
631 }
632 }
633 return null;
634 }
635
636 @Override
637 public <T> T narrow(Class<T> target) {
638 if (target == Socket.class) {
639 return target.cast(socket);
640 } else if ( target == TimeStampStream.class) {
641 return target.cast(buffOut);
642 }
643 return super.narrow(target);
644 }
645
646 public int getReceiveCounter() {
647 return receiveCounter;
648 }
649
650 /**
651 * @param sock The socket on which to set the Traffic Class.
652 * @return Whether or not the Traffic Class was set on the given socket.
653 * @throws SocketException if the system does not support setting the
654 * Traffic Class.
655 * @throws IllegalArgumentException if both the Differentiated Services and
656 * Type of Services transport options have been set on the same
657 * connection.
658 */
659 private boolean setTrafficClass(Socket sock) throws SocketException,
660 IllegalArgumentException {
661 if (sock == null
662 || (!this.diffServChosen && !this.typeOfServiceChosen)) {
663 return false;
664 }
665 if (this.diffServChosen && this.typeOfServiceChosen) {
666 throw new IllegalArgumentException("Cannot set both the "
667 + " Differentiated Services and Type of Services transport "
668 + " options on the same connection.");
669 }
670
671 sock.setTrafficClass(this.trafficClass);
672
673 int resultTrafficClass = sock.getTrafficClass();
674 if (this.trafficClass != resultTrafficClass) {
675 // In the case where the user has specified the ECN bits (e.g. in
676 // Type of Service) but the system won't allow the ECN bits to be
677 // set or in the case where setting the traffic class failed for
678 // other reasons, emit a warning.
679 if ((this.trafficClass >> 2) == (resultTrafficClass >> 2)
680 && (this.trafficClass & 3) != (resultTrafficClass & 3)) {
681 LOG.warn("Attempted to set the Traffic Class to "
682 + this.trafficClass + " but the result Traffic Class was "
683 + resultTrafficClass + ". Please check that your system "
684 + "allows you to set the ECN bits (the first two bits).");
685 } else {
686 LOG.warn("Attempted to set the Traffic Class to "
687 + this.trafficClass + " but the result Traffic Class was "
688 + resultTrafficClass + ". Please check that your system "
689 + "supports java.net.setTrafficClass.");
690 }
691 return false;
692 }
693 // Reset the guards that prevent both the Differentiated Services
694 // option and the Type of Service option from being set on the same
695 // connection.
696 this.diffServChosen = false;
697 this.typeOfServiceChosen = false;
698 return true;
699 }
700
701 public WireFormat getWireFormat() {
702 return wireFormat;
703 }
704 }