001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.stomp;
018
019 import java.io.IOException;
020 import java.io.Serializable;
021 import java.io.StringReader;
022 import java.io.StringWriter;
023 import java.util.HashMap;
024 import java.util.Map;
025
026 import javax.jms.JMSException;
027
028 import org.apache.activemq.advisory.AdvisorySupport;
029 import org.apache.activemq.broker.BrokerContext;
030 import org.apache.activemq.broker.BrokerContextAware;
031 import org.apache.activemq.command.ActiveMQMapMessage;
032 import org.apache.activemq.command.ActiveMQMessage;
033 import org.apache.activemq.command.ActiveMQObjectMessage;
034 import org.apache.activemq.command.DataStructure;
035 import org.codehaus.jettison.mapped.Configuration;
036
037 import com.thoughtworks.xstream.XStream;
038 import com.thoughtworks.xstream.io.HierarchicalStreamReader;
039 import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
040 import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
041 import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
042 import com.thoughtworks.xstream.io.xml.XppReader;
043 import com.thoughtworks.xstream.io.xml.xppdom.XppFactory;
044
045 /**
046 * Frame translator implementation that uses XStream to convert messages to and
047 * from XML and JSON
048 *
049 * @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
050 */
051 public class JmsFrameTranslator extends LegacyFrameTranslator implements
052 BrokerContextAware {
053
054 XStream xStream = null;
055 BrokerContext brokerContext;
056
057 public ActiveMQMessage convertFrame(ProtocolConverter converter,
058 StompFrame command) throws JMSException, ProtocolException {
059 Map<String, String> headers = command.getHeaders();
060 ActiveMQMessage msg;
061 String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION);
062 if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
063 msg = super.convertFrame(converter, command);
064 } else {
065 HierarchicalStreamReader in;
066
067 try {
068 String text = new String(command.getContent(), "UTF-8");
069 switch (Stomp.Transformations.getValue(transformation)) {
070 case JMS_OBJECT_XML:
071 in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
072 msg = createObjectMessage(in);
073 break;
074 case JMS_OBJECT_JSON:
075 in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
076 msg = createObjectMessage(in);
077 break;
078 case JMS_MAP_XML:
079 in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
080 msg = createMapMessage(in);
081 break;
082 case JMS_MAP_JSON:
083 in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
084 msg = createMapMessage(in);
085 break;
086 default:
087 throw new Exception("Unknown transformation: " + transformation);
088 }
089 } catch (Throwable e) {
090 command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage());
091 msg = super.convertFrame(converter, command);
092 }
093 }
094 FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
095 return msg;
096 }
097
098 public StompFrame convertMessage(ProtocolConverter converter,
099 ActiveMQMessage message) throws IOException, JMSException {
100 if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
101 StompFrame command = new StompFrame();
102 command.setAction(Stomp.Responses.MESSAGE);
103 Map<String, String> headers = new HashMap<String, String>(25);
104 command.setHeaders(headers);
105
106 FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
107 converter, message, command, this);
108
109 if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
110 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_XML.toString());
111 } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
112 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_JSON.toString());
113 }
114
115 ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
116 command.setContent(marshall(msg.getObject(),
117 headers.get(Stomp.Headers.TRANSFORMATION))
118 .getBytes("UTF-8"));
119 return command;
120
121 } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
122 StompFrame command = new StompFrame();
123 command.setAction(Stomp.Responses.MESSAGE);
124 Map<String, String> headers = new HashMap<String, String>(25);
125 command.setHeaders(headers);
126
127 FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
128 converter, message, command, this);
129
130 if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
131 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_XML.toString());
132 } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
133 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_JSON.toString());
134 }
135
136 ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
137 command.setContent(marshall((Serializable)msg.getContentMap(),
138 headers.get(Stomp.Headers.TRANSFORMATION))
139 .getBytes("UTF-8"));
140 return command;
141 } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
142 AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
143
144 StompFrame command = new StompFrame();
145 command.setAction(Stomp.Responses.MESSAGE);
146 Map<String, String> headers = new HashMap<String, String>(25);
147 command.setHeaders(headers);
148
149 FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
150 converter, message, command, this);
151
152 if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
153 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString());
154 } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
155 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString());
156 }
157
158 String body = marshallAdvisory(message.getDataStructure(),
159 headers.get(Stomp.Headers.TRANSFORMATION));
160 command.setContent(body.getBytes("UTF-8"));
161 return command;
162 } else {
163 return super.convertMessage(converter, message);
164 }
165 }
166
167 /**
168 * Marshalls the Object to a string using XML or JSON encoding
169 */
170 protected String marshall(Serializable object, String transformation)
171 throws JMSException {
172 StringWriter buffer = new StringWriter();
173 HierarchicalStreamWriter out;
174 if (transformation.toLowerCase().endsWith("json")) {
175 out = new JettisonMappedXmlDriver(new Configuration(), false).createWriter(buffer);
176 } else {
177 out = new PrettyPrintWriter(buffer);
178 }
179 getXStream().marshal(object, out);
180 return buffer.toString();
181 }
182
183 protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
184 ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
185 Object obj = getXStream().unmarshal(in);
186 objMsg.setObject((Serializable) obj);
187 return objMsg;
188 }
189
190 @SuppressWarnings("unchecked")
191 protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
192 ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
193 Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
194 for (String key : map.keySet()) {
195 mapMsg.setObject(key, map.get(key));
196 }
197 return mapMsg;
198 }
199
200 protected String marshallAdvisory(final DataStructure ds, String transformation) {
201
202 StringWriter buffer = new StringWriter();
203 HierarchicalStreamWriter out;
204 if (transformation.toLowerCase().endsWith("json")) {
205 out = new JettisonMappedXmlDriver().createWriter(buffer);
206 } else {
207 out = new PrettyPrintWriter(buffer);
208 }
209
210 XStream xstream = getXStream();
211 xstream.setMode(XStream.NO_REFERENCES);
212 xstream.aliasPackage("", "org.apache.activemq.command");
213 xstream.marshal(ds, out);
214 return buffer.toString();
215 }
216
217 // Properties
218 // -------------------------------------------------------------------------
219 public XStream getXStream() {
220 if (xStream == null) {
221 xStream = createXStream();
222 }
223 return xStream;
224 }
225
226 public void setXStream(XStream xStream) {
227 this.xStream = xStream;
228 }
229
230 // Implementation methods
231 // -------------------------------------------------------------------------
232 @SuppressWarnings("unchecked")
233 protected XStream createXStream() {
234 XStream xstream = null;
235 if (brokerContext != null) {
236 Map<String, XStream> beans = brokerContext.getBeansOfType(XStream.class);
237 for (XStream bean : beans.values()) {
238 if (bean != null) {
239 xstream = bean;
240 break;
241 }
242 }
243 }
244
245 if (xstream == null) {
246 xstream = XStreamSupport.createXStream();
247 xstream.ignoreUnknownElements();
248 }
249 return xstream;
250
251 }
252
253 public void setBrokerContext(BrokerContext brokerContext) {
254 this.brokerContext = brokerContext;
255 }
256
257 }