001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.activemq.transport.tcp;
019
020 import java.io.FilterOutputStream;
021 import java.io.IOException;
022 import java.io.OutputStream;
023
024 /**
025 * An optimized buffered outputstream for Tcp
026 *
027 *
028 */
029
030 public class TcpBufferedOutputStream extends FilterOutputStream implements TimeStampStream {
031 private static final int BUFFER_SIZE = 8192;
032 private byte[] buffer;
033 private int bufferlen;
034 private int count;
035 private volatile long writeTimestamp = -1;//concurrent reads of this value
036
037
038 /**
039 * Constructor
040 *
041 * @param out
042 */
043 public TcpBufferedOutputStream(OutputStream out) {
044 this(out, BUFFER_SIZE);
045 }
046
047 /**
048 * Creates a new buffered output stream to write data to the specified
049 * underlying output stream with the specified buffer size.
050 *
051 * @param out the underlying output stream.
052 * @param size the buffer size.
053 * @throws IllegalArgumentException if size <= 0.
054 */
055 public TcpBufferedOutputStream(OutputStream out, int size) {
056 super(out);
057 if (size <= 0) {
058 throw new IllegalArgumentException("Buffer size <= 0");
059 }
060 buffer = new byte[size];
061 bufferlen = size;
062 }
063
064 /**
065 * write a byte on to the stream
066 *
067 * @param b - byte to write
068 * @throws IOException
069 */
070 public void write(int b) throws IOException {
071 if ((bufferlen - count) < 1) {
072 flush();
073 }
074 buffer[count++] = (byte)b;
075 }
076
077 /**
078 * write a byte array to the stream
079 *
080 * @param b the byte buffer
081 * @param off the offset into the buffer
082 * @param len the length of data to write
083 * @throws IOException
084 */
085 public void write(byte b[], int off, int len) throws IOException {
086 if (b != null) {
087 if ((bufferlen - count) < len) {
088 flush();
089 }
090 if (buffer.length >= len) {
091 System.arraycopy(b, off, buffer, count, len);
092 count += len;
093 } else {
094 try {
095 writeTimestamp = System.currentTimeMillis();
096 out.write(b, off, len);
097 } finally {
098 writeTimestamp = -1;
099 }
100 }
101 }
102 }
103
104 /**
105 * flush the data to the output stream This doesn't call flush on the
106 * underlying outputstream, because Tcp is particularly efficent at doing
107 * this itself ....
108 *
109 * @throws IOException
110 */
111 public void flush() throws IOException {
112 if (count > 0 && out != null) {
113 try {
114 writeTimestamp = System.currentTimeMillis();
115 out.write(buffer, 0, count);
116 } finally {
117 writeTimestamp = -1;
118 }
119 count = 0;
120 }
121 }
122
123 /**
124 * close this stream
125 *
126 * @throws IOException
127 */
128 public void close() throws IOException {
129 super.close();
130 }
131
132 /* (non-Javadoc)
133 * @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting()
134 */
135 public boolean isWriting() {
136 return writeTimestamp > 0;
137 }
138
139 /* (non-Javadoc)
140 * @see org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp()
141 */
142 public long getWriteTimestamp() {
143 return writeTimestamp;
144 }
145
146 }