001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.activemq.command;
019
020 import java.io.BufferedInputStream;
021 import java.io.DataInputStream;
022 import java.io.DataOutputStream;
023 import java.io.EOFException;
024 import java.io.IOException;
025 import java.io.InputStream;
026 import java.io.OutputStream;
027 import java.util.zip.DeflaterOutputStream;
028 import java.util.zip.InflaterInputStream;
029
030 import javax.jms.JMSException;
031 import javax.jms.MessageEOFException;
032 import javax.jms.MessageFormatException;
033 import javax.jms.MessageNotReadableException;
034 import javax.jms.MessageNotWriteableException;
035 import javax.jms.StreamMessage;
036
037 import org.apache.activemq.ActiveMQConnection;
038 import org.apache.activemq.util.ByteArrayInputStream;
039 import org.apache.activemq.util.ByteArrayOutputStream;
040 import org.apache.activemq.util.ByteSequence;
041 import org.apache.activemq.util.JMSExceptionSupport;
042 import org.apache.activemq.util.MarshallingSupport;
043
044 /**
045 * A <CODE>StreamMessage</CODE> object is used to send a stream of primitive
046 * types in the Java programming language. It is filled and read sequentially.
047 * It inherits from the <CODE>Message</CODE> interface and adds a stream
048 * message body. Its methods are based largely on those found in
049 * <CODE>java.io.DataInputStream</CODE> and
050 * <CODE>java.io.DataOutputStream</CODE>. <p/>
051 * <P>
052 * The primitive types can be read or written explicitly using methods for each
053 * type. They may also be read or written generically as objects. For instance,
054 * a call to <CODE>StreamMessage.writeInt(6)</CODE> is equivalent to
055 * <CODE>StreamMessage.writeObject(new
056 * Integer(6))</CODE>. Both forms are
057 * provided, because the explicit form is convenient for static programming, and
058 * the object form is needed when types are not known at compile time. <p/>
059 * <P>
060 * When the message is first created, and when <CODE>clearBody</CODE> is
061 * called, the body of the message is in write-only mode. After the first call
062 * to <CODE>reset</CODE> has been made, the message body is in read-only mode.
063 * After a message has been sent, the client that sent it can retain and modify
064 * it without affecting the message that has been sent. The same message object
065 * can be sent multiple times. When a message has been received, the provider
066 * has called <CODE>reset</CODE> so that the message body is in read-only mode
067 * for the client. <p/>
068 * <P>
069 * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
070 * message body is cleared and the message body is in write-only mode. <p/>
071 * <P>
072 * If a client attempts to read a message in write-only mode, a
073 * <CODE>MessageNotReadableException</CODE> is thrown. <p/>
074 * <P>
075 * If a client attempts to write a message in read-only mode, a
076 * <CODE>MessageNotWriteableException</CODE> is thrown. <p/>
077 * <P>
078 * <CODE>StreamMessage</CODE> objects support the following conversion table.
079 * The marked cases must be supported. The unmarked cases must throw a
080 * <CODE>JMSException</CODE>. The <CODE>String</CODE>-to-primitive
081 * conversions may throw a runtime exception if the primitive's
082 * <CODE>valueOf()</CODE> method does not accept it as a valid
083 * <CODE>String</CODE> representation of the primitive. <p/>
084 * <P>
085 * A value written as the row type can be read as the column type. <p/>
086 *
087 * <PRE>
088 * | | boolean byte short char int long float double String byte[]
089 * |----------------------------------------------------------------------
090 * |boolean | X X |byte | X X X X X |short | X X X X |char | X X |int | X X X
091 * |long | X X |float | X X X |double | X X |String | X X X X X X X X |byte[] |
092 * X |----------------------------------------------------------------------
093 *
094 * </PRE>
095 *
096 * <p/>
097 * <P>
098 * Attempting to read a null value as a primitive type must be treated as
099 * calling the primitive's corresponding <code>valueOf(String)</code>
100 * conversion method with a null value. Since <code>char</code> does not
101 * support a <code>String</code> conversion, attempting to read a null value
102 * as a <code>char</code> must throw a <code>NullPointerException</code>.
103 *
104 * @openwire:marshaller code="27"
105 * @see javax.jms.Session#createStreamMessage()
106 * @see javax.jms.BytesMessage
107 * @see javax.jms.MapMessage
108 * @see javax.jms.Message
109 * @see javax.jms.ObjectMessage
110 * @see javax.jms.TextMessage
111 */
112 public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMessage {
113
114 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_STREAM_MESSAGE;
115
116 protected transient DataOutputStream dataOut;
117 protected transient ByteArrayOutputStream bytesOut;
118 protected transient DataInputStream dataIn;
119 protected transient int remainingBytes = -1;
120
121 public Message copy() {
122 ActiveMQStreamMessage copy = new ActiveMQStreamMessage();
123 copy(copy);
124 return copy;
125 }
126
127 private void copy(ActiveMQStreamMessage copy) {
128 storeContent();
129 super.copy(copy);
130 copy.dataOut = null;
131 copy.bytesOut = null;
132 copy.dataIn = null;
133 }
134
135 public void onSend() throws JMSException {
136 super.onSend();
137 storeContent();
138 }
139
140 private void storeContent() {
141 if (dataOut != null) {
142 try {
143 dataOut.close();
144 setContent(bytesOut.toByteSequence());
145 bytesOut = null;
146 dataOut = null;
147 } catch (IOException ioe) {
148 throw new RuntimeException(ioe);
149 }
150 }
151 }
152
153 public byte getDataStructureType() {
154 return DATA_STRUCTURE_TYPE;
155 }
156
157 public String getJMSXMimeType() {
158 return "jms/stream-message";
159 }
160
161 /**
162 * Clears out the message body. Clearing a message's body does not clear its
163 * header values or property entries. <p/>
164 * <P>
165 * If this message body was read-only, calling this method leaves the
166 * message body in the same state as an empty body in a newly created
167 * message.
168 *
169 * @throws JMSException if the JMS provider fails to clear the message body
170 * due to some internal error.
171 */
172
173 public void clearBody() throws JMSException {
174 super.clearBody();
175 this.dataOut = null;
176 this.dataIn = null;
177 this.bytesOut = null;
178 this.remainingBytes = -1;
179 }
180
181 /**
182 * Reads a <code>boolean</code> from the stream message.
183 *
184 * @return the <code>boolean</code> value read
185 * @throws JMSException if the JMS provider fails to read the message due to
186 * some internal error.
187 * @throws MessageEOFException if unexpected end of message stream has been
188 * reached.
189 * @throws MessageFormatException if this type conversion is invalid.
190 * @throws MessageNotReadableException if the message is in write-only mode.
191 */
192
193 public boolean readBoolean() throws JMSException {
194 initializeReading();
195 try {
196
197 this.dataIn.mark(10);
198 int type = this.dataIn.read();
199 if (type == -1) {
200 throw new MessageEOFException("reached end of data");
201 }
202 if (type == MarshallingSupport.BOOLEAN_TYPE) {
203 return this.dataIn.readBoolean();
204 }
205 if (type == MarshallingSupport.STRING_TYPE) {
206 return Boolean.valueOf(this.dataIn.readUTF()).booleanValue();
207 }
208 if (type == MarshallingSupport.NULL) {
209 this.dataIn.reset();
210 throw new NullPointerException("Cannot convert NULL value to boolean.");
211 } else {
212 this.dataIn.reset();
213 throw new MessageFormatException(" not a boolean type");
214 }
215 } catch (EOFException e) {
216 throw JMSExceptionSupport.createMessageEOFException(e);
217 } catch (IOException e) {
218 throw JMSExceptionSupport.createMessageFormatException(e);
219 }
220 }
221
222 /**
223 * Reads a <code>byte</code> value from the stream message.
224 *
225 * @return the next byte from the stream message as a 8-bit
226 * <code>byte</code>
227 * @throws JMSException if the JMS provider fails to read the message due to
228 * some internal error.
229 * @throws MessageEOFException if unexpected end of message stream has been
230 * reached.
231 * @throws MessageFormatException if this type conversion is invalid.
232 * @throws MessageNotReadableException if the message is in write-only mode.
233 */
234
235 public byte readByte() throws JMSException {
236 initializeReading();
237 try {
238
239 this.dataIn.mark(10);
240 int type = this.dataIn.read();
241 if (type == -1) {
242 throw new MessageEOFException("reached end of data");
243 }
244 if (type == MarshallingSupport.BYTE_TYPE) {
245 return this.dataIn.readByte();
246 }
247 if (type == MarshallingSupport.STRING_TYPE) {
248 return Byte.valueOf(this.dataIn.readUTF()).byteValue();
249 }
250 if (type == MarshallingSupport.NULL) {
251 this.dataIn.reset();
252 throw new NullPointerException("Cannot convert NULL value to byte.");
253 } else {
254 this.dataIn.reset();
255 throw new MessageFormatException(" not a byte type");
256 }
257 } catch (NumberFormatException mfe) {
258 try {
259 this.dataIn.reset();
260 } catch (IOException ioe) {
261 throw JMSExceptionSupport.create(ioe);
262 }
263 throw mfe;
264
265 } catch (EOFException e) {
266 throw JMSExceptionSupport.createMessageEOFException(e);
267 } catch (IOException e) {
268 throw JMSExceptionSupport.createMessageFormatException(e);
269 }
270 }
271
272 /**
273 * Reads a 16-bit integer from the stream message.
274 *
275 * @return a 16-bit integer from the stream message
276 * @throws JMSException if the JMS provider fails to read the message due to
277 * some internal error.
278 * @throws MessageEOFException if unexpected end of message stream has been
279 * reached.
280 * @throws MessageFormatException if this type conversion is invalid.
281 * @throws MessageNotReadableException if the message is in write-only mode.
282 */
283
284 public short readShort() throws JMSException {
285 initializeReading();
286 try {
287
288 this.dataIn.mark(17);
289 int type = this.dataIn.read();
290 if (type == -1) {
291 throw new MessageEOFException("reached end of data");
292 }
293 if (type == MarshallingSupport.SHORT_TYPE) {
294 return this.dataIn.readShort();
295 }
296 if (type == MarshallingSupport.BYTE_TYPE) {
297 return this.dataIn.readByte();
298 }
299 if (type == MarshallingSupport.STRING_TYPE) {
300 return Short.valueOf(this.dataIn.readUTF()).shortValue();
301 }
302 if (type == MarshallingSupport.NULL) {
303 this.dataIn.reset();
304 throw new NullPointerException("Cannot convert NULL value to short.");
305 } else {
306 this.dataIn.reset();
307 throw new MessageFormatException(" not a short type");
308 }
309 } catch (NumberFormatException mfe) {
310 try {
311 this.dataIn.reset();
312 } catch (IOException ioe) {
313 throw JMSExceptionSupport.create(ioe);
314 }
315 throw mfe;
316
317 } catch (EOFException e) {
318 throw JMSExceptionSupport.createMessageEOFException(e);
319 } catch (IOException e) {
320 throw JMSExceptionSupport.createMessageFormatException(e);
321 }
322
323 }
324
325 /**
326 * Reads a Unicode character value from the stream message.
327 *
328 * @return a Unicode character from the stream message
329 * @throws JMSException if the JMS provider fails to read the message due to
330 * some internal error.
331 * @throws MessageEOFException if unexpected end of message stream has been
332 * reached.
333 * @throws MessageFormatException if this type conversion is invalid
334 * @throws MessageNotReadableException if the message is in write-only mode.
335 */
336
337 public char readChar() throws JMSException {
338 initializeReading();
339 try {
340
341 this.dataIn.mark(17);
342 int type = this.dataIn.read();
343 if (type == -1) {
344 throw new MessageEOFException("reached end of data");
345 }
346 if (type == MarshallingSupport.CHAR_TYPE) {
347 return this.dataIn.readChar();
348 }
349 if (type == MarshallingSupport.NULL) {
350 this.dataIn.reset();
351 throw new NullPointerException("Cannot convert NULL value to char.");
352 } else {
353 this.dataIn.reset();
354 throw new MessageFormatException(" not a char type");
355 }
356 } catch (NumberFormatException mfe) {
357 try {
358 this.dataIn.reset();
359 } catch (IOException ioe) {
360 throw JMSExceptionSupport.create(ioe);
361 }
362 throw mfe;
363
364 } catch (EOFException e) {
365 throw JMSExceptionSupport.createMessageEOFException(e);
366 } catch (IOException e) {
367 throw JMSExceptionSupport.createMessageFormatException(e);
368 }
369 }
370
371 /**
372 * Reads a 32-bit integer from the stream message.
373 *
374 * @return a 32-bit integer value from the stream message, interpreted as an
375 * <code>int</code>
376 * @throws JMSException if the JMS provider fails to read the message due to
377 * some internal error.
378 * @throws MessageEOFException if unexpected end of message stream has been
379 * reached.
380 * @throws MessageFormatException if this type conversion is invalid.
381 * @throws MessageNotReadableException if the message is in write-only mode.
382 */
383
384 public int readInt() throws JMSException {
385 initializeReading();
386 try {
387
388 this.dataIn.mark(33);
389 int type = this.dataIn.read();
390 if (type == -1) {
391 throw new MessageEOFException("reached end of data");
392 }
393 if (type == MarshallingSupport.INTEGER_TYPE) {
394 return this.dataIn.readInt();
395 }
396 if (type == MarshallingSupport.SHORT_TYPE) {
397 return this.dataIn.readShort();
398 }
399 if (type == MarshallingSupport.BYTE_TYPE) {
400 return this.dataIn.readByte();
401 }
402 if (type == MarshallingSupport.STRING_TYPE) {
403 return Integer.valueOf(this.dataIn.readUTF()).intValue();
404 }
405 if (type == MarshallingSupport.NULL) {
406 this.dataIn.reset();
407 throw new NullPointerException("Cannot convert NULL value to int.");
408 } else {
409 this.dataIn.reset();
410 throw new MessageFormatException(" not an int type");
411 }
412 } catch (NumberFormatException mfe) {
413 try {
414 this.dataIn.reset();
415 } catch (IOException ioe) {
416 throw JMSExceptionSupport.create(ioe);
417 }
418 throw mfe;
419
420 } catch (EOFException e) {
421 throw JMSExceptionSupport.createMessageEOFException(e);
422 } catch (IOException e) {
423 throw JMSExceptionSupport.createMessageFormatException(e);
424 }
425 }
426
427 /**
428 * Reads a 64-bit integer from the stream message.
429 *
430 * @return a 64-bit integer value from the stream message, interpreted as a
431 * <code>long</code>
432 * @throws JMSException if the JMS provider fails to read the message due to
433 * some internal error.
434 * @throws MessageEOFException if unexpected end of message stream has been
435 * reached.
436 * @throws MessageFormatException if this type conversion is invalid.
437 * @throws MessageNotReadableException if the message is in write-only mode.
438 */
439
440 public long readLong() throws JMSException {
441 initializeReading();
442 try {
443
444 this.dataIn.mark(65);
445 int type = this.dataIn.read();
446 if (type == -1) {
447 throw new MessageEOFException("reached end of data");
448 }
449 if (type == MarshallingSupport.LONG_TYPE) {
450 return this.dataIn.readLong();
451 }
452 if (type == MarshallingSupport.INTEGER_TYPE) {
453 return this.dataIn.readInt();
454 }
455 if (type == MarshallingSupport.SHORT_TYPE) {
456 return this.dataIn.readShort();
457 }
458 if (type == MarshallingSupport.BYTE_TYPE) {
459 return this.dataIn.readByte();
460 }
461 if (type == MarshallingSupport.STRING_TYPE) {
462 return Long.valueOf(this.dataIn.readUTF()).longValue();
463 }
464 if (type == MarshallingSupport.NULL) {
465 this.dataIn.reset();
466 throw new NullPointerException("Cannot convert NULL value to long.");
467 } else {
468 this.dataIn.reset();
469 throw new MessageFormatException(" not a long type");
470 }
471 } catch (NumberFormatException mfe) {
472 try {
473 this.dataIn.reset();
474 } catch (IOException ioe) {
475 throw JMSExceptionSupport.create(ioe);
476 }
477 throw mfe;
478
479 } catch (EOFException e) {
480 throw JMSExceptionSupport.createMessageEOFException(e);
481 } catch (IOException e) {
482 throw JMSExceptionSupport.createMessageFormatException(e);
483 }
484 }
485
486 /**
487 * Reads a <code>float</code> from the stream message.
488 *
489 * @return a <code>float</code> value from the stream message
490 * @throws JMSException if the JMS provider fails to read the message due to
491 * some internal error.
492 * @throws MessageEOFException if unexpected end of message stream has been
493 * reached.
494 * @throws MessageFormatException if this type conversion is invalid.
495 * @throws MessageNotReadableException if the message is in write-only mode.
496 */
497
498 public float readFloat() throws JMSException {
499 initializeReading();
500 try {
501 this.dataIn.mark(33);
502 int type = this.dataIn.read();
503 if (type == -1) {
504 throw new MessageEOFException("reached end of data");
505 }
506 if (type == MarshallingSupport.FLOAT_TYPE) {
507 return this.dataIn.readFloat();
508 }
509 if (type == MarshallingSupport.STRING_TYPE) {
510 return Float.valueOf(this.dataIn.readUTF()).floatValue();
511 }
512 if (type == MarshallingSupport.NULL) {
513 this.dataIn.reset();
514 throw new NullPointerException("Cannot convert NULL value to float.");
515 } else {
516 this.dataIn.reset();
517 throw new MessageFormatException(" not a float type");
518 }
519 } catch (NumberFormatException mfe) {
520 try {
521 this.dataIn.reset();
522 } catch (IOException ioe) {
523 throw JMSExceptionSupport.create(ioe);
524 }
525 throw mfe;
526
527 } catch (EOFException e) {
528 throw JMSExceptionSupport.createMessageEOFException(e);
529 } catch (IOException e) {
530 throw JMSExceptionSupport.createMessageFormatException(e);
531 }
532 }
533
534 /**
535 * Reads a <code>double</code> from the stream message.
536 *
537 * @return a <code>double</code> value from the stream message
538 * @throws JMSException if the JMS provider fails to read the message due to
539 * some internal error.
540 * @throws MessageEOFException if unexpected end of message stream has been
541 * reached.
542 * @throws MessageFormatException if this type conversion is invalid.
543 * @throws MessageNotReadableException if the message is in write-only mode.
544 */
545
546 public double readDouble() throws JMSException {
547 initializeReading();
548 try {
549
550 this.dataIn.mark(65);
551 int type = this.dataIn.read();
552 if (type == -1) {
553 throw new MessageEOFException("reached end of data");
554 }
555 if (type == MarshallingSupport.DOUBLE_TYPE) {
556 return this.dataIn.readDouble();
557 }
558 if (type == MarshallingSupport.FLOAT_TYPE) {
559 return this.dataIn.readFloat();
560 }
561 if (type == MarshallingSupport.STRING_TYPE) {
562 return Double.valueOf(this.dataIn.readUTF()).doubleValue();
563 }
564 if (type == MarshallingSupport.NULL) {
565 this.dataIn.reset();
566 throw new NullPointerException("Cannot convert NULL value to double.");
567 } else {
568 this.dataIn.reset();
569 throw new MessageFormatException(" not a double type");
570 }
571 } catch (NumberFormatException mfe) {
572 try {
573 this.dataIn.reset();
574 } catch (IOException ioe) {
575 throw JMSExceptionSupport.create(ioe);
576 }
577 throw mfe;
578
579 } catch (EOFException e) {
580 throw JMSExceptionSupport.createMessageEOFException(e);
581 } catch (IOException e) {
582 throw JMSExceptionSupport.createMessageFormatException(e);
583 }
584 }
585
586 /**
587 * Reads a <CODE>String</CODE> from the stream message.
588 *
589 * @return a Unicode string from the stream message
590 * @throws JMSException if the JMS provider fails to read the message due to
591 * some internal error.
592 * @throws MessageEOFException if unexpected end of message stream has been
593 * reached.
594 * @throws MessageFormatException if this type conversion is invalid.
595 * @throws MessageNotReadableException if the message is in write-only mode.
596 */
597
598 public String readString() throws JMSException {
599 initializeReading();
600 try {
601
602 this.dataIn.mark(65);
603 int type = this.dataIn.read();
604 if (type == -1) {
605 throw new MessageEOFException("reached end of data");
606 }
607 if (type == MarshallingSupport.NULL) {
608 return null;
609 }
610 if (type == MarshallingSupport.BIG_STRING_TYPE) {
611 return MarshallingSupport.readUTF8(dataIn);
612 }
613 if (type == MarshallingSupport.STRING_TYPE) {
614 return this.dataIn.readUTF();
615 }
616 if (type == MarshallingSupport.LONG_TYPE) {
617 return new Long(this.dataIn.readLong()).toString();
618 }
619 if (type == MarshallingSupport.INTEGER_TYPE) {
620 return new Integer(this.dataIn.readInt()).toString();
621 }
622 if (type == MarshallingSupport.SHORT_TYPE) {
623 return new Short(this.dataIn.readShort()).toString();
624 }
625 if (type == MarshallingSupport.BYTE_TYPE) {
626 return new Byte(this.dataIn.readByte()).toString();
627 }
628 if (type == MarshallingSupport.FLOAT_TYPE) {
629 return new Float(this.dataIn.readFloat()).toString();
630 }
631 if (type == MarshallingSupport.DOUBLE_TYPE) {
632 return new Double(this.dataIn.readDouble()).toString();
633 }
634 if (type == MarshallingSupport.BOOLEAN_TYPE) {
635 return (this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE).toString();
636 }
637 if (type == MarshallingSupport.CHAR_TYPE) {
638 return new Character(this.dataIn.readChar()).toString();
639 } else {
640 this.dataIn.reset();
641 throw new MessageFormatException(" not a String type");
642 }
643 } catch (NumberFormatException mfe) {
644 try {
645 this.dataIn.reset();
646 } catch (IOException ioe) {
647 throw JMSExceptionSupport.create(ioe);
648 }
649 throw mfe;
650
651 } catch (EOFException e) {
652 throw JMSExceptionSupport.createMessageEOFException(e);
653 } catch (IOException e) {
654 throw JMSExceptionSupport.createMessageFormatException(e);
655 }
656 }
657
658 /**
659 * Reads a byte array field from the stream message into the specified
660 * <CODE>byte[]</CODE> object (the read buffer). <p/>
661 * <P>
662 * To read the field value, <CODE>readBytes</CODE> should be successively
663 * called until it returns a value less than the length of the read buffer.
664 * The value of the bytes in the buffer following the last byte read is
665 * undefined. <p/>
666 * <P>
667 * If <CODE>readBytes</CODE> returns a value equal to the length of the
668 * buffer, a subsequent <CODE>readBytes</CODE> call must be made. If there
669 * are no more bytes to be read, this call returns -1. <p/>
670 * <P>
671 * If the byte array field value is null, <CODE>readBytes</CODE> returns
672 * -1. <p/>
673 * <P>
674 * If the byte array field value is empty, <CODE>readBytes</CODE> returns
675 * 0. <p/>
676 * <P>
677 * Once the first <CODE>readBytes</CODE> call on a <CODE>byte[]</CODE>
678 * field value has been made, the full value of the field must be read
679 * before it is valid to read the next field. An attempt to read the next
680 * field before that has been done will throw a
681 * <CODE>MessageFormatException</CODE>. <p/>
682 * <P>
683 * To read the byte field value into a new <CODE>byte[]</CODE> object, use
684 * the <CODE>readObject</CODE> method.
685 *
686 * @param value the buffer into which the data is read
687 * @return the total number of bytes read into the buffer, or -1 if there is
688 * no more data because the end of the byte field has been reached
689 * @throws JMSException if the JMS provider fails to read the message due to
690 * some internal error.
691 * @throws MessageEOFException if unexpected end of message stream has been
692 * reached.
693 * @throws MessageFormatException if this type conversion is invalid.
694 * @throws MessageNotReadableException if the message is in write-only mode.
695 * @see #readObject()
696 */
697
698 public int readBytes(byte[] value) throws JMSException {
699
700 initializeReading();
701 try {
702 if (value == null) {
703 throw new NullPointerException();
704 }
705
706 if (remainingBytes == -1) {
707 this.dataIn.mark(value.length + 1);
708 int type = this.dataIn.read();
709 if (type == -1) {
710 throw new MessageEOFException("reached end of data");
711 }
712 if (type != MarshallingSupport.BYTE_ARRAY_TYPE) {
713 throw new MessageFormatException("Not a byte array");
714 }
715 remainingBytes = this.dataIn.readInt();
716 } else if (remainingBytes == 0) {
717 remainingBytes = -1;
718 return -1;
719 }
720
721 if (value.length <= remainingBytes) {
722 // small buffer
723 remainingBytes -= value.length;
724 this.dataIn.readFully(value);
725 return value.length;
726 } else {
727 // big buffer
728 int rc = this.dataIn.read(value, 0, remainingBytes);
729 remainingBytes = 0;
730 return rc;
731 }
732
733 } catch (EOFException e) {
734 JMSException jmsEx = new MessageEOFException(e.getMessage());
735 jmsEx.setLinkedException(e);
736 throw jmsEx;
737 } catch (IOException e) {
738 JMSException jmsEx = new MessageFormatException(e.getMessage());
739 jmsEx.setLinkedException(e);
740 throw jmsEx;
741 }
742 }
743
744 /**
745 * Reads an object from the stream message. <p/>
746 * <P>
747 * This method can be used to return, in objectified format, an object in
748 * the Java programming language ("Java object") that has been written to
749 * the stream with the equivalent <CODE>writeObject</CODE> method call, or
750 * its equivalent primitive <CODE>write<I>type</I></CODE> method. <p/>
751 * <P>
752 * Note that byte values are returned as <CODE>byte[]</CODE>, not
753 * <CODE>Byte[]</CODE>. <p/>
754 * <P>
755 * An attempt to call <CODE>readObject</CODE> to read a byte field value
756 * into a new <CODE>byte[]</CODE> object before the full value of the byte
757 * field has been read will throw a <CODE>MessageFormatException</CODE>.
758 *
759 * @return a Java object from the stream message, in objectified format (for
760 * example, if the object was written as an <CODE>int</CODE>, an
761 * <CODE>Integer</CODE> is returned)
762 * @throws JMSException if the JMS provider fails to read the message due to
763 * some internal error.
764 * @throws MessageEOFException if unexpected end of message stream has been
765 * reached.
766 * @throws MessageFormatException if this type conversion is invalid.
767 * @throws MessageNotReadableException if the message is in write-only mode.
768 * @see #readBytes(byte[] value)
769 */
770
771 public Object readObject() throws JMSException {
772 initializeReading();
773 try {
774 this.dataIn.mark(65);
775 int type = this.dataIn.read();
776 if (type == -1) {
777 throw new MessageEOFException("reached end of data");
778 }
779 if (type == MarshallingSupport.NULL) {
780 return null;
781 }
782 if (type == MarshallingSupport.BIG_STRING_TYPE) {
783 return MarshallingSupport.readUTF8(dataIn);
784 }
785 if (type == MarshallingSupport.STRING_TYPE) {
786 return this.dataIn.readUTF();
787 }
788 if (type == MarshallingSupport.LONG_TYPE) {
789 return Long.valueOf(this.dataIn.readLong());
790 }
791 if (type == MarshallingSupport.INTEGER_TYPE) {
792 return Integer.valueOf(this.dataIn.readInt());
793 }
794 if (type == MarshallingSupport.SHORT_TYPE) {
795 return Short.valueOf(this.dataIn.readShort());
796 }
797 if (type == MarshallingSupport.BYTE_TYPE) {
798 return Byte.valueOf(this.dataIn.readByte());
799 }
800 if (type == MarshallingSupport.FLOAT_TYPE) {
801 return new Float(this.dataIn.readFloat());
802 }
803 if (type == MarshallingSupport.DOUBLE_TYPE) {
804 return new Double(this.dataIn.readDouble());
805 }
806 if (type == MarshallingSupport.BOOLEAN_TYPE) {
807 return this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
808 }
809 if (type == MarshallingSupport.CHAR_TYPE) {
810 return Character.valueOf(this.dataIn.readChar());
811 }
812 if (type == MarshallingSupport.BYTE_ARRAY_TYPE) {
813 int len = this.dataIn.readInt();
814 byte[] value = new byte[len];
815 this.dataIn.readFully(value);
816 return value;
817 } else {
818 this.dataIn.reset();
819 throw new MessageFormatException("unknown type");
820 }
821 } catch (NumberFormatException mfe) {
822 try {
823 this.dataIn.reset();
824 } catch (IOException ioe) {
825 throw JMSExceptionSupport.create(ioe);
826 }
827 throw mfe;
828
829 } catch (EOFException e) {
830 JMSException jmsEx = new MessageEOFException(e.getMessage());
831 jmsEx.setLinkedException(e);
832 throw jmsEx;
833 } catch (IOException e) {
834 JMSException jmsEx = new MessageFormatException(e.getMessage());
835 jmsEx.setLinkedException(e);
836 throw jmsEx;
837 }
838 }
839
840 /**
841 * Writes a <code>boolean</code> to the stream message. The value
842 * <code>true</code> is written as the value <code>(byte)1</code>; the
843 * value <code>false</code> is written as the value <code>(byte)0</code>.
844 *
845 * @param value the <code>boolean</code> value to be written
846 * @throws JMSException if the JMS provider fails to write the message due
847 * to some internal error.
848 * @throws MessageNotWriteableException if the message is in read-only mode.
849 */
850
851 public void writeBoolean(boolean value) throws JMSException {
852 initializeWriting();
853 try {
854 MarshallingSupport.marshalBoolean(dataOut, value);
855 } catch (IOException ioe) {
856 throw JMSExceptionSupport.create(ioe);
857 }
858 }
859
860 /**
861 * Writes a <code>byte</code> to the stream message.
862 *
863 * @param value the <code>byte</code> value to be written
864 * @throws JMSException if the JMS provider fails to write the message due
865 * to some internal error.
866 * @throws MessageNotWriteableException if the message is in read-only mode.
867 */
868
869 public void writeByte(byte value) throws JMSException {
870 initializeWriting();
871 try {
872 MarshallingSupport.marshalByte(dataOut, value);
873 } catch (IOException ioe) {
874 throw JMSExceptionSupport.create(ioe);
875 }
876 }
877
878 /**
879 * Writes a <code>short</code> to the stream message.
880 *
881 * @param value the <code>short</code> value to be written
882 * @throws JMSException if the JMS provider fails to write the message due
883 * to some internal error.
884 * @throws MessageNotWriteableException if the message is in read-only mode.
885 */
886
887 public void writeShort(short value) throws JMSException {
888 initializeWriting();
889 try {
890 MarshallingSupport.marshalShort(dataOut, value);
891 } catch (IOException ioe) {
892 throw JMSExceptionSupport.create(ioe);
893 }
894 }
895
896 /**
897 * Writes a <code>char</code> to the stream message.
898 *
899 * @param value the <code>char</code> value to be written
900 * @throws JMSException if the JMS provider fails to write the message due
901 * to some internal error.
902 * @throws MessageNotWriteableException if the message is in read-only mode.
903 */
904
905 public void writeChar(char value) throws JMSException {
906 initializeWriting();
907 try {
908 MarshallingSupport.marshalChar(dataOut, value);
909 } catch (IOException ioe) {
910 throw JMSExceptionSupport.create(ioe);
911 }
912 }
913
914 /**
915 * Writes an <code>int</code> to the stream message.
916 *
917 * @param value the <code>int</code> value to be written
918 * @throws JMSException if the JMS provider fails to write the message due
919 * to some internal error.
920 * @throws MessageNotWriteableException if the message is in read-only mode.
921 */
922
923 public void writeInt(int value) throws JMSException {
924 initializeWriting();
925 try {
926 MarshallingSupport.marshalInt(dataOut, value);
927 } catch (IOException ioe) {
928 throw JMSExceptionSupport.create(ioe);
929 }
930 }
931
932 /**
933 * Writes a <code>long</code> to the stream message.
934 *
935 * @param value the <code>long</code> value to be written
936 * @throws JMSException if the JMS provider fails to write the message due
937 * to some internal error.
938 * @throws MessageNotWriteableException if the message is in read-only mode.
939 */
940
941 public void writeLong(long value) throws JMSException {
942 initializeWriting();
943 try {
944 MarshallingSupport.marshalLong(dataOut, value);
945 } catch (IOException ioe) {
946 throw JMSExceptionSupport.create(ioe);
947 }
948 }
949
950 /**
951 * Writes a <code>float</code> to the stream message.
952 *
953 * @param value the <code>float</code> value to be written
954 * @throws JMSException if the JMS provider fails to write the message due
955 * to some internal error.
956 * @throws MessageNotWriteableException if the message is in read-only mode.
957 */
958
959 public void writeFloat(float value) throws JMSException {
960 initializeWriting();
961 try {
962 MarshallingSupport.marshalFloat(dataOut, value);
963 } catch (IOException ioe) {
964 throw JMSExceptionSupport.create(ioe);
965 }
966 }
967
968 /**
969 * Writes a <code>double</code> to the stream message.
970 *
971 * @param value the <code>double</code> value to be written
972 * @throws JMSException if the JMS provider fails to write the message due
973 * to some internal error.
974 * @throws MessageNotWriteableException if the message is in read-only mode.
975 */
976
977 public void writeDouble(double value) throws JMSException {
978 initializeWriting();
979 try {
980 MarshallingSupport.marshalDouble(dataOut, value);
981 } catch (IOException ioe) {
982 throw JMSExceptionSupport.create(ioe);
983 }
984 }
985
986 /**
987 * Writes a <code>String</code> to the stream message.
988 *
989 * @param value the <code>String</code> value to be written
990 * @throws JMSException if the JMS provider fails to write the message due
991 * to some internal error.
992 * @throws MessageNotWriteableException if the message is in read-only mode.
993 */
994
995 public void writeString(String value) throws JMSException {
996 initializeWriting();
997 try {
998 if (value == null) {
999 MarshallingSupport.marshalNull(dataOut);
1000 } else {
1001 MarshallingSupport.marshalString(dataOut, value);
1002 }
1003 } catch (IOException ioe) {
1004 throw JMSExceptionSupport.create(ioe);
1005 }
1006 }
1007
1008 /**
1009 * Writes a byte array field to the stream message. <p/>
1010 * <P>
1011 * The byte array <code>value</code> is written to the message as a byte
1012 * array field. Consecutively written byte array fields are treated as two
1013 * distinct fields when the fields are read.
1014 *
1015 * @param value the byte array value to be written
1016 * @throws JMSException if the JMS provider fails to write the message due
1017 * to some internal error.
1018 * @throws MessageNotWriteableException if the message is in read-only mode.
1019 */
1020
1021 public void writeBytes(byte[] value) throws JMSException {
1022 writeBytes(value, 0, value.length);
1023 }
1024
1025 /**
1026 * Writes a portion of a byte array as a byte array field to the stream
1027 * message. <p/>
1028 * <P>
1029 * The a portion of the byte array <code>value</code> is written to the
1030 * message as a byte array field. Consecutively written byte array fields
1031 * are treated as two distinct fields when the fields are read.
1032 *
1033 * @param value the byte array value to be written
1034 * @param offset the initial offset within the byte array
1035 * @param length the number of bytes to use
1036 * @throws JMSException if the JMS provider fails to write the message due
1037 * to some internal error.
1038 * @throws MessageNotWriteableException if the message is in read-only mode.
1039 */
1040
1041 public void writeBytes(byte[] value, int offset, int length) throws JMSException {
1042 initializeWriting();
1043 try {
1044 MarshallingSupport.marshalByteArray(dataOut, value, offset, length);
1045 } catch (IOException ioe) {
1046 throw JMSExceptionSupport.create(ioe);
1047 }
1048 }
1049
1050 /**
1051 * Writes an object to the stream message. <p/>
1052 * <P>
1053 * This method works only for the objectified primitive object types (<code>Integer</code>,
1054 * <code>Double</code>, <code>Long</code> ...),
1055 * <code>String</code> objects, and byte arrays.
1056 *
1057 * @param value the Java object to be written
1058 * @throws JMSException if the JMS provider fails to write the message due
1059 * to some internal error.
1060 * @throws MessageFormatException if the object is invalid.
1061 * @throws MessageNotWriteableException if the message is in read-only mode.
1062 */
1063
1064 public void writeObject(Object value) throws JMSException {
1065 initializeWriting();
1066 if (value == null) {
1067 try {
1068 MarshallingSupport.marshalNull(dataOut);
1069 } catch (IOException ioe) {
1070 throw JMSExceptionSupport.create(ioe);
1071 }
1072 } else if (value instanceof String) {
1073 writeString(value.toString());
1074 } else if (value instanceof Character) {
1075 writeChar(((Character)value).charValue());
1076 } else if (value instanceof Boolean) {
1077 writeBoolean(((Boolean)value).booleanValue());
1078 } else if (value instanceof Byte) {
1079 writeByte(((Byte)value).byteValue());
1080 } else if (value instanceof Short) {
1081 writeShort(((Short)value).shortValue());
1082 } else if (value instanceof Integer) {
1083 writeInt(((Integer)value).intValue());
1084 } else if (value instanceof Float) {
1085 writeFloat(((Float)value).floatValue());
1086 } else if (value instanceof Double) {
1087 writeDouble(((Double)value).doubleValue());
1088 } else if (value instanceof byte[]) {
1089 writeBytes((byte[])value);
1090 }else if (value instanceof Long) {
1091 writeLong(((Long)value).longValue());
1092 }else {
1093 throw new MessageFormatException("Unsupported Object type: " + value.getClass());
1094 }
1095 }
1096
1097 /**
1098 * Puts the message body in read-only mode and repositions the stream of
1099 * bytes to the beginning.
1100 *
1101 * @throws JMSException if an internal error occurs
1102 */
1103
1104 public void reset() throws JMSException {
1105 storeContent();
1106 this.bytesOut = null;
1107 this.dataIn = null;
1108 this.dataOut = null;
1109 this.remainingBytes = -1;
1110 setReadOnlyBody(true);
1111 }
1112
1113 private void initializeWriting() throws MessageNotWriteableException {
1114 checkReadOnlyBody();
1115 if (this.dataOut == null) {
1116 this.bytesOut = new ByteArrayOutputStream();
1117 OutputStream os = bytesOut;
1118 ActiveMQConnection connection = getConnection();
1119 if (connection != null && connection.isUseCompression()) {
1120 compressed = true;
1121 os = new DeflaterOutputStream(os);
1122 }
1123 this.dataOut = new DataOutputStream(os);
1124 }
1125 }
1126
1127 protected void checkWriteOnlyBody() throws MessageNotReadableException {
1128 if (!readOnlyBody) {
1129 throw new MessageNotReadableException("Message body is write-only");
1130 }
1131 }
1132
1133 private void initializeReading() throws MessageNotReadableException {
1134 checkWriteOnlyBody();
1135 if (this.dataIn == null) {
1136 ByteSequence data = getContent();
1137 if (data == null) {
1138 data = new ByteSequence(new byte[] {}, 0, 0);
1139 }
1140 InputStream is = new ByteArrayInputStream(data);
1141 if (isCompressed()) {
1142 is = new InflaterInputStream(is);
1143 is = new BufferedInputStream(is);
1144 }
1145 this.dataIn = new DataInputStream(is);
1146 }
1147 }
1148
1149 public String toString() {
1150 return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
1151 }
1152 }