001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.stomp;
018
019 import org.apache.activemq.command.*;
020
021 import javax.jms.JMSException;
022 import java.io.IOException;
023 import java.util.Iterator;
024 import java.util.LinkedHashMap;
025 import java.util.LinkedList;
026 import java.util.Map;
027 import java.util.Map.Entry;
028
029 /**
030 * Keeps track of the STOMP subscription so that acking is correctly done.
031 *
032 * @author <a href="http://hiramchirino.com">chirino</a>
033 */
034 public class StompSubscription {
035
036 public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
037 public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
038 public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
039
040 protected final ProtocolConverter protocolConverter;
041 protected final String subscriptionId;
042 protected final ConsumerInfo consumerInfo;
043
044 protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
045 protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
046
047 protected String ackMode = AUTO_ACK;
048 protected ActiveMQDestination destination;
049 protected String transformation;
050
051 public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
052 this.protocolConverter = stompTransport;
053 this.subscriptionId = subscriptionId;
054 this.consumerInfo = consumerInfo;
055 this.transformation = transformation;
056 }
057
058 void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
059 ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
060 if (ackMode == CLIENT_ACK) {
061 synchronized (this) {
062 dispatchedMessage.put(message.getMessageId(), md);
063 }
064 } else if (ackMode == INDIVIDUAL_ACK) {
065 synchronized (this) {
066 dispatchedMessage.put(message.getMessageId(), md);
067 }
068 } else if (ackMode == AUTO_ACK) {
069 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
070 protocolConverter.getStompTransport().sendToActiveMQ(ack);
071 }
072
073 boolean ignoreTransformation = false;
074
075 if (transformation != null && !( message instanceof ActiveMQBytesMessage ) ) {
076 message.setReadOnlyProperties(false);
077 message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
078 } else {
079 if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
080 ignoreTransformation = true;
081 }
082 }
083
084 StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation);
085
086 command.setAction(Stomp.Responses.MESSAGE);
087 if (subscriptionId != null) {
088 command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
089 }
090
091 protocolConverter.getStompTransport().sendToStomp(command);
092 }
093
094 synchronized void onStompAbort(TransactionId transactionId) {
095 unconsumedMessage.clear();
096 }
097
098 synchronized void onStompCommit(TransactionId transactionId) {
099 for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
100 @SuppressWarnings("rawtypes")
101 Map.Entry entry = (Entry)iter.next();
102 MessageDispatch msg = (MessageDispatch)entry.getValue();
103 if (unconsumedMessage.contains(msg)) {
104 iter.remove();
105 }
106 }
107
108 if (!unconsumedMessage.isEmpty()) {
109 MessageAck ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
110 protocolConverter.getStompTransport().sendToActiveMQ(ack);
111 unconsumedMessage.clear();
112 }
113 }
114
115 synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
116
117 MessageId msgId = new MessageId(messageId);
118
119 if (!dispatchedMessage.containsKey(msgId)) {
120 return null;
121 }
122
123 MessageAck ack = new MessageAck();
124 ack.setDestination(consumerInfo.getDestination());
125 ack.setConsumerId(consumerInfo.getConsumerId());
126
127 if (ackMode == CLIENT_ACK) {
128 if (transactionId == null) {
129 ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
130 } else {
131 ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
132 }
133 int count = 0;
134 for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
135
136 @SuppressWarnings("rawtypes")
137 Map.Entry entry = (Entry)iter.next();
138 MessageId id = (MessageId)entry.getKey();
139 MessageDispatch msg = (MessageDispatch)entry.getValue();
140
141 if (transactionId != null) {
142 if (!unconsumedMessage.contains(msg)) {
143 unconsumedMessage.add(msg);
144 count++;
145 }
146 } else {
147 iter.remove();
148 count++;
149 }
150
151 if (id.equals(msgId)) {
152 ack.setLastMessageId(id);
153 break;
154 }
155 }
156 ack.setMessageCount(count);
157 if (transactionId != null) {
158 ack.setTransactionId(transactionId);
159 }
160
161 } else if (ackMode == INDIVIDUAL_ACK) {
162 ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
163 ack.setMessageID(msgId);
164 if (transactionId != null) {
165 unconsumedMessage.add(dispatchedMessage.get(msgId));
166 ack.setTransactionId(transactionId);
167 }
168 dispatchedMessage.remove(msgId);
169 }
170 return ack;
171 }
172
173 public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
174
175 MessageId msgId = new MessageId(messageId);
176
177 if (!dispatchedMessage.containsKey(msgId)) {
178 return null;
179 }
180
181 MessageAck ack = new MessageAck();
182 ack.setDestination(consumerInfo.getDestination());
183 ack.setConsumerId(consumerInfo.getConsumerId());
184 ack.setAckType(MessageAck.POSION_ACK_TYPE);
185 ack.setMessageID(msgId);
186 if (transactionId != null) {
187 unconsumedMessage.add(dispatchedMessage.get(msgId));
188 ack.setTransactionId(transactionId);
189 }
190 dispatchedMessage.remove(msgId);
191
192 return ack;
193 }
194
195 public String getAckMode() {
196 return ackMode;
197 }
198
199 public void setAckMode(String ackMode) {
200 this.ackMode = ackMode;
201 }
202
203 public String getSubscriptionId() {
204 return subscriptionId;
205 }
206
207 public void setDestination(ActiveMQDestination destination) {
208 this.destination = destination;
209 }
210
211 public ActiveMQDestination getDestination() {
212 return destination;
213 }
214
215 public ConsumerInfo getConsumerInfo() {
216 return consumerInfo;
217 }
218 }