001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.mqtt;
018
019 import java.io.IOException;
020
021 import javax.jms.JMSException;
022 import org.apache.activemq.transport.tcp.TcpTransport;
023 import org.fusesource.hawtbuf.DataByteArrayInputStream;
024 import org.fusesource.hawtbuf.DataByteArrayOutputStream;
025 import org.fusesource.mqtt.codec.*;
026
027 public class MQTTCodec {
028
029 TcpTransport transport;
030
031 DataByteArrayOutputStream currentCommand = new DataByteArrayOutputStream();
032 boolean processedHeader = false;
033 String action;
034 byte header;
035 int contentLength = -1;
036 int previousByte = -1;
037 int payLoadRead = 0;
038
039 public MQTTCodec(TcpTransport transport) {
040 this.transport = transport;
041 }
042
043 public void parse(DataByteArrayInputStream input, int readSize) throws Exception {
044 int i = 0;
045 byte b;
046 while (i++ < readSize) {
047 b = input.readByte();
048 // skip repeating nulls
049 if (!processedHeader && b == 0) {
050 previousByte = 0;
051 continue;
052 }
053
054 if (!processedHeader) {
055 i += processHeader(b, input);
056 if (contentLength == 0) {
057 processCommand();
058 }
059
060 } else {
061
062 if (contentLength == -1) {
063 // end of command reached, unmarshal
064 if (b == 0) {
065 processCommand();
066 } else {
067 currentCommand.write(b);
068 }
069 } else {
070 // read desired content length
071 if (payLoadRead == contentLength) {
072 processCommand();
073 i += processHeader(b, input);
074 } else {
075 currentCommand.write(b);
076 payLoadRead++;
077 }
078 }
079 }
080
081 previousByte = b;
082 }
083 if (processedHeader && payLoadRead == contentLength) {
084 processCommand();
085 }
086 }
087
088 /**
089 * sets the content length
090 *
091 * @return number of bytes read
092 */
093 private int processHeader(byte header, DataByteArrayInputStream input) {
094 this.header = header;
095 byte digit;
096 int multiplier = 1;
097 int read = 0;
098 int length = 0;
099 do {
100 digit = input.readByte();
101 length += (digit & 0x7F) * multiplier;
102 multiplier <<= 7;
103 read++;
104 } while ((digit & 0x80) != 0);
105
106 contentLength = length;
107 processedHeader = true;
108 return read;
109 }
110
111
112 private void processCommand() throws Exception {
113 MQTTFrame frame = new MQTTFrame(currentCommand.toBuffer().deepCopy()).header(header);
114 transport.doConsume(frame);
115 processedHeader = false;
116 currentCommand.reset();
117 contentLength = -1;
118 payLoadRead = 0;
119 }
120
121 public static String commandType(byte header) throws IOException, JMSException {
122
123 byte messageType = (byte) ((header & 0xF0) >>> 4);
124 switch (messageType) {
125 case PINGREQ.TYPE: {
126 return "PINGREQ";
127 }
128 case CONNECT.TYPE: {
129 return "CONNECT";
130 }
131 case DISCONNECT.TYPE: {
132 return "DISCONNECT";
133 }
134 case SUBSCRIBE.TYPE: {
135 return "SUBSCRIBE";
136 }
137 case UNSUBSCRIBE.TYPE: {
138 return "UNSUBSCRIBE";
139 }
140 case PUBLISH.TYPE: {
141 return "PUBLISH";
142 }
143 case PUBACK.TYPE: {
144 return "PUBACK";
145 }
146 case PUBREC.TYPE: {
147 return "PUBREC";
148 }
149 case PUBREL.TYPE: {
150 return "PUBREL";
151 }
152 case PUBCOMP.TYPE: {
153 return "PUBCOMP";
154 }
155 default:
156 return "UNKNOWN";
157 }
158
159 }
160
161 }