001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.net.MalformedURLException;
020 import java.util.Enumeration;
021
022 import javax.jms.BytesMessage;
023 import javax.jms.Destination;
024 import javax.jms.JMSException;
025 import javax.jms.MapMessage;
026 import javax.jms.Message;
027 import javax.jms.MessageEOFException;
028 import javax.jms.ObjectMessage;
029 import javax.jms.Queue;
030 import javax.jms.StreamMessage;
031 import javax.jms.TemporaryQueue;
032 import javax.jms.TemporaryTopic;
033 import javax.jms.TextMessage;
034 import javax.jms.Topic;
035
036 import org.apache.activemq.blob.BlobDownloader;
037 import org.apache.activemq.blob.BlobUploader;
038 import org.apache.activemq.command.ActiveMQBlobMessage;
039 import org.apache.activemq.command.ActiveMQBytesMessage;
040 import org.apache.activemq.command.ActiveMQDestination;
041 import org.apache.activemq.command.ActiveMQMapMessage;
042 import org.apache.activemq.command.ActiveMQMessage;
043 import org.apache.activemq.command.ActiveMQObjectMessage;
044 import org.apache.activemq.command.ActiveMQQueue;
045 import org.apache.activemq.command.ActiveMQStreamMessage;
046 import org.apache.activemq.command.ActiveMQTempQueue;
047 import org.apache.activemq.command.ActiveMQTempTopic;
048 import org.apache.activemq.command.ActiveMQTextMessage;
049 import org.apache.activemq.command.ActiveMQTopic;
050
051 /**
052 * A helper class for converting normal JMS interfaces into ActiveMQ specific
053 * ones.
054 *
055 *
056 */
057 public final class ActiveMQMessageTransformation {
058
059 private ActiveMQMessageTransformation() {
060 }
061
062 /**
063 * Creates a an available JMS message from another provider.
064 *
065 * @param destination - Destination to be converted into ActiveMQ's
066 * implementation.
067 * @return ActiveMQDestination - ActiveMQ's implementation of the
068 * destination.
069 * @throws JMSException if an error occurs
070 */
071 public static ActiveMQDestination transformDestination(Destination destination) throws JMSException {
072 ActiveMQDestination activeMQDestination = null;
073
074 if (destination != null) {
075 if (destination instanceof ActiveMQDestination) {
076 return (ActiveMQDestination)destination;
077
078 } else {
079 if (destination instanceof TemporaryQueue) {
080 activeMQDestination = new ActiveMQTempQueue(((Queue)destination).getQueueName());
081 } else if (destination instanceof TemporaryTopic) {
082 activeMQDestination = new ActiveMQTempTopic(((Topic)destination).getTopicName());
083 } else if (destination instanceof Queue) {
084 activeMQDestination = new ActiveMQQueue(((Queue)destination).getQueueName());
085 } else if (destination instanceof Topic) {
086 activeMQDestination = new ActiveMQTopic(((Topic)destination).getTopicName());
087 }
088 }
089 }
090
091 return activeMQDestination;
092 }
093
094 /**
095 * Creates a fast shallow copy of the current ActiveMQMessage or creates a
096 * whole new message instance from an available JMS message from another
097 * provider.
098 *
099 * @param message - Message to be converted into ActiveMQ's implementation.
100 * @param connection
101 * @return ActiveMQMessage - ActiveMQ's implementation object of the
102 * message.
103 * @throws JMSException if an error occurs
104 */
105 public static ActiveMQMessage transformMessage(Message message, ActiveMQConnection connection)
106 throws JMSException {
107 if (message instanceof ActiveMQMessage) {
108 return (ActiveMQMessage)message;
109
110 } else {
111 ActiveMQMessage activeMessage = null;
112
113 if (message instanceof BytesMessage) {
114 BytesMessage bytesMsg = (BytesMessage)message;
115 bytesMsg.reset();
116 ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
117 msg.setConnection(connection);
118 try {
119 for (;;) {
120 // Reads a byte from the message stream until the stream
121 // is empty
122 msg.writeByte(bytesMsg.readByte());
123 }
124 } catch (MessageEOFException e) {
125 // if an end of message stream as expected
126 } catch (JMSException e) {
127 }
128
129 activeMessage = msg;
130 } else if (message instanceof MapMessage) {
131 MapMessage mapMsg = (MapMessage)message;
132 ActiveMQMapMessage msg = new ActiveMQMapMessage();
133 msg.setConnection(connection);
134 Enumeration iter = mapMsg.getMapNames();
135
136 while (iter.hasMoreElements()) {
137 String name = iter.nextElement().toString();
138 msg.setObject(name, mapMsg.getObject(name));
139 }
140
141 activeMessage = msg;
142 } else if (message instanceof ObjectMessage) {
143 ObjectMessage objMsg = (ObjectMessage)message;
144 ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
145 msg.setConnection(connection);
146 msg.setObject(objMsg.getObject());
147 msg.storeContent();
148 activeMessage = msg;
149 } else if (message instanceof StreamMessage) {
150 StreamMessage streamMessage = (StreamMessage)message;
151 streamMessage.reset();
152 ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
153 msg.setConnection(connection);
154 Object obj = null;
155
156 try {
157 while ((obj = streamMessage.readObject()) != null) {
158 msg.writeObject(obj);
159 }
160 } catch (MessageEOFException e) {
161 // if an end of message stream as expected
162 } catch (JMSException e) {
163 }
164
165 activeMessage = msg;
166 } else if (message instanceof TextMessage) {
167 TextMessage textMsg = (TextMessage)message;
168 ActiveMQTextMessage msg = new ActiveMQTextMessage();
169 msg.setConnection(connection);
170 msg.setText(textMsg.getText());
171 activeMessage = msg;
172 } else if (message instanceof BlobMessage) {
173 BlobMessage blobMessage = (BlobMessage)message;
174 ActiveMQBlobMessage msg = new ActiveMQBlobMessage();
175 msg.setConnection(connection);
176 msg.setBlobDownloader(new BlobDownloader(connection.getBlobTransferPolicy()));
177 try {
178 msg.setURL(blobMessage.getURL());
179 } catch (MalformedURLException e) {
180
181 }
182 activeMessage = msg;
183 } else {
184 activeMessage = new ActiveMQMessage();
185 activeMessage.setConnection(connection);
186 }
187
188 copyProperties(message, activeMessage);
189
190 return activeMessage;
191 }
192 }
193
194 /**
195 * Copies the standard JMS and user defined properties from the givem
196 * message to the specified message
197 *
198 * @param fromMessage the message to take the properties from
199 * @param toMessage the message to add the properties to
200 * @throws JMSException
201 */
202 public static void copyProperties(Message fromMessage, Message toMessage) throws JMSException {
203 toMessage.setJMSMessageID(fromMessage.getJMSMessageID());
204 toMessage.setJMSCorrelationID(fromMessage.getJMSCorrelationID());
205 toMessage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo()));
206 toMessage.setJMSDestination(transformDestination(fromMessage.getJMSDestination()));
207 toMessage.setJMSDeliveryMode(fromMessage.getJMSDeliveryMode());
208 toMessage.setJMSRedelivered(fromMessage.getJMSRedelivered());
209 toMessage.setJMSType(fromMessage.getJMSType());
210 toMessage.setJMSExpiration(fromMessage.getJMSExpiration());
211 toMessage.setJMSPriority(fromMessage.getJMSPriority());
212 toMessage.setJMSTimestamp(fromMessage.getJMSTimestamp());
213
214 Enumeration propertyNames = fromMessage.getPropertyNames();
215
216 while (propertyNames.hasMoreElements()) {
217 String name = propertyNames.nextElement().toString();
218 Object obj = fromMessage.getObjectProperty(name);
219 toMessage.setObjectProperty(name, obj);
220 }
221 }
222 }