001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.nio;
018
019 import java.io.DataInputStream;
020 import java.io.DataOutputStream;
021 import java.io.EOFException;
022 import java.io.IOException;
023 import java.net.Socket;
024 import java.net.URI;
025 import java.net.UnknownHostException;
026 import java.nio.ByteBuffer;
027 import java.nio.channels.SelectionKey;
028 import java.nio.channels.SocketChannel;
029
030 import javax.net.SocketFactory;
031
032 import org.apache.activemq.command.Command;
033 import org.apache.activemq.openwire.OpenWireFormat;
034 import org.apache.activemq.transport.Transport;
035 import org.apache.activemq.transport.tcp.TcpTransport;
036 import org.apache.activemq.util.IOExceptionSupport;
037 import org.apache.activemq.util.ServiceStopper;
038 import org.apache.activemq.wireformat.WireFormat;
039
040 /**
041 * An implementation of the {@link Transport} interface using raw tcp/ip
042 *
043 *
044 */
045 public class NIOTransport extends TcpTransport {
046
047 // private static final Logger log = LoggerFactory.getLogger(NIOTransport.class);
048 protected SocketChannel channel;
049 protected SelectorSelection selection;
050 protected ByteBuffer inputBuffer;
051 protected ByteBuffer currentBuffer;
052 protected int nextFrameSize;
053
054 public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
055 super(wireFormat, socketFactory, remoteLocation, localLocation);
056 }
057
058 public NIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
059 super(wireFormat, socket);
060 }
061
062 protected void initializeStreams() throws IOException {
063 channel = socket.getChannel();
064 channel.configureBlocking(false);
065
066 // listen for events telling us when the socket is readable.
067 selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
068 public void onSelect(SelectorSelection selection) {
069 serviceRead();
070 }
071
072 public void onError(SelectorSelection selection, Throwable error) {
073 if (error instanceof IOException) {
074 onException((IOException)error);
075 } else {
076 onException(IOExceptionSupport.create(error));
077 }
078 }
079 });
080
081 // Send the data via the channel
082 // inputBuffer = ByteBuffer.allocateDirect(8*1024);
083 inputBuffer = ByteBuffer.allocate(8 * 1024);
084 currentBuffer = inputBuffer;
085 nextFrameSize = -1;
086 currentBuffer.limit(4);
087 NIOOutputStream outPutStream = new NIOOutputStream(channel, 16 * 1024);
088 this.dataOut = new DataOutputStream(outPutStream);
089 this.buffOut = outPutStream;
090 }
091
092 protected void serviceRead() {
093 try {
094 while (true) {
095
096 int readSize = channel.read(currentBuffer);
097 if (readSize == -1) {
098 onException(new EOFException());
099 selection.close();
100 break;
101 }
102 if (readSize == 0) {
103 break;
104 }
105
106 if (currentBuffer.hasRemaining()) {
107 continue;
108 }
109
110 // Are we trying to figure out the size of the next frame?
111 if (nextFrameSize == -1) {
112 assert inputBuffer == currentBuffer;
113
114 // If the frame is too big to fit in our direct byte buffer,
115 // Then allocate a non direct byte buffer of the right size
116 // for it.
117 inputBuffer.flip();
118 nextFrameSize = inputBuffer.getInt() + 4;
119
120 if (wireFormat instanceof OpenWireFormat) {
121 long maxFrameSize = ((OpenWireFormat)wireFormat).getMaxFrameSize();
122 if (nextFrameSize > maxFrameSize) {
123 throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
124 }
125 }
126
127 if (nextFrameSize > inputBuffer.capacity()) {
128 currentBuffer = ByteBuffer.allocate(nextFrameSize);
129 currentBuffer.putInt(nextFrameSize);
130 } else {
131 inputBuffer.limit(nextFrameSize);
132 }
133
134 } else {
135 currentBuffer.flip();
136
137 Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
138 doConsume((Command)command);
139
140 nextFrameSize = -1;
141 inputBuffer.clear();
142 inputBuffer.limit(4);
143 currentBuffer = inputBuffer;
144 }
145
146 }
147
148 } catch (IOException e) {
149 onException(e);
150 } catch (Throwable e) {
151 onException(IOExceptionSupport.create(e));
152 }
153 }
154
155 protected void doStart() throws Exception {
156 connect();
157 selection.setInterestOps(SelectionKey.OP_READ);
158 selection.enable();
159 }
160
161 protected void doStop(ServiceStopper stopper) throws Exception {
162 if (selection != null) {
163 selection.close();
164 selection = null;
165 }
166 super.doStop(stopper);
167 }
168 }