001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.mqtt;
018
019 import java.io.DataOutputStream;
020 import java.io.EOFException;
021 import java.io.IOException;
022 import java.net.Socket;
023 import java.net.URI;
024 import java.net.UnknownHostException;
025 import java.nio.ByteBuffer;
026 import java.nio.channels.SelectionKey;
027 import java.nio.channels.SocketChannel;
028
029 import javax.net.SocketFactory;
030 import org.apache.activemq.transport.nio.NIOOutputStream;
031 import org.apache.activemq.transport.nio.SelectorManager;
032 import org.apache.activemq.transport.nio.SelectorSelection;
033 import org.apache.activemq.transport.tcp.TcpTransport;
034 import org.apache.activemq.util.IOExceptionSupport;
035 import org.apache.activemq.util.ServiceStopper;
036 import org.apache.activemq.wireformat.WireFormat;
037 import org.fusesource.hawtbuf.DataByteArrayInputStream;
038
039 /**
040 * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using MQTT over NIO
041 */
042 public class MQTTNIOTransport extends TcpTransport {
043
044 private SocketChannel channel;
045 private SelectorSelection selection;
046
047 private ByteBuffer inputBuffer;
048 MQTTCodec codec;
049
050 public MQTTNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
051 super(wireFormat, socketFactory, remoteLocation, localLocation);
052 }
053
054 public MQTTNIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
055 super(wireFormat, socket);
056 }
057
058 protected void initializeStreams() throws IOException {
059 channel = socket.getChannel();
060 channel.configureBlocking(false);
061 // listen for events telling us when the socket is readable.
062 selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
063 public void onSelect(SelectorSelection selection) {
064 if (!isStopped()) {
065 serviceRead();
066 }
067 }
068
069 public void onError(SelectorSelection selection, Throwable error) {
070 if (error instanceof IOException) {
071 onException((IOException) error);
072 } else {
073 onException(IOExceptionSupport.create(error));
074 }
075 }
076 });
077
078 inputBuffer = ByteBuffer.allocate(8 * 1024);
079 NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
080 this.dataOut = new DataOutputStream(outPutStream);
081 this.buffOut = outPutStream;
082 codec = new MQTTCodec(this);
083 }
084
085 private void serviceRead() {
086 try {
087
088 while (isStarted()) {
089 // read channel
090 int readSize = channel.read(inputBuffer);
091 // channel is closed, cleanup
092 if (readSize == -1) {
093 onException(new EOFException());
094 selection.close();
095 break;
096 }
097 // nothing more to read, break
098 if (readSize == 0) {
099 break;
100 }
101
102 inputBuffer.flip();
103 DataByteArrayInputStream dis = new DataByteArrayInputStream(inputBuffer.array());
104 codec.parse(dis, readSize);
105
106 // clear the buffer
107 inputBuffer.clear();
108
109 }
110 } catch (IOException e) {
111 onException(e);
112 } catch (Throwable e) {
113 onException(IOExceptionSupport.create(e));
114 }
115 }
116
117 protected void doStart() throws Exception {
118 connect();
119 selection.setInterestOps(SelectionKey.OP_READ);
120 selection.enable();
121 }
122
123 protected void doStop(ServiceStopper stopper) throws Exception {
124 try {
125 if (selection != null) {
126 selection.close();
127 }
128 } finally {
129 super.doStop(stopper);
130 }
131 }
132 }