001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.activemq.transport.stomp;
019
020 import java.io.ByteArrayOutputStream;
021 import java.io.DataInputStream;
022 import java.io.IOException;
023 import java.io.InputStream;
024 import java.io.OutputStream;
025 import java.net.Socket;
026 import java.net.UnknownHostException;
027 import java.util.HashMap;
028
029 public class StompConnection {
030
031 public static final long RECEIVE_TIMEOUT = 10000;
032
033 private Socket stompSocket;
034 private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
035
036 public void open(String host, int port) throws IOException, UnknownHostException {
037 open(new Socket(host, port));
038 }
039
040 public void open(Socket socket) {
041 stompSocket = socket;
042 }
043
044 public void close() throws IOException {
045 if (stompSocket != null) {
046 stompSocket.close();
047 stompSocket = null;
048 }
049 }
050
051 public void sendFrame(String data) throws Exception {
052 byte[] bytes = data.getBytes("UTF-8");
053 OutputStream outputStream = stompSocket.getOutputStream();
054 outputStream.write(bytes);
055 outputStream.flush();
056 }
057
058 public void sendFrame(String frame, byte[] data) throws Exception {
059 byte[] bytes = frame.getBytes("UTF-8");
060 OutputStream outputStream = stompSocket.getOutputStream();
061 outputStream.write(bytes);
062 outputStream.write(data);
063 outputStream.flush();
064 }
065
066 public StompFrame receive() throws Exception {
067 return receive(RECEIVE_TIMEOUT);
068 }
069
070 public StompFrame receive(long timeOut) throws Exception {
071 stompSocket.setSoTimeout((int)timeOut);
072 InputStream is = stompSocket.getInputStream();
073 StompWireFormat wf = new StompWireFormat();
074 DataInputStream dis = new DataInputStream(is);
075 return (StompFrame)wf.unmarshal(dis);
076 }
077
078 public String receiveFrame() throws Exception {
079 return receiveFrame(RECEIVE_TIMEOUT);
080 }
081
082 public String receiveFrame(long timeOut) throws Exception {
083 stompSocket.setSoTimeout((int)timeOut);
084 InputStream is = stompSocket.getInputStream();
085 int c = 0;
086 for (;;) {
087 c = is.read();
088 if (c < 0) {
089 throw new IOException("socket closed.");
090 } else if (c == 0) {
091 c = is.read();
092 if (c == '\n') {
093 // end of frame
094 return stringFromBuffer(inputBuffer);
095 } else {
096 inputBuffer.write(0);
097 inputBuffer.write(c);
098 }
099 } else {
100 inputBuffer.write(c);
101 }
102 }
103 }
104
105 private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception {
106 byte[] ba = inputBuffer.toByteArray();
107 inputBuffer.reset();
108 return new String(ba, "UTF-8");
109 }
110
111 public Socket getStompSocket() {
112 return stompSocket;
113 }
114
115 public void setStompSocket(Socket stompSocket) {
116 this.stompSocket = stompSocket;
117 }
118
119 public void connect(String username, String password) throws Exception {
120 connect(username, password, null);
121 }
122
123 public void connect(String username, String password, String client) throws Exception {
124 HashMap<String, String> headers = new HashMap<String, String>();
125 headers.put("login", username);
126 headers.put("passcode", password);
127 if (client != null) {
128 headers.put("client-id", client);
129 }
130 StompFrame frame = new StompFrame("CONNECT", headers);
131 sendFrame(frame.format());
132
133 StompFrame connect = receive();
134 if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
135 throw new Exception ("Not connected: " + connect.getBody());
136 }
137 }
138
139 public void disconnect() throws Exception {
140 StompFrame frame = new StompFrame("DISCONNECT");
141 sendFrame(frame.format());
142 }
143
144 public void send(String destination, String message) throws Exception {
145 send(destination, message, null, null);
146 }
147
148 public void send(String destination, String message, String transaction, HashMap<String, String> headers) throws Exception {
149 if (headers == null) {
150 headers = new HashMap<String, String>();
151 }
152 headers.put("destination", destination);
153 if (transaction != null) {
154 headers.put("transaction", transaction);
155 }
156 StompFrame frame = new StompFrame("SEND", headers, message.getBytes());
157 sendFrame(frame.format());
158 }
159
160 public void subscribe(String destination) throws Exception {
161 subscribe(destination, null, null);
162 }
163
164 public void subscribe(String destination, String ack) throws Exception {
165 subscribe(destination, ack, new HashMap<String, String>());
166 }
167
168 public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception {
169 if (headers == null) {
170 headers = new HashMap<String, String>();
171 }
172 headers.put("destination", destination);
173 if (ack != null) {
174 headers.put("ack", ack);
175 }
176 StompFrame frame = new StompFrame("SUBSCRIBE", headers);
177 sendFrame(frame.format());
178 }
179
180 public void unsubscribe(String destination) throws Exception {
181 unsubscribe(destination, null);
182 }
183
184 public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception {
185 if (headers == null) {
186 headers = new HashMap<String, String>();
187 }
188 headers.put("destination", destination);
189 StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
190 sendFrame(frame.format());
191 }
192
193 public void begin(String transaction) throws Exception {
194 HashMap<String, String> headers = new HashMap<String, String>();
195 headers.put("transaction", transaction);
196 StompFrame frame = new StompFrame("BEGIN", headers);
197 sendFrame(frame.format());
198 }
199
200 public void abort(String transaction) throws Exception {
201 HashMap<String, String> headers = new HashMap<String, String>();
202 headers.put("transaction", transaction);
203 StompFrame frame = new StompFrame("ABORT", headers);
204 sendFrame(frame.format());
205 }
206
207 public void commit(String transaction) throws Exception {
208 HashMap<String, String> headers = new HashMap<String, String>();
209 headers.put("transaction", transaction);
210 StompFrame frame = new StompFrame("COMMIT", headers);
211 sendFrame(frame.format());
212 }
213
214 public void ack(StompFrame frame) throws Exception {
215 ack(frame.getHeaders().get("message-id"), null);
216 }
217
218 public void ack(StompFrame frame, String transaction) throws Exception {
219 ack(frame.getHeaders().get("message-id"), transaction);
220 }
221
222 public void ack(String messageId) throws Exception {
223 ack(messageId, null);
224 }
225
226 public void ack(String messageId, String transaction) throws Exception {
227 HashMap<String, String> headers = new HashMap<String, String>();
228 headers.put("message-id", messageId);
229 if (transaction != null)
230 headers.put("transaction", transaction);
231 StompFrame frame = new StompFrame("ACK", headers);
232 sendFrame(frame.format());
233 }
234
235 protected String appendHeaders(HashMap<String, Object> headers) {
236 StringBuilder result = new StringBuilder();
237 for (String key : headers.keySet()) {
238 result.append(key + ":" + headers.get(key) + "\n");
239 }
240 result.append("\n");
241 return result.toString();
242 }
243
244 }