001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.nio;
018
019 import java.io.EOFException;
020 import java.io.IOException;
021 import java.io.InterruptedIOException;
022 import java.io.OutputStream;
023 import java.nio.ByteBuffer;
024 import java.nio.channels.WritableByteChannel;
025
026 import org.apache.activemq.transport.tcp.TimeStampStream;
027
028 import javax.net.ssl.SSLEngine;
029
030 /**
031 * An optimized buffered outputstream for Tcp
032 */
033 public class NIOOutputStream extends OutputStream implements TimeStampStream {
034
035 private static final int BUFFER_SIZE = 8192;
036
037 private final WritableByteChannel out;
038 private final byte[] buffer;
039 private final ByteBuffer byteBuffer;
040
041 private int count;
042 private boolean closed;
043 private volatile long writeTimestamp = -1;//concurrent reads of this value
044
045 private SSLEngine engine;
046
047 /**
048 * Constructor
049 *
050 * @param out
051 */
052 public NIOOutputStream(WritableByteChannel out) {
053 this(out, BUFFER_SIZE);
054 }
055
056 /**
057 * Creates a new buffered output stream to write data to the specified
058 * underlying output stream with the specified buffer size.
059 *
060 * @param out the underlying output stream.
061 * @param size the buffer size.
062 * @throws IllegalArgumentException if size <= 0.
063 */
064 public NIOOutputStream(WritableByteChannel out, int size) {
065 this.out = out;
066 if (size <= 0) {
067 throw new IllegalArgumentException("Buffer size <= 0");
068 }
069 buffer = new byte[size];
070 byteBuffer = ByteBuffer.wrap(buffer);
071 }
072
073 /**
074 * write a byte on to the stream
075 *
076 * @param b - byte to write
077 * @throws IOException
078 */
079 public void write(int b) throws IOException {
080 checkClosed();
081 if (availableBufferToWrite() < 1) {
082 flush();
083 }
084 buffer[count++] = (byte)b;
085 }
086
087 /**
088 * write a byte array to the stream
089 *
090 * @param b the byte buffer
091 * @param off the offset into the buffer
092 * @param len the length of data to write
093 * @throws IOException
094 */
095 public void write(byte b[], int off, int len) throws IOException {
096 checkClosed();
097 if (availableBufferToWrite() < len) {
098 flush();
099 }
100 if (buffer.length >= len) {
101 System.arraycopy(b, off, buffer, count, len);
102 count += len;
103 } else {
104 write(ByteBuffer.wrap(b, off, len));
105 }
106 }
107
108 /**
109 * flush the data to the output stream This doesn't call flush on the
110 * underlying outputstream, because Tcp is particularly efficent at doing
111 * this itself ....
112 *
113 * @throws IOException
114 */
115 public void flush() throws IOException {
116 if (count > 0 && out != null) {
117 byteBuffer.position(0);
118 byteBuffer.limit(count);
119 write(byteBuffer);
120 count = 0;
121 }
122 }
123
124 /**
125 * close this stream
126 *
127 * @throws IOException
128 */
129 public void close() throws IOException {
130 super.close();
131 if (engine != null) {
132 engine.closeOutbound();
133 }
134 closed = true;
135 }
136
137 /**
138 * Checks that the stream has not been closed
139 *
140 * @throws IOException
141 */
142 protected void checkClosed() throws IOException {
143 if (closed) {
144 throw new EOFException("Cannot write to the stream any more it has already been closed");
145 }
146 }
147
148 /**
149 * @return the amount free space in the buffer
150 */
151 private int availableBufferToWrite() {
152 return buffer.length - count;
153 }
154
155 protected void write(ByteBuffer data) throws IOException {
156 ByteBuffer plain;
157 if (engine != null) {
158 plain = ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
159 plain.clear();
160 engine.wrap(data, plain);
161 plain.flip();
162 } else {
163 plain = data;
164 }
165
166 int remaining = plain.remaining();
167 int lastRemaining = remaining - 1;
168 long delay = 1;
169 try {
170 writeTimestamp = System.currentTimeMillis();
171 while (remaining > 0) {
172
173 // We may need to do a little bit of sleeping to avoid a busy loop.
174 // Slow down if no data was written out..
175 if (remaining == lastRemaining) {
176 try {
177 // Use exponential rollback to increase sleep time.
178 Thread.sleep(delay);
179 delay *= 2;
180 if (delay > 1000) {
181 delay = 1000;
182 }
183 } catch (InterruptedException e) {
184 throw new InterruptedIOException();
185 }
186 } else {
187 delay = 1;
188 }
189 lastRemaining = remaining;
190
191 // Since the write is non-blocking, all the data may not have been
192 // written.
193 out.write(plain);
194 remaining = data.remaining();
195 }
196 } finally {
197 writeTimestamp = -1;
198 }
199 }
200
201
202 /* (non-Javadoc)
203 * @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting()
204 */
205 public boolean isWriting() {
206 return writeTimestamp > 0;
207 }
208
209 /* (non-Javadoc)
210 * @see org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp()
211 */
212 public long getWriteTimestamp() {
213 return writeTimestamp;
214 }
215
216 public void setEngine(SSLEngine engine) {
217 this.engine = engine;
218 }
219 }