001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.stomp;
018
019 import org.apache.activemq.broker.BrokerContext;
020 import org.apache.activemq.broker.BrokerContextAware;
021 import org.apache.activemq.command.*;
022 import org.apache.activemq.util.ByteArrayOutputStream;
023 import org.apache.activemq.util.*;
024 import org.slf4j.Logger;
025 import org.slf4j.LoggerFactory;
026
027 import javax.jms.JMSException;
028 import java.io.*;
029 import java.util.*;
030 import java.util.concurrent.ConcurrentHashMap;
031 import java.util.concurrent.atomic.AtomicBoolean;
032
033 /**
034 * @author <a href="http://hiramchirino.com">chirino</a>
035 */
036 public class ProtocolConverter {
037
private static final Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class);

/** Generates the unique id used for each converter's broker-side connection. */
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();

/** Version advertised when the bundled version resource is missing, empty or unreadable. */
private static final String DEFAULT_BROKER_VERSION = "5.6.0";

/** Broker version appended to "ActiveMQ/" in the server header of the CONNECTED frame. */
private static final String BROKER_VERSION;

/** Shared keep-alive frame written to the STOMP client whenever the broker sends a KeepAliveInfo. */
private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE);

// Resolve the broker version once, from the first line of version.txt on the classpath.
// Any problem falls back to DEFAULT_BROKER_VERSION so class initialisation never fails.
static {
    String version = DEFAULT_BROKER_VERSION;
    InputStream in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt");
    if (in != null) {
        BufferedReader reader = null;
        try {
            reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
            String line = reader.readLine();
            // readLine() returns null for an empty resource; keep the default in that case.
            if (line != null && line.trim().length() > 0) {
                version = line.trim();
            }
        } catch (IOException e) {
            LOG.debug("Could not read the broker version resource, using " + DEFAULT_BROKER_VERSION, e);
        } finally {
            // Always release the stream; the original left it open.
            try {
                if (reader != null) {
                    reader.close();
                } else {
                    in.close();
                }
            } catch (IOException ignored) {
                // nothing useful can be done if close fails during class initialisation
            }
        }
    }
    BROKER_VERSION = version;
}
057
058 private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
059 private final SessionId sessionId = new SessionId(connectionId, -1);
060 private final ProducerId producerId = new ProducerId(sessionId, 1);
061
062 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
063 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
064 private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
065 private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
066
067 private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
068 private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
069 private final ConcurrentHashMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>();
070 private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
071 private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
072 private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
073 private final StompTransport stompTransport;
074
075 private final Object commnadIdMutex = new Object();
076 private int lastCommandId;
077 private final AtomicBoolean connected = new AtomicBoolean(false);
078 private final FrameTranslator frameTranslator = new LegacyFrameTranslator();
079 private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
080 private final BrokerContext brokerContext;
081 private String version = "1.0";
082 private long hbReadInterval;
083 private long hbWriteInterval;
084 private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;
085
/**
 * Creates a converter bound to a single STOMP transport.
 *
 * @param stompTransport transport through which frames reach the STOMP client and
 *                       commands reach the broker
 * @param brokerContext  context passed to frame translators that implement
 *                       {@link BrokerContextAware}
 */
public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
    this.brokerContext = brokerContext;
    this.stompTransport = stompTransport;
}
090
091 protected int generateCommandId() {
092 synchronized (commnadIdMutex) {
093 return lastCommandId++;
094 }
095 }
096
097 protected ResponseHandler createResponseHandler(final StompFrame command) {
098 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
099 if (receiptId != null) {
100 return new ResponseHandler() {
101 public void onResponse(ProtocolConverter converter, Response response) throws IOException {
102 if (response.isException()) {
103 // Generally a command can fail.. but that does not invalidate the connection.
104 // We report back the failure but we don't close the connection.
105 Throwable exception = ((ExceptionResponse)response).getException();
106 handleException(exception, command);
107 } else {
108 StompFrame sc = new StompFrame();
109 sc.setAction(Stomp.Responses.RECEIPT);
110 sc.setHeaders(new HashMap<String, String>(1));
111 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
112 stompTransport.sendToStomp(sc);
113 }
114 }
115 };
116 }
117 return null;
118 }
119
120 protected void sendToActiveMQ(Command command, ResponseHandler handler) {
121 command.setCommandId(generateCommandId());
122 if (handler != null) {
123 command.setResponseRequired(true);
124 resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
125 }
126 stompTransport.sendToActiveMQ(command);
127 }
128
129 protected void sendToStomp(StompFrame command) throws IOException {
130 stompTransport.sendToStomp(command);
131 }
132
133 protected FrameTranslator findTranslator(String header) {
134 FrameTranslator translator = frameTranslator;
135 try {
136 if (header != null) {
137 translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER
138 .newInstance(header);
139 if (translator instanceof BrokerContextAware) {
140 ((BrokerContextAware)translator).setBrokerContext(brokerContext);
141 }
142 }
143 } catch (Exception ignore) {
144 // if anything goes wrong use the default translator
145 }
146
147 return translator;
148 }
149
150 /**
151 * Convert a stomp command
152 *
153 * @param command
154 */
155 public void onStompCommand(StompFrame command) throws IOException, JMSException {
156 try {
157
158 if (command.getClass() == StompFrameError.class) {
159 throw ((StompFrameError)command).getException();
160 }
161
162 String action = command.getAction();
163 if (action.startsWith(Stomp.Commands.SEND)) {
164 onStompSend(command);
165 } else if (action.startsWith(Stomp.Commands.ACK)) {
166 onStompAck(command);
167 } else if (action.startsWith(Stomp.Commands.NACK)) {
168 onStompNack(command);
169 } else if (action.startsWith(Stomp.Commands.BEGIN)) {
170 onStompBegin(command);
171 } else if (action.startsWith(Stomp.Commands.COMMIT)) {
172 onStompCommit(command);
173 } else if (action.startsWith(Stomp.Commands.ABORT)) {
174 onStompAbort(command);
175 } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
176 onStompSubscribe(command);
177 } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
178 onStompUnsubscribe(command);
179 } else if (action.startsWith(Stomp.Commands.CONNECT) ||
180 action.startsWith(Stomp.Commands.STOMP)) {
181 onStompConnect(command);
182 } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
183 onStompDisconnect(command);
184 } else {
185 throw new ProtocolException("Unknown STOMP action: " + action);
186 }
187
188 } catch (ProtocolException e) {
189 handleException(e, command);
190 // Some protocol errors can cause the connection to get closed.
191 if (e.isFatal()) {
192 getStompTransport().onException(e);
193 }
194 }
195 }
196
197 protected void handleException(Throwable exception, StompFrame command) throws IOException {
198 LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
199 if (LOG.isDebugEnabled()) {
200 LOG.debug("Exception detail", exception);
201 }
202
203 // Let the stomp client know about any protocol errors.
204 ByteArrayOutputStream baos = new ByteArrayOutputStream();
205 PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
206 exception.printStackTrace(stream);
207 stream.close();
208
209 HashMap<String, String> headers = new HashMap<String, String>();
210 headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
211 headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain");
212
213 if (command != null) {
214 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
215 if (receiptId != null) {
216 headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
217 }
218 }
219
220 StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
221 sendToStomp(errorMessage);
222 }
223
224 protected void onStompSend(StompFrame command) throws IOException, JMSException {
225 checkConnected();
226
227 Map<String, String> headers = command.getHeaders();
228 String destination = headers.get(Stomp.Headers.Send.DESTINATION);
229 if (destination == null) {
230 throw new ProtocolException("SEND received without a Destination specified!");
231 }
232
233 String stompTx = headers.get(Stomp.Headers.TRANSACTION);
234 headers.remove("transaction");
235
236 ActiveMQMessage message = convertMessage(command);
237
238 message.setProducerId(producerId);
239 MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
240 message.setMessageId(id);
241 message.setJMSTimestamp(System.currentTimeMillis());
242
243 if (stompTx != null) {
244 TransactionId activemqTx = transactions.get(stompTx);
245 if (activemqTx == null) {
246 throw new ProtocolException("Invalid transaction id: " + stompTx);
247 }
248 message.setTransactionId(activemqTx);
249 }
250
251 message.onSend();
252 sendToActiveMQ(message, createResponseHandler(command));
253 }
254
255 protected void onStompNack(StompFrame command) throws ProtocolException {
256
257 checkConnected();
258
259 if (this.version.equals(Stomp.V1_0)) {
260 throw new ProtocolException("NACK received but connection is in v1.0 mode.");
261 }
262
263 Map<String, String> headers = command.getHeaders();
264
265 String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
266 if (subscriptionId == null) {
267 throw new ProtocolException("NACK received without a subscription id for acknowledge!");
268 }
269
270 String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
271 if (messageId == null) {
272 throw new ProtocolException("NACK received without a message-id to acknowledge!");
273 }
274
275 TransactionId activemqTx = null;
276 String stompTx = headers.get(Stomp.Headers.TRANSACTION);
277 if (stompTx != null) {
278 activemqTx = transactions.get(stompTx);
279 if (activemqTx == null) {
280 throw new ProtocolException("Invalid transaction id: " + stompTx);
281 }
282 }
283
284 if (subscriptionId != null) {
285 StompSubscription sub = this.subscriptions.get(subscriptionId);
286 if (sub != null) {
287 MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
288 if (ack != null) {
289 sendToActiveMQ(ack, createResponseHandler(command));
290 } else {
291 throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
292 }
293 }
294 }
295 }
296
297 protected void onStompAck(StompFrame command) throws ProtocolException {
298 checkConnected();
299
300 Map<String, String> headers = command.getHeaders();
301 String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
302 if (messageId == null) {
303 throw new ProtocolException("ACK received without a message-id to acknowledge!");
304 }
305
306 String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
307 if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
308 throw new ProtocolException("ACK received without a subscription id for acknowledge!");
309 }
310
311 TransactionId activemqTx = null;
312 String stompTx = headers.get(Stomp.Headers.TRANSACTION);
313 if (stompTx != null) {
314 activemqTx = transactions.get(stompTx);
315 if (activemqTx == null) {
316 throw new ProtocolException("Invalid transaction id: " + stompTx);
317 }
318 }
319
320 boolean acked = false;
321
322 if (subscriptionId != null) {
323
324 StompSubscription sub = this.subscriptions.get(subscriptionId);
325 if (sub != null) {
326 MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
327 if (ack != null) {
328 sendToActiveMQ(ack, createResponseHandler(command));
329 acked = true;
330 }
331 }
332
333 } else {
334
335 // TODO: acking with just a message id is very bogus since the same message id
336 // could have been sent to 2 different subscriptions on the same Stomp connection.
337 // For example, when 2 subs are created on the same topic.
338
339 for (StompSubscription sub : subscriptionsByConsumerId.values()) {
340 MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
341 if (ack != null) {
342 sendToActiveMQ(ack, createResponseHandler(command));
343 acked = true;
344 break;
345 }
346 }
347 }
348
349 if (!acked) {
350 throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
351 }
352 }
353
354 protected void onStompBegin(StompFrame command) throws ProtocolException {
355 checkConnected();
356
357 Map<String, String> headers = command.getHeaders();
358
359 String stompTx = headers.get(Stomp.Headers.TRANSACTION);
360
361 if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
362 throw new ProtocolException("Must specify the transaction you are beginning");
363 }
364
365 if (transactions.get(stompTx) != null) {
366 throw new ProtocolException("The transaction was allready started: " + stompTx);
367 }
368
369 LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
370 transactions.put(stompTx, activemqTx);
371
372 TransactionInfo tx = new TransactionInfo();
373 tx.setConnectionId(connectionId);
374 tx.setTransactionId(activemqTx);
375 tx.setType(TransactionInfo.BEGIN);
376
377 sendToActiveMQ(tx, createResponseHandler(command));
378 }
379
380 protected void onStompCommit(StompFrame command) throws ProtocolException {
381 checkConnected();
382
383 Map<String, String> headers = command.getHeaders();
384
385 String stompTx = headers.get(Stomp.Headers.TRANSACTION);
386 if (stompTx == null) {
387 throw new ProtocolException("Must specify the transaction you are committing");
388 }
389
390 TransactionId activemqTx = transactions.remove(stompTx);
391 if (activemqTx == null) {
392 throw new ProtocolException("Invalid transaction id: " + stompTx);
393 }
394
395 for (StompSubscription sub : subscriptionsByConsumerId.values()) {
396 sub.onStompCommit(activemqTx);
397 }
398
399 TransactionInfo tx = new TransactionInfo();
400 tx.setConnectionId(connectionId);
401 tx.setTransactionId(activemqTx);
402 tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
403
404 sendToActiveMQ(tx, createResponseHandler(command));
405 }
406
407 protected void onStompAbort(StompFrame command) throws ProtocolException {
408 checkConnected();
409 Map<String, String> headers = command.getHeaders();
410
411 String stompTx = headers.get(Stomp.Headers.TRANSACTION);
412 if (stompTx == null) {
413 throw new ProtocolException("Must specify the transaction you are committing");
414 }
415
416 TransactionId activemqTx = transactions.remove(stompTx);
417 if (activemqTx == null) {
418 throw new ProtocolException("Invalid transaction id: " + stompTx);
419 }
420 for (StompSubscription sub : subscriptionsByConsumerId.values()) {
421 try {
422 sub.onStompAbort(activemqTx);
423 } catch (Exception e) {
424 throw new ProtocolException("Transaction abort failed", false, e);
425 }
426 }
427
428 TransactionInfo tx = new TransactionInfo();
429 tx.setConnectionId(connectionId);
430 tx.setTransactionId(activemqTx);
431 tx.setType(TransactionInfo.ROLLBACK);
432
433 sendToActiveMQ(tx, createResponseHandler(command));
434 }
435
436 protected void onStompSubscribe(StompFrame command) throws ProtocolException {
437 checkConnected();
438 FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION));
439 Map<String, String> headers = command.getHeaders();
440
441 String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
442 String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
443
444 if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
445 throw new ProtocolException("SUBSCRIBE received without a subscription id!");
446 }
447
448 ActiveMQDestination actualDest = translator.convertDestination(this, destination, true);
449
450 if (actualDest == null) {
451 throw new ProtocolException("Invalid Destination.");
452 }
453
454 ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
455 ConsumerInfo consumerInfo = new ConsumerInfo(id);
456 consumerInfo.setPrefetchSize(1000);
457 consumerInfo.setDispatchAsync(true);
458
459 String browser = headers.get(Stomp.Headers.Subscribe.BROWSER);
460 if (browser != null && browser.equals(Stomp.TRUE)) {
461
462 if (!this.version.equals(Stomp.V1_1)) {
463 throw new ProtocolException("Queue Browser feature only valid for Stomp v1.1 clients!");
464 }
465
466 consumerInfo.setBrowser(true);
467 }
468
469 String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
470 consumerInfo.setSelector(selector);
471
472 IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
473
474 consumerInfo.setDestination(translator.convertDestination(this, destination, true));
475
476 StompSubscription stompSubscription;
477 if (!consumerInfo.isBrowser()) {
478 stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
479 } else {
480 stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
481 }
482 stompSubscription.setDestination(actualDest);
483
484 String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
485 if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
486 stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
487 } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
488 stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK);
489 } else {
490 stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
491 }
492
493 subscriptionsByConsumerId.put(id, stompSubscription);
494 // Stomp v1.0 doesn't need to set this header so we avoid an NPE if not set.
495 if (subscriptionId != null) {
496 subscriptions.put(subscriptionId, stompSubscription);
497 }
498
499 // dispatch can beat the receipt so send it early
500 sendReceipt(command);
501 sendToActiveMQ(consumerInfo, null);
502 }
503
504 protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
505 checkConnected();
506 Map<String, String> headers = command.getHeaders();
507
508 ActiveMQDestination destination = null;
509 Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
510 if (o != null) {
511 destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o, true);
512 }
513
514 String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
515 if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
516 throw new ProtocolException("UNSUBSCRIBE received without a subscription id!");
517 }
518
519 if (subscriptionId == null && destination == null) {
520 throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
521 }
522
523 // check if it is a durable subscription
524 String durable = command.getHeaders().get("activemq.subscriptionName");
525 if (durable != null) {
526 RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
527 info.setClientId(durable);
528 info.setSubscriptionName(durable);
529 info.setConnectionId(connectionId);
530 sendToActiveMQ(info, createResponseHandler(command));
531 return;
532 }
533
534 if (subscriptionId != null) {
535
536 StompSubscription sub = this.subscriptions.remove(subscriptionId);
537 if (sub != null) {
538 sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
539 return;
540 }
541
542 } else {
543
544 // Unsubscribing using a destination is a bit weird if multiple subscriptions
545 // are created with the same destination.
546 for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
547 StompSubscription sub = iter.next();
548 if (destination != null && destination.equals(sub.getDestination())) {
549 sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
550 iter.remove();
551 return;
552 }
553 }
554 }
555
556 throw new ProtocolException("No subscription matched.");
557 }
558
559 ConnectionInfo connectionInfo = new ConnectionInfo();
560
561 protected void onStompConnect(final StompFrame command) throws ProtocolException {
562
563 if (connected.get()) {
564 throw new ProtocolException("Allready connected.");
565 }
566
567 final Map<String, String> headers = command.getHeaders();
568
569 // allow anyone to login for now
570 String login = headers.get(Stomp.Headers.Connect.LOGIN);
571 String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
572 String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
573 String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT);
574 String accepts = headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
575
576 if (accepts == null) {
577 accepts = Stomp.DEFAULT_VERSION;
578 }
579 if (heartBeat == null) {
580 heartBeat = defaultHeartBeat;
581 }
582
583 HashSet<String> acceptsVersions = new HashSet<String>(Arrays.asList(accepts.split(Stomp.COMMA)));
584 acceptsVersions.retainAll(Arrays.asList(Stomp.SUPPORTED_PROTOCOL_VERSIONS));
585 if (acceptsVersions.isEmpty()) {
586 throw new ProtocolException("Invalid Protocol version[" + accepts +"], supported versions are: " +
587 Arrays.toString(Stomp.SUPPORTED_PROTOCOL_VERSIONS), true);
588 } else {
589 this.version = Collections.max(acceptsVersions);
590 }
591
592 configureInactivityMonitor(heartBeat);
593
594 IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
595 connectionInfo.setConnectionId(connectionId);
596 if (clientId != null) {
597 connectionInfo.setClientId(clientId);
598 } else {
599 connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
600 }
601
602 connectionInfo.setResponseRequired(true);
603 connectionInfo.setUserName(login);
604 connectionInfo.setPassword(passcode);
605 connectionInfo.setTransportContext(stompTransport.getPeerCertificates());
606
607 sendToActiveMQ(connectionInfo, new ResponseHandler() {
608 public void onResponse(ProtocolConverter converter, Response response) throws IOException {
609
610 if (response.isException()) {
611 // If the connection attempt fails we close the socket.
612 Throwable exception = ((ExceptionResponse)response).getException();
613 handleException(exception, command);
614 getStompTransport().onException(IOExceptionSupport.create(exception));
615 return;
616 }
617
618 final SessionInfo sessionInfo = new SessionInfo(sessionId);
619 sendToActiveMQ(sessionInfo, null);
620
621 final ProducerInfo producerInfo = new ProducerInfo(producerId);
622 sendToActiveMQ(producerInfo, new ResponseHandler() {
623 public void onResponse(ProtocolConverter converter, Response response) throws IOException {
624
625 if (response.isException()) {
626 // If the connection attempt fails we close the socket.
627 Throwable exception = ((ExceptionResponse)response).getException();
628 handleException(exception, command);
629 getStompTransport().onException(IOExceptionSupport.create(exception));
630 }
631
632 connected.set(true);
633 HashMap<String, String> responseHeaders = new HashMap<String, String>();
634
635 responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
636 String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID);
637 if (requestId == null) {
638 // TODO legacy
639 requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED);
640 }
641 if (requestId != null) {
642 // TODO legacy
643 responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
644 responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
645 }
646
647 responseHeaders.put(Stomp.Headers.Connected.VERSION, version);
648 responseHeaders.put(Stomp.Headers.Connected.HEART_BEAT,
649 String.format("%d,%d", hbWriteInterval, hbReadInterval));
650 responseHeaders.put(Stomp.Headers.Connected.SERVER, "ActiveMQ/"+BROKER_VERSION);
651
652 StompFrame sc = new StompFrame();
653 sc.setAction(Stomp.Responses.CONNECTED);
654 sc.setHeaders(responseHeaders);
655 sendToStomp(sc);
656
657 if (version.equals(Stomp.V1_1)) {
658 StompWireFormat format = stompTransport.getWireFormat();
659 if (format != null) {
660 format.setEncodingEnabled(true);
661 }
662 }
663 }
664 });
665
666 }
667 });
668 }
669
670 protected void onStompDisconnect(StompFrame command) throws ProtocolException {
671 checkConnected();
672 sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
673 sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
674 connected.set(false);
675 }
676
677 protected void checkConnected() throws ProtocolException {
678 if (!connected.get()) {
679 throw new ProtocolException("Not connected.");
680 }
681 }
682
683 /**
684 * Dispatch a ActiveMQ command
685 *
686 * @param command
687 * @throws IOException
688 */
689 public void onActiveMQCommand(Command command) throws IOException, JMSException {
690 if (command.isResponse()) {
691 Response response = (Response)command;
692 ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
693 if (rh != null) {
694 rh.onResponse(this, response);
695 } else {
696 // Pass down any unexpected errors. Should this close the connection?
697 if (response.isException()) {
698 Throwable exception = ((ExceptionResponse)response).getException();
699 handleException(exception, null);
700 }
701 }
702 } else if (command.isMessageDispatch()) {
703 MessageDispatch md = (MessageDispatch)command;
704 StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
705 if (sub != null) {
706 sub.onMessageDispatch(md);
707 }
708 } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
709 stompTransport.sendToStomp(ping);
710 } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
711 // Pass down any unexpected async errors. Should this close the connection?
712 Throwable exception = ((ConnectionError)command).getException();
713 handleException(exception, null);
714 }
715 }
716
717 public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
718 ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command);
719 return msg;
720 }
721
722 public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException {
723 if (ignoreTransformation == true) {
724 return frameTranslator.convertMessage(this, message);
725 } else {
726 return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message);
727 }
728 }
729
730 public StompTransport getStompTransport() {
731 return stompTransport;
732 }
733
734 public ActiveMQDestination createTempDestination(String name, boolean topic) {
735 ActiveMQDestination rc = tempDestinations.get(name);
736 if( rc == null ) {
737 if (topic) {
738 rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
739 } else {
740 rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
741 }
742 sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
743 tempDestinations.put(name, rc);
744 tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
745 }
746 return rc;
747 }
748
749 public String getCreatedTempDestinationName(ActiveMQDestination destination) {
750 return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
751 }
752
753 public String getDefaultHeartBeat() {
754 return defaultHeartBeat;
755 }
756
757 public void setDefaultHeartBeat(String defaultHeartBeat) {
758 this.defaultHeartBeat = defaultHeartBeat;
759 }
760
761 protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException {
762
763 String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA);
764
765 if (keepAliveOpts == null || keepAliveOpts.length != 2) {
766 throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
767 } else {
768
769 try {
770 hbReadInterval = Long.parseLong(keepAliveOpts[0]);
771 hbWriteInterval = Long.parseLong(keepAliveOpts[1]);
772 } catch(NumberFormatException e) {
773 throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
774 }
775
776 try {
777
778 StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor();
779
780 monitor.setReadCheckTime(hbReadInterval);
781 monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval));
782 monitor.setWriteCheckTime(hbWriteInterval);
783
784 monitor.startMonitoring();
785
786 } catch(Exception ex) {
787 hbReadInterval = 0;
788 hbWriteInterval = 0;
789 }
790
791 if (LOG.isDebugEnabled()) {
792 LOG.debug("Stomp Connect heartbeat conf RW[" + hbReadInterval + "," + hbWriteInterval + "]");
793 }
794 }
795 }
796
797 protected void sendReceipt(StompFrame command) {
798 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
799 if (receiptId != null) {
800 StompFrame sc = new StompFrame();
801 sc.setAction(Stomp.Responses.RECEIPT);
802 sc.setHeaders(new HashMap<String, String>(1));
803 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
804 try {
805 sendToStomp(sc);
806 } catch (IOException e) {
807 LOG.warn("Could not send a receipt for " + command, e);
808 }
809 }
810 }
811 }