001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.command;
018
019 import java.io.DataInputStream;
020 import java.io.DataOutputStream;
021 import java.io.IOException;
022 import java.io.InputStream;
023 import java.io.OutputStream;
024 import java.util.HashMap;
025 import java.util.zip.DeflaterOutputStream;
026 import java.util.zip.InflaterInputStream;
027
028 import javax.jms.JMSException;
029 import javax.jms.MessageNotWriteableException;
030 import javax.jms.TextMessage;
031
032 import org.apache.activemq.ActiveMQConnection;
033 import org.apache.activemq.util.ByteArrayInputStream;
034 import org.apache.activemq.util.ByteArrayOutputStream;
035 import org.apache.activemq.util.ByteSequence;
036 import org.apache.activemq.util.JMSExceptionSupport;
037 import org.apache.activemq.util.MarshallingSupport;
038 import org.apache.activemq.wireformat.WireFormat;
039
040 /**
041 * @openwire:marshaller code="28"
042 *
043 */
044 public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage {
045
046 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEXT_MESSAGE;
047
048 protected String text;
049
050 public Message copy() {
051 ActiveMQTextMessage copy = new ActiveMQTextMessage();
052 copy(copy);
053 return copy;
054 }
055
056 private void copy(ActiveMQTextMessage copy) {
057 super.copy(copy);
058 copy.text = text;
059 }
060
061 public byte getDataStructureType() {
062 return DATA_STRUCTURE_TYPE;
063 }
064
065 public String getJMSXMimeType() {
066 return "jms/text-message";
067 }
068
069 public void setText(String text) throws MessageNotWriteableException {
070 checkReadOnlyBody();
071 this.text = text;
072 setContent(null);
073 }
074
075 public String getText() throws JMSException {
076 if (text == null && getContent() != null) {
077 InputStream is = null;
078 try {
079 ByteSequence bodyAsBytes = getContent();
080 if (bodyAsBytes != null) {
081 is = new ByteArrayInputStream(bodyAsBytes);
082 if (isCompressed()) {
083 is = new InflaterInputStream(is);
084 }
085 DataInputStream dataIn = new DataInputStream(is);
086 text = MarshallingSupport.readUTF8(dataIn);
087 dataIn.close();
088 setContent(null);
089 setCompressed(false);
090 }
091 } catch (IOException ioe) {
092 throw JMSExceptionSupport.create(ioe);
093 } finally {
094 if (is != null) {
095 try {
096 is.close();
097 } catch (IOException e) {
098 // ignore
099 }
100 }
101 }
102 }
103 return text;
104 }
105
106 public void beforeMarshall(WireFormat wireFormat) throws IOException {
107 super.beforeMarshall(wireFormat);
108
109 ByteSequence content = getContent();
110 if (content == null && text != null) {
111 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
112 OutputStream os = bytesOut;
113 ActiveMQConnection connection = getConnection();
114 if (connection != null && connection.isUseCompression()) {
115 compressed = true;
116 os = new DeflaterOutputStream(os);
117 }
118 DataOutputStream dataOut = new DataOutputStream(os);
119 MarshallingSupport.writeUTF8(dataOut, this.text);
120 dataOut.close();
121 setContent(bytesOut.toByteSequence());
122 }
123 }
124
125 // see https://issues.apache.org/activemq/browse/AMQ-2103
126 // and https://issues.apache.org/activemq/browse/AMQ-2966
127 public void clearMarshalledState() throws JMSException {
128 super.clearMarshalledState();
129 this.text = null;
130 }
131
132 /**
133 * Clears out the message body. Clearing a message's body does not clear its
134 * header values or property entries. <p/>
135 * <P>
136 * If this message body was read-only, calling this method leaves the
137 * message body in the same state as an empty body in a newly created
138 * message.
139 *
140 * @throws JMSException if the JMS provider fails to clear the message body
141 * due to some internal error.
142 */
143 public void clearBody() throws JMSException {
144 super.clearBody();
145 this.text = null;
146 }
147
148 public int getSize() {
149 if (size == 0 && content == null && text != null) {
150 size = getMinimumMessageSize();
151 if (marshalledProperties != null) {
152 size += marshalledProperties.getLength();
153 }
154 size += text.length() * 2;
155 }
156 return super.getSize();
157 }
158
159 public String toString() {
160 try {
161 String text = getText();
162 if (text != null) {
163 text = MarshallingSupport.truncate64(text);
164 HashMap<String, Object> overrideFields = new HashMap<String, Object>();
165 overrideFields.put("text", text);
166 return super.toString(overrideFields);
167 }
168 } catch (JMSException e) {
169 }
170 return super.toString();
171 }
172 }