001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.mqtt;
018
019 import java.io.IOException;
020 import java.security.cert.X509Certificate;
021
022 import javax.jms.JMSException;
023 import org.apache.activemq.broker.BrokerContext;
024 import org.apache.activemq.command.Command;
025 import org.apache.activemq.transport.Transport;
026 import org.apache.activemq.transport.TransportFilter;
027 import org.apache.activemq.transport.TransportListener;
028 import org.apache.activemq.transport.tcp.SslTransport;
029 import org.apache.activemq.util.IOExceptionSupport;
030 import org.apache.activemq.wireformat.WireFormat;
031 import org.fusesource.mqtt.codec.MQTTFrame;
032 import org.slf4j.Logger;
033 import org.slf4j.LoggerFactory;
034
035 /**
036 * The MQTTTransportFilter normally sits on top of a TcpTransport that has been
037 * configured with the StompWireFormat and is used to convert MQTT commands to
038 * ActiveMQ commands. All of the conversion work is done by delegating to the
039 * MQTTProtocolConverter
040 */
041 public class MQTTTransportFilter extends TransportFilter implements MQTTTransport {
042 private static final Logger LOG = LoggerFactory.getLogger(MQTTTransportFilter.class);
043 private static final Logger TRACE = LoggerFactory.getLogger(MQTTTransportFilter.class.getPackage().getName() + ".MQTTIO");
044 private final MQTTProtocolConverter protocolConverter;
045 private MQTTInactivityMonitor monitor;
046 private MQTTWireFormat wireFormat;
047
048 private boolean trace;
049
050 public MQTTTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
051 super(next);
052 this.protocolConverter = new MQTTProtocolConverter(this, brokerContext);
053
054 if (wireFormat instanceof MQTTWireFormat) {
055 this.wireFormat = (MQTTWireFormat) wireFormat;
056 }
057 }
058
059 public void oneway(Object o) throws IOException {
060 try {
061 final Command command = (Command) o;
062 protocolConverter.onActiveMQCommand(command);
063 } catch (Exception e) {
064 throw IOExceptionSupport.create(e);
065 }
066 }
067
068 public void onCommand(Object command) {
069 try {
070 if (trace) {
071 TRACE.trace("Received: \n" + command);
072 }
073
074 protocolConverter.onMQTTCommand((MQTTFrame) command);
075 } catch (IOException e) {
076 handleException(e);
077 } catch (JMSException e) {
078 onException(IOExceptionSupport.create(e));
079 }
080 }
081
082 public void sendToActiveMQ(Command command) {
083 TransportListener l = transportListener;
084 if (l != null) {
085 l.onCommand(command);
086 }
087 }
088
089 public void sendToMQTT(MQTTFrame command) throws IOException {
090 if (trace) {
091 TRACE.trace("Sending: \n" + command);
092 }
093 Transport n = next;
094 if (n != null) {
095 n.oneway(command);
096 }
097 }
098
099 public X509Certificate[] getPeerCertificates() {
100 if (next instanceof SslTransport) {
101 X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates();
102 if (trace && peerCerts != null) {
103 LOG.debug("Peer Identity has been verified\n");
104 }
105 return peerCerts;
106 }
107 return null;
108 }
109
110 public boolean isTrace() {
111 return trace;
112 }
113
114 public void setTrace(boolean trace) {
115 this.trace = trace;
116 }
117
118 @Override
119 public MQTTInactivityMonitor getInactivityMonitor() {
120 return monitor;
121 }
122
123 public void setInactivityMonitor(MQTTInactivityMonitor monitor) {
124 this.monitor = monitor;
125 }
126
127 @Override
128 public MQTTWireFormat getWireFormat() {
129 return this.wireFormat;
130 }
131
132 public void handleException(IOException e) {
133 protocolConverter.onTransportError();
134 super.onException(e);
135 }
136
137
138 }