001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.stomp;
018
019 import java.io.ByteArrayInputStream;
020 import java.io.DataOutputStream;
021 import java.io.EOFException;
022 import java.io.IOException;
023 import java.net.Socket;
024 import java.net.URI;
025 import java.net.UnknownHostException;
026 import java.nio.ByteBuffer;
027 import java.nio.channels.SelectionKey;
028 import java.nio.channels.SocketChannel;
029
030 import javax.net.SocketFactory;
031
032 import org.apache.activemq.transport.Transport;
033 import org.apache.activemq.transport.nio.NIOOutputStream;
034 import org.apache.activemq.transport.nio.SelectorManager;
035 import org.apache.activemq.transport.nio.SelectorSelection;
036 import org.apache.activemq.transport.tcp.TcpTransport;
037 import org.apache.activemq.util.IOExceptionSupport;
038 import org.apache.activemq.util.ServiceStopper;
039 import org.apache.activemq.wireformat.WireFormat;
040
041 /**
042 * An implementation of the {@link Transport} interface for using Stomp over NIO
043 *
044 *
045 */
046 public class StompNIOTransport extends TcpTransport {
047
048 private SocketChannel channel;
049 private SelectorSelection selection;
050
051 private ByteBuffer inputBuffer;
052 StompCodec codec;
053
054 public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
055 super(wireFormat, socketFactory, remoteLocation, localLocation);
056 }
057
058 public StompNIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
059 super(wireFormat, socket);
060 }
061
062 protected void initializeStreams() throws IOException {
063 channel = socket.getChannel();
064 channel.configureBlocking(false);
065
066 // listen for events telling us when the socket is readable.
067 selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
068 public void onSelect(SelectorSelection selection) {
069 serviceRead();
070 }
071
072 public void onError(SelectorSelection selection, Throwable error) {
073 if (error instanceof IOException) {
074 onException((IOException)error);
075 } else {
076 onException(IOExceptionSupport.create(error));
077 }
078 }
079 });
080
081 inputBuffer = ByteBuffer.allocate(8 * 1024);
082 NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
083 this.dataOut = new DataOutputStream(outPutStream);
084 this.buffOut = outPutStream;
085 codec = new StompCodec(this);
086 }
087
088 private void serviceRead() {
089 try {
090
091 while (true) {
092 // read channel
093 int readSize = channel.read(inputBuffer);
094 // channel is closed, cleanup
095 if (readSize == -1) {
096 onException(new EOFException());
097 selection.close();
098 break;
099 }
100 // nothing more to read, break
101 if (readSize == 0) {
102 break;
103 }
104
105 inputBuffer.flip();
106
107 ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
108 codec.parse(input, readSize);
109
110 // clear the buffer
111 inputBuffer.clear();
112
113 }
114 } catch (IOException e) {
115 onException(e);
116 } catch (Throwable e) {
117 onException(IOExceptionSupport.create(e));
118 }
119 }
120
121 protected void doStart() throws Exception {
122 connect();
123 selection.setInterestOps(SelectionKey.OP_READ);
124 selection.enable();
125 }
126
127 protected void doStop(ServiceStopper stopper) throws Exception {
128 try {
129 if (selection != null) {
130 selection.close();
131 }
132 } finally {
133 super.doStop(stopper);
134 }
135 }
136 }