001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.io.IOException;
020 import java.io.InputStream;
021 import java.util.Collections;
022 import java.util.HashMap;
023 import java.util.Map;
024 import javax.jms.IllegalStateException;
025 import javax.jms.InvalidDestinationException;
026 import javax.jms.JMSException;
027 import org.apache.activemq.command.ActiveMQBytesMessage;
028 import org.apache.activemq.command.ActiveMQDestination;
029 import org.apache.activemq.command.ActiveMQMessage;
030 import org.apache.activemq.command.CommandTypes;
031 import org.apache.activemq.command.ConsumerId;
032 import org.apache.activemq.command.ConsumerInfo;
033 import org.apache.activemq.command.MessageAck;
034 import org.apache.activemq.command.MessageDispatch;
035 import org.apache.activemq.command.ProducerId;
036 import org.apache.activemq.selector.SelectorParser;
037 import org.apache.activemq.util.IOExceptionSupport;
038 import org.apache.activemq.util.IntrospectionSupport;
039 import org.apache.activemq.util.JMSExceptionSupport;
040
041 /**
042 *
043 */
044 public class ActiveMQInputStream extends InputStream implements ActiveMQDispatcher {
045
046 private final ActiveMQConnection connection;
047 private final ConsumerInfo info;
048 // These are the messages waiting to be delivered to the client
049 private final MessageDispatchChannel unconsumedMessages = new FifoMessageDispatchChannel();
050
051 private int deliveredCounter;
052 private MessageDispatch lastDelivered;
053 private boolean eosReached;
054 private byte buffer[];
055 private int pos;
056 private Map<String, Object> jmsProperties;
057
058 private ProducerId producerId;
059 private long nextSequenceId;
060 private long timeout;
061 private boolean firstReceived;
062
063 public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, String selector, boolean noLocal, String name, int prefetch, long timeout)
064 throws JMSException {
065 this.connection = connection;
066
067 if (dest == null) {
068 throw new InvalidDestinationException("Don't understand null destinations");
069 } else if (dest.isTemporary()) {
070 String physicalName = dest.getPhysicalName();
071
072 if (physicalName == null) {
073 throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
074 }
075
076 String connectionID = connection.getConnectionInfo().getConnectionId().getValue();
077
078 if (physicalName.indexOf(connectionID) < 0) {
079 throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
080 }
081
082 if (connection.isDeleted(dest)) {
083 throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
084 }
085 }
086
087 if (timeout < -1) throw new IllegalArgumentException("Timeout must be >= -1");
088 this.timeout = timeout;
089
090 this.info = new ConsumerInfo(consumerId);
091 this.info.setSubscriptionName(name);
092
093 if (selector != null && selector.trim().length() != 0) {
094 selector = "JMSType='org.apache.activemq.Stream' AND ( " + selector + " ) ";
095 } else {
096 selector = "JMSType='org.apache.activemq.Stream'";
097 }
098
099 SelectorParser.parse(selector);
100 this.info.setSelector(selector);
101
102 this.info.setPrefetchSize(prefetch);
103 this.info.setNoLocal(noLocal);
104 this.info.setBrowser(false);
105 this.info.setDispatchAsync(false);
106
107 // Allows the options on the destination to configure the consumerInfo
108 if (dest.getOptions() != null) {
109 Map<String, String> options = new HashMap<String, String>(dest.getOptions());
110 IntrospectionSupport.setProperties(this.info, options, "consumer.");
111 }
112
113 this.info.setDestination(dest);
114
115 this.connection.addInputStream(this);
116 this.connection.addDispatcher(info.getConsumerId(), this);
117 this.connection.syncSendPacket(info);
118 unconsumedMessages.start();
119 }
120
121 @Override
122 public void close() throws IOException {
123 if (!unconsumedMessages.isClosed()) {
124 try {
125 if (lastDelivered != null) {
126 MessageAck ack = new MessageAck(lastDelivered, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
127 connection.asyncSendPacket(ack);
128 }
129 dispose();
130 this.connection.syncSendPacket(info.createRemoveCommand());
131 } catch (JMSException e) {
132 throw IOExceptionSupport.create(e);
133 }
134 }
135 }
136
137 public void dispose() {
138 if (!unconsumedMessages.isClosed()) {
139 unconsumedMessages.close();
140 this.connection.removeDispatcher(info.getConsumerId());
141 this.connection.removeInputStream(this);
142 }
143 }
144
145 /**
146 * Return the JMS Properties which where used to send the InputStream
147 *
148 * @return jmsProperties
149 * @throws IOException
150 */
151 public Map<String, Object> getJMSProperties() throws IOException {
152 if (jmsProperties == null) {
153 fillBuffer();
154 }
155 return jmsProperties;
156 }
157
158 public ActiveMQMessage receive() throws JMSException, ReadTimeoutException {
159 checkClosed();
160 MessageDispatch md;
161 try {
162 if (firstReceived || timeout == -1) {
163 md = unconsumedMessages.dequeue(-1);
164 firstReceived = true;
165 } else {
166 md = unconsumedMessages.dequeue(timeout);
167 if (md == null) throw new ReadTimeoutException();
168 }
169 } catch (InterruptedException e) {
170 Thread.currentThread().interrupt();
171 throw JMSExceptionSupport.create(e);
172 }
173
174 if (md == null || unconsumedMessages.isClosed() || md.getMessage().isExpired()) {
175 return null;
176 }
177
178 deliveredCounter++;
179 if ((0.75 * info.getPrefetchSize()) <= deliveredCounter) {
180 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
181 connection.asyncSendPacket(ack);
182 deliveredCounter = 0;
183 lastDelivered = null;
184 } else {
185 lastDelivered = md;
186 }
187
188 return (ActiveMQMessage)md.getMessage();
189 }
190
191 /**
192 * @throws IllegalStateException
193 */
194 protected void checkClosed() throws IllegalStateException {
195 if (unconsumedMessages.isClosed()) {
196 throw new IllegalStateException("The Consumer is closed");
197 }
198 }
199
200 /**
201 *
202 * @see InputStream#read()
203 * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout
204 */
205 @Override
206 public int read() throws IOException {
207 fillBuffer();
208 if (eosReached || buffer.length == 0) {
209 return -1;
210 }
211
212 return buffer[pos++] & 0xff;
213 }
214
215 /**
216 *
217 * @see InputStream#read(byte[], int, int)
218 * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout
219 */
220 @Override
221 public int read(byte[] b, int off, int len) throws IOException {
222 fillBuffer();
223 if (eosReached || buffer.length == 0) {
224 return -1;
225 }
226
227 int max = Math.min(len, buffer.length - pos);
228 System.arraycopy(buffer, pos, b, off, max);
229
230 pos += max;
231 return max;
232 }
233
234 private void fillBuffer() throws IOException {
235 if (eosReached || (buffer != null && buffer.length > pos)) {
236 return;
237 }
238 try {
239 while (true) {
240 ActiveMQMessage m = receive();
241 if (m != null && m.getDataStructureType() == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
242 // First message.
243 long producerSequenceId = m.getMessageId().getProducerSequenceId();
244 if (producerId == null) {
245 // We have to start a stream at sequence id = 0
246 if (producerSequenceId != 0) {
247 continue;
248 }
249 nextSequenceId++;
250 producerId = m.getMessageId().getProducerId();
251 } else {
252 // Verify it's the next message of the sequence.
253 if (!m.getMessageId().getProducerId().equals(producerId)) {
254 throw new IOException("Received an unexpected message: invalid producer: " + m);
255 }
256 if (producerSequenceId != nextSequenceId++) {
257 throw new IOException("Received an unexpected message: expected ID: " + (nextSequenceId - 1) + " but was: " + producerSequenceId + " for message: " + m);
258 }
259 }
260
261 // Read the buffer in.
262 ActiveMQBytesMessage bm = (ActiveMQBytesMessage)m;
263 buffer = new byte[(int)bm.getBodyLength()];
264 bm.readBytes(buffer);
265 pos = 0;
266 if (jmsProperties == null) {
267 jmsProperties = Collections.unmodifiableMap(new HashMap<String, Object>(bm.getProperties()));
268 }
269 } else {
270 eosReached = true;
271 if (jmsProperties == null) {
272 // no properties found
273 jmsProperties = Collections.emptyMap();
274 }
275 }
276 return;
277 }
278 } catch (JMSException e) {
279 eosReached = true;
280 if (jmsProperties == null) {
281 // no properties found
282 jmsProperties = Collections.emptyMap();
283 }
284 throw IOExceptionSupport.create(e);
285 }
286 }
287
288 public void dispatch(MessageDispatch md) {
289 unconsumedMessages.enqueue(md);
290 }
291
292 @Override
293 public String toString() {
294 return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" + producerId + " }";
295 }
296
297
298 /**
299 * Exception which should get thrown if the first chunk of the stream could not read within the configured timeout
300 *
301 */
302 public class ReadTimeoutException extends IOException {
303 public ReadTimeoutException() {
304 super();
305 }
306 }
307 }