001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.tcp;
018
019 import java.io.FilterInputStream;
020 import java.io.IOException;
021 import java.io.InputStream;
022
/**
 * A buffered {@link InputStream} wrapper intended for TCP transports.
 *
 * <p>Bytes are read from the wrapped stream into an internal byte array and
 * served from there. Bulk reads that are at least as large as the internal
 * buffer bypass it when it is empty and go straight to the wrapped stream.
 *
 * <p>{@link #markSupported()} returns {@code false}. None of the methods of
 * this class are declared {@code synchronized}.
 */
public class TcpBufferedInputStream extends FilterInputStream {
    private static final int DEFAULT_BUFFER_SIZE = 8192;

    /** Backing array holding bytes already fetched from the wrapped stream. */
    protected byte[] internalBuffer;
    /** Index one past the last valid byte in {@link #internalBuffer}. */
    protected int count;
    /** Index of the next byte of {@link #internalBuffer} to hand out. */
    protected int position;

    /**
     * Creates a stream with the default buffer size (8192 bytes).
     *
     * @param in the stream to read from
     */
    public TcpBufferedInputStream(InputStream in) {
        this(in, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Creates a stream with the given buffer size.
     *
     * @param in   the stream to read from
     * @param size the size of the internal buffer in bytes; must be positive
     * @throws IllegalArgumentException if {@code size <= 0}
     */
    public TcpBufferedInputStream(InputStream in, int size) {
        super(in);
        if (size <= 0) {
            throw new IllegalArgumentException("Buffer size <= 0");
        }
        internalBuffer = new byte[size];
    }

    /**
     * Discards any buffered content and refills the buffer with a single read
     * from the wrapped stream. If that read yields no bytes (end of stream or a
     * zero-length read) the buffer is left empty ({@code count == position == 0}).
     *
     * @throws IOException if the wrapped stream fails
     */
    protected void fill() throws IOException {
        byte[] buffer = internalBuffer;
        count = 0;
        position = 0;
        int n = in.read(buffer, position, buffer.length - position);
        if (n > 0) {
            count = n + position;
        }
    }

    /**
     * Reads a single byte, refilling the buffer first when it is exhausted.
     *
     * @return the byte as an unsigned value in the range 0-255, or {@code -1}
     *         if the refill produced no data
     * @throws IOException if the wrapped stream fails
     */
    @Override
    public int read() throws IOException {
        if (position >= count) {
            fill();
            if (position >= count) {
                return -1;
            }
        }
        // Mask so bytes >= 0x80 are returned as positive ints, never as -1.
        return internalBuffer[position++] & 0xff;
    }

    /**
     * Performs one read step: serves bytes from the buffer if any are present,
     * otherwise either reads directly into {@code b} (when the request is at
     * least as large as the buffer) or refills the buffer and copies from it.
     *
     * @return the number of bytes stored into {@code b}, or the wrapped
     *         stream's result / {@code -1} when no data could be obtained
     */
    private int readStream(byte[] b, int off, int len) throws IOException {
        int avail = count - position;
        if (avail <= 0) {
            // Large request on an empty buffer: skip the intermediate copy.
            if (len >= internalBuffer.length) {
                return in.read(b, off, len);
            }
            fill();
            avail = count - position;
            if (avail <= 0) {
                return -1;
            }
        }
        int cnt = (avail < len) ? avail : len;
        System.arraycopy(internalBuffer, position, b, off, cnt);
        position += cnt;
        return cnt;
    }

    /**
     * Reads up to {@code len} bytes into {@code b} starting at {@code off}.
     * Reading continues until the request is satisfied, a read step yields no
     * data, or the wrapped stream reports that nothing more is available.
     *
     * @param b   destination array
     * @param off offset in {@code b} at which to start storing bytes
     * @param len maximum number of bytes to read
     * @return the number of bytes read, {@code 0} if {@code len} is zero, or
     *         the non-positive result of the first read step if it yielded no
     *         data (for example {@code -1} at end of stream)
     * @throws IndexOutOfBoundsException if {@code off} or {@code len} is
     *         negative or {@code off + len} exceeds {@code b.length}
     * @throws IOException if the wrapped stream fails
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        // Any negative operand (including an overflowed off + len) sets the sign bit.
        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }
        int n = 0;
        for (;;) {
            int nread = readStream(b, off + n, len - n);
            if (nread <= 0) {
                return (n == 0) ? nread : n;
            }
            n += nread;
            if (n >= len) {
                return n;
            }
            // if not closed but no bytes available, return
            InputStream input = in;
            if (input != null && input.available() <= 0) {
                return n;
            }
        }
    }

    /**
     * Skips bytes. When the buffer holds data, at most the buffered bytes are
     * skipped; when it is empty the call is delegated to the wrapped stream.
     *
     * @param n the number of bytes to skip
     * @return the number of bytes actually skipped; {@code 0} if {@code n <= 0}
     * @throws IOException if the wrapped stream fails
     */
    @Override
    public long skip(long n) throws IOException {
        if (n <= 0) {
            return 0;
        }
        long avail = count - position;
        if (avail <= 0) {
            return in.skip(n);
        }
        long skipped = (avail < n) ? avail : n;
        position += skipped;
        return skipped;
    }

    /**
     * Returns the number of buffered bytes plus whatever the wrapped stream
     * reports as available. The result is capped at {@link Integer#MAX_VALUE}
     * so that the sum can never overflow into a negative value.
     *
     * @return the number of bytes available
     * @throws IOException if the wrapped stream fails
     */
    @Override
    public int available() throws IOException {
        int buffered = count - position;
        int underlying = in.available();
        // Saturate instead of overflowing when the wrapped stream reports a huge value.
        return (underlying > Integer.MAX_VALUE - buffered)
                ? Integer.MAX_VALUE
                : underlying + buffered;
    }

    /**
     * @return always {@code false}
     */
    @Override
    public boolean markSupported() {
        return false;
    }

    /**
     * Closes the wrapped stream, if there is one.
     *
     * @throws IOException if closing the wrapped stream fails
     */
    @Override
    public void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }
}