001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.command;
018
019 import org.apache.activemq.state.CommandVisitor;
020
021 /**
022 * @openwire:marshaller code="22"
023 *
024 */
025 public class MessageAck extends BaseCommand {
026
027 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ACK;
028
029 /**
030 * Used to let the broker know that the message has been delivered to the
031 * client. Message will still be retained until an standard ack is received.
032 * This is used get the broker to send more messages past prefetch limits
033 * when an standard ack has not been sent.
034 */
035 public static final byte DELIVERED_ACK_TYPE = 0;
036
037 /**
038 * The standard ack case where a client wants the message to be discarded.
039 */
040 public static final byte STANDARD_ACK_TYPE = 2;
041
042 /**
043 * In case the client want's to explicitly let the broker know that a
044 * message was not processed and the message was considered a poison
045 * message.
046 */
047 public static final byte POSION_ACK_TYPE = 1;
048
049 /**
050 * In case the client want's to explicitly let the broker know that a
051 * message was not processed and it was re-delivered to the consumer
052 * but it was not yet considered to be a poison message. The messageCount
053 * field will hold the number of times the message was re-delivered.
054 */
055 public static final byte REDELIVERED_ACK_TYPE = 3;
056
057 /**
058 * The ack case where a client wants only an individual message to be discarded.
059 */
060 public static final byte INDIVIDUAL_ACK_TYPE = 4;
061
062 /**
063 * The ack case where a durable topic subscription does not match a selector.
064 */
065 public static final byte UNMATCHED_ACK_TYPE = 5;
066
067 protected byte ackType;
068 protected ConsumerId consumerId;
069 protected MessageId firstMessageId;
070 protected MessageId lastMessageId;
071 protected ActiveMQDestination destination;
072 protected TransactionId transactionId;
073 protected int messageCount;
074 protected Throwable poisonCause;
075
076 protected transient String consumerKey;
077
078 public MessageAck() {
079 }
080
081 public MessageAck(MessageDispatch md, byte ackType, int messageCount) {
082 this.ackType = ackType;
083 this.consumerId = md.getConsumerId();
084 this.destination = md.getDestination();
085 this.lastMessageId = md.getMessage().getMessageId();
086 this.messageCount = messageCount;
087 }
088
089 public MessageAck(Message message, byte ackType, int messageCount) {
090 this.ackType = ackType;
091 this.destination = message.getDestination();
092 this.lastMessageId = message.getMessageId();
093 this.messageCount = messageCount;
094 }
095
096 public void copy(MessageAck copy) {
097 super.copy(copy);
098 copy.firstMessageId = firstMessageId;
099 copy.lastMessageId = lastMessageId;
100 copy.destination = destination;
101 copy.transactionId = transactionId;
102 copy.ackType = ackType;
103 copy.consumerId = consumerId;
104 }
105
106 public byte getDataStructureType() {
107 return DATA_STRUCTURE_TYPE;
108 }
109
110 public boolean isMessageAck() {
111 return true;
112 }
113
114 public boolean isPoisonAck() {
115 return ackType == POSION_ACK_TYPE;
116 }
117
118 public boolean isStandardAck() {
119 return ackType == STANDARD_ACK_TYPE;
120 }
121
122 public boolean isDeliveredAck() {
123 return ackType == DELIVERED_ACK_TYPE;
124 }
125
126 public boolean isRedeliveredAck() {
127 return ackType == REDELIVERED_ACK_TYPE;
128 }
129
130 public boolean isIndividualAck() {
131 return ackType == INDIVIDUAL_ACK_TYPE;
132 }
133
134 public boolean isUnmatchedAck() {
135 return ackType == UNMATCHED_ACK_TYPE;
136 }
137
138 /**
139 * @openwire:property version=1 cache=true
140 */
141 public ActiveMQDestination getDestination() {
142 return destination;
143 }
144
145 public void setDestination(ActiveMQDestination destination) {
146 this.destination = destination;
147 }
148
149 /**
150 * @openwire:property version=1 cache=true
151 */
152 public TransactionId getTransactionId() {
153 return transactionId;
154 }
155
156 public void setTransactionId(TransactionId transactionId) {
157 this.transactionId = transactionId;
158 }
159
160 public boolean isInTransaction() {
161 return transactionId != null;
162 }
163
164 /**
165 * @openwire:property version=1 cache=true
166 */
167 public ConsumerId getConsumerId() {
168 return consumerId;
169 }
170
171 public void setConsumerId(ConsumerId consumerId) {
172 this.consumerId = consumerId;
173 }
174
175 /**
176 * @openwire:property version=1
177 */
178 public byte getAckType() {
179 return ackType;
180 }
181
182 public void setAckType(byte ackType) {
183 this.ackType = ackType;
184 }
185
186 /**
187 * @openwire:property version=1
188 */
189 public MessageId getFirstMessageId() {
190 return firstMessageId;
191 }
192
193 public void setFirstMessageId(MessageId firstMessageId) {
194 this.firstMessageId = firstMessageId;
195 }
196
197 /**
198 * @openwire:property version=1
199 */
200 public MessageId getLastMessageId() {
201 return lastMessageId;
202 }
203
204 public void setLastMessageId(MessageId lastMessageId) {
205 this.lastMessageId = lastMessageId;
206 }
207
208 /**
209 * The number of messages being acknowledged in the range.
210 *
211 * @openwire:property version=1
212 */
213 public int getMessageCount() {
214 return messageCount;
215 }
216
217 public void setMessageCount(int messageCount) {
218 this.messageCount = messageCount;
219 }
220
221 /**
222 * The cause of a poison ack, if a message listener
223 * throws an exception it will be recorded here
224 *
225 * @openwire:property version=7
226 */
227 public Throwable getPoisonCause() {
228 return poisonCause;
229 }
230
231 public void setPoisonCause(Throwable poisonCause) {
232 this.poisonCause = poisonCause;
233 }
234
235 public Response visit(CommandVisitor visitor) throws Exception {
236 return visitor.processMessageAck(this);
237 }
238
239 /**
240 * A helper method to allow a single message ID to be acknowledged
241 */
242 public void setMessageID(MessageId messageID) {
243 setFirstMessageId(messageID);
244 setLastMessageId(messageID);
245 setMessageCount(1);
246 }
247
248 }