001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.activemq.command;
019
020 import java.io.DataInputStream;
021 import java.io.DataOutputStream;
022 import java.io.IOException;
023 import java.io.InputStream;
024 import java.io.ObjectOutputStream;
025 import java.io.OutputStream;
026 import java.io.Serializable;
027 import java.util.zip.DeflaterOutputStream;
028 import java.util.zip.InflaterInputStream;
029
030 import javax.jms.JMSException;
031 import javax.jms.ObjectMessage;
032
033 import org.apache.activemq.ActiveMQConnection;
034 import org.apache.activemq.util.ByteArrayInputStream;
035 import org.apache.activemq.util.ByteArrayOutputStream;
036 import org.apache.activemq.util.ByteSequence;
037 import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
038 import org.apache.activemq.util.JMSExceptionSupport;
039 import org.apache.activemq.wireformat.WireFormat;
040
041 /**
042 * An <CODE>ObjectMessage</CODE> object is used to send a message that
043 * contains a serializable object in the Java programming language ("Java
044 * object"). It inherits from the <CODE>Message</CODE> interface and adds a
045 * body containing a single reference to an object. Only
046 * <CODE>Serializable</CODE> Java objects can be used. <p/>
047 * <P>
048 * If a collection of Java objects must be sent, one of the
049 * <CODE>Collection</CODE> classes provided since JDK 1.2 can be used. <p/>
050 * <P>
051 * When a client receives an <CODE>ObjectMessage</CODE>, it is in read-only
052 * mode. If a client attempts to write to the message at this point, a
053 * <CODE>MessageNotWriteableException</CODE> is thrown. If
054 * <CODE>clearBody</CODE> is called, the message can now be both read from and
055 * written to.
056 *
057 * @openwire:marshaller code="26"
058 * @see javax.jms.Session#createObjectMessage()
059 * @see javax.jms.Session#createObjectMessage(Serializable)
060 * @see javax.jms.BytesMessage
061 * @see javax.jms.MapMessage
062 * @see javax.jms.Message
063 * @see javax.jms.StreamMessage
064 * @see javax.jms.TextMessage
065 */
066 public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMessage {
067
068 // TODO: verify classloader
069 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_OBJECT_MESSAGE;
070 static final ClassLoader ACTIVEMQ_CLASSLOADER = ActiveMQObjectMessage.class.getClassLoader();
071
072 protected transient Serializable object;
073
074 public Message copy() {
075 ActiveMQObjectMessage copy = new ActiveMQObjectMessage();
076 copy(copy);
077 return copy;
078 }
079
080 private void copy(ActiveMQObjectMessage copy) {
081 ActiveMQConnection connection = getConnection();
082 if (connection == null || !connection.isObjectMessageSerializationDefered()) {
083 storeContent();
084 copy.object = null;
085 } else {
086 copy.object = object;
087 }
088 super.copy(copy);
089
090 }
091
092 public void storeContent() {
093 ByteSequence bodyAsBytes = getContent();
094 if (bodyAsBytes == null && object != null) {
095 try {
096 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
097 OutputStream os = bytesOut;
098 ActiveMQConnection connection = getConnection();
099 if (connection != null && connection.isUseCompression()) {
100 compressed = true;
101 os = new DeflaterOutputStream(os);
102 }
103 DataOutputStream dataOut = new DataOutputStream(os);
104 ObjectOutputStream objOut = new ObjectOutputStream(dataOut);
105 objOut.writeObject(object);
106 objOut.flush();
107 objOut.reset();
108 objOut.close();
109 setContent(bytesOut.toByteSequence());
110 } catch (IOException ioe) {
111 throw new RuntimeException(ioe.getMessage(), ioe);
112 }
113 }
114 }
115
116 public byte getDataStructureType() {
117 return DATA_STRUCTURE_TYPE;
118 }
119
120 public String getJMSXMimeType() {
121 return "jms/object-message";
122 }
123
124 /**
125 * Clears out the message body. Clearing a message's body does not clear its
126 * header values or property entries. <p/>
127 * <P>
128 * If this message body was read-only, calling this method leaves the
129 * message body in the same state as an empty body in a newly created
130 * message.
131 *
132 * @throws JMSException if the JMS provider fails to clear the message body
133 * due to some internal error.
134 */
135
136 public void clearBody() throws JMSException {
137 super.clearBody();
138 this.object = null;
139 }
140
141 /**
142 * Sets the serializable object containing this message's data. It is
143 * important to note that an <CODE>ObjectMessage</CODE> contains a
144 * snapshot of the object at the time <CODE>setObject()</CODE> is called;
145 * subsequent modifications of the object will have no effect on the
146 * <CODE>ObjectMessage</CODE> body.
147 *
148 * @param newObject the message's data
149 * @throws JMSException if the JMS provider fails to set the object due to
150 * some internal error.
151 * @throws javax.jms.MessageFormatException if object serialization fails.
152 * @throws javax.jms.MessageNotWriteableException if the message is in
153 * read-only mode.
154 */
155
156 public void setObject(Serializable newObject) throws JMSException {
157 checkReadOnlyBody();
158 this.object = newObject;
159 setContent(null);
160 ActiveMQConnection connection = getConnection();
161 if (connection == null || !connection.isObjectMessageSerializationDefered()) {
162 storeContent();
163 }
164 }
165
166 /**
167 * Gets the serializable object containing this message's data. The default
168 * value is null.
169 *
170 * @return the serializable object containing this message's data
171 * @throws JMSException
172 */
173 public Serializable getObject() throws JMSException {
174 if (object == null && getContent() != null) {
175 try {
176 ByteSequence content = getContent();
177 InputStream is = new ByteArrayInputStream(content);
178 if (isCompressed()) {
179 is = new InflaterInputStream(is);
180 }
181 DataInputStream dataIn = new DataInputStream(is);
182 ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
183 try {
184 object = (Serializable)objIn.readObject();
185 } catch (ClassNotFoundException ce) {
186 throw JMSExceptionSupport.create("Failed to build body from content. Serializable class not available to broker. Reason: " + ce, ce);
187 } finally {
188 dataIn.close();
189 }
190 } catch (IOException e) {
191 throw JMSExceptionSupport.create("Failed to build body from bytes. Reason: " + e, e);
192 }
193 }
194 return this.object;
195 }
196
197 @Override
198 public void beforeMarshall(WireFormat wireFormat) throws IOException {
199 super.beforeMarshall(wireFormat);
200 // may have initiated on vm transport with deferred marshalling
201 storeContent();
202 }
203
204 public void clearMarshalledState() throws JMSException {
205 super.clearMarshalledState();
206 this.object = null;
207 }
208
209 public void onMessageRolledBack() {
210 super.onMessageRolledBack();
211
212 // lets force the object to be deserialized again - as we could have
213 // changed the object
214 object = null;
215 }
216
217 public String toString() {
218 try {
219 getObject();
220 } catch (JMSException e) {
221 }
222 return super.toString();
223 }
224 }