001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.command;
018
019 import java.util.ArrayList;
020 import java.util.List;
021
022 import org.apache.activemq.filter.BooleanExpression;
023 import org.apache.activemq.state.CommandVisitor;
024
025 /**
026 * @openwire:marshaller code="5"
027 *
028 */
029 public class ConsumerInfo extends BaseCommand {
030
031 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_INFO;
032
033 public static final byte HIGH_PRIORITY = 10;
034 public static final byte NORMAL_PRIORITY = 0;
035 public static final byte NETWORK_CONSUMER_PRIORITY = -5;
036 public static final byte LOW_PRIORITY = -10;
037
038 protected ConsumerId consumerId;
039 protected ActiveMQDestination destination;
040 protected int prefetchSize;
041 protected int maximumPendingMessageLimit;
042 protected boolean browser;
043 protected boolean dispatchAsync;
044 protected String selector;
045 protected String subscriptionName;
046 protected boolean noLocal;
047 protected boolean exclusive;
048 protected boolean retroactive;
049 protected byte priority;
050 protected BrokerId[] brokerPath;
051 protected boolean optimizedAcknowledge;
052 // used by the broker
053 protected transient int currentPrefetchSize;
054 // if true, the consumer will not send range
055 protected boolean noRangeAcks;
056 // acks.
057
058 protected BooleanExpression additionalPredicate;
059 protected transient boolean networkSubscription; // this subscription
060 protected transient List<ConsumerId> networkConsumerIds; // the original consumerId
061
062 // not marshalled, populated from RemoveInfo, the last message delivered, used
063 // to suppress redelivery on prefetched messages after close
064 private transient long lastDeliveredSequenceId;
065
066 // originated from a
067 // network connection
068
069 public ConsumerInfo() {
070 }
071
072 public ConsumerInfo(ConsumerId consumerId) {
073 this.consumerId = consumerId;
074 }
075
076 public ConsumerInfo(SessionInfo sessionInfo, long consumerId) {
077 this.consumerId = new ConsumerId(sessionInfo.getSessionId(), consumerId);
078 }
079
080 public ConsumerInfo copy() {
081 ConsumerInfo info = new ConsumerInfo();
082 copy(info);
083 return info;
084 }
085
086 public void copy(ConsumerInfo info) {
087 super.copy(info);
088 info.consumerId = consumerId;
089 info.destination = destination;
090 info.prefetchSize = prefetchSize;
091 info.maximumPendingMessageLimit = maximumPendingMessageLimit;
092 info.browser = browser;
093 info.dispatchAsync = dispatchAsync;
094 info.selector = selector;
095 info.subscriptionName = subscriptionName;
096 info.noLocal = noLocal;
097 info.exclusive = exclusive;
098 info.retroactive = retroactive;
099 info.priority = priority;
100 info.brokerPath = brokerPath;
101 info.networkSubscription = networkSubscription;
102 if (networkConsumerIds != null) {
103 if (info.networkConsumerIds==null){
104 info.networkConsumerIds=new ArrayList<ConsumerId>();
105 }
106 info.networkConsumerIds.addAll(networkConsumerIds);
107 }
108 }
109
110 public boolean isDurable() {
111 return subscriptionName != null;
112 }
113
114 public byte getDataStructureType() {
115 return DATA_STRUCTURE_TYPE;
116 }
117
118 /**
119 * Is used to uniquely identify the consumer to the broker.
120 *
121 * @openwire:property version=1 cache=true
122 */
123 public ConsumerId getConsumerId() {
124 return consumerId;
125 }
126
127 public void setConsumerId(ConsumerId consumerId) {
128 this.consumerId = consumerId;
129 }
130
131 /**
132 * Is this consumer a queue browser?
133 *
134 * @openwire:property version=1
135 */
136 public boolean isBrowser() {
137 return browser;
138 }
139
140 public void setBrowser(boolean browser) {
141 this.browser = browser;
142 }
143
144 /**
145 * The destination that the consumer is interested in receiving messages
146 * from. This destination could be a composite destination.
147 *
148 * @openwire:property version=1 cache=true
149 */
150 public ActiveMQDestination getDestination() {
151 return destination;
152 }
153
154 public void setDestination(ActiveMQDestination destination) {
155 this.destination = destination;
156 }
157
158 /**
159 * How many messages a broker will send to the client without receiving an
160 * ack before he stops dispatching messages to the client.
161 *
162 * @openwire:property version=1
163 */
164 public int getPrefetchSize() {
165 return prefetchSize;
166 }
167
168 public void setPrefetchSize(int prefetchSize) {
169 this.prefetchSize = prefetchSize;
170 this.currentPrefetchSize = prefetchSize;
171 }
172
173 /**
174 * How many messages a broker will keep around, above the prefetch limit,
175 * for non-durable topics before starting to discard older messages.
176 *
177 * @openwire:property version=1
178 */
179 public int getMaximumPendingMessageLimit() {
180 return maximumPendingMessageLimit;
181 }
182
183 public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
184 this.maximumPendingMessageLimit = maximumPendingMessageLimit;
185 }
186
187 /**
188 * Should the broker dispatch a message to the consumer async? If he does it
189 * async, then he uses a more SEDA style of processing while if it is not
190 * done async, then he broker use a STP style of processing. STP is more
191 * appropriate in high bandwidth situations or when being used by and in vm
192 * transport.
193 *
194 * @openwire:property version=1
195 */
196 public boolean isDispatchAsync() {
197 return dispatchAsync;
198 }
199
200 public void setDispatchAsync(boolean dispatchAsync) {
201 this.dispatchAsync = dispatchAsync;
202 }
203
204 /**
205 * The JMS selector used to filter out messages that this consumer is
206 * interested in.
207 *
208 * @openwire:property version=1
209 */
210 public String getSelector() {
211 return selector;
212 }
213
214 public void setSelector(String selector) {
215 this.selector = selector;
216 }
217
218 /**
219 * Used to identify the name of a durable subscription.
220 *
221 * @openwire:property version=1
222 */
223 public String getSubscriptionName() {
224 return subscriptionName;
225 }
226
227 public void setSubscriptionName(String durableSubscriptionId) {
228 this.subscriptionName = durableSubscriptionId;
229 }
230
231 /**
232 * @deprecated
233 * @return
234 * @see getSubscriptionName
235 */
236 public String getSubcriptionName() {
237 return subscriptionName;
238 }
239
240 /**
241 * @deprecated
242 * @see setSubscriptionName
243 * @param durableSubscriptionId
244 */
245 public void setSubcriptionName(String durableSubscriptionId) {
246 this.subscriptionName = durableSubscriptionId;
247 }
248
249 /**
250 * Set noLocal to true to avoid receiving messages that were published
251 * locally on the same connection.
252 *
253 * @openwire:property version=1
254 */
255 public boolean isNoLocal() {
256 return noLocal;
257 }
258
259 public void setNoLocal(boolean noLocal) {
260 this.noLocal = noLocal;
261 }
262
263 /**
264 * An exclusive consumer locks out other consumers from being able to
265 * receive messages from the destination. If there are multiple exclusive
266 * consumers for a destination, the first one created will be the exclusive
267 * consumer of the destination.
268 *
269 * @openwire:property version=1
270 */
271 public boolean isExclusive() {
272 return exclusive;
273 }
274
275 public void setExclusive(boolean exclusive) {
276 this.exclusive = exclusive;
277 }
278
279 /**
280 * A retroactive consumer only has meaning for Topics. It allows a consumer
281 * to retroactively see messages sent prior to the consumer being created.
282 * If the consumer is not durable, it will be delivered the last message
283 * published to the topic. If the consumer is durable then it will receive
284 * all persistent messages that are still stored in persistent storage for
285 * that topic.
286 *
287 * @openwire:property version=1
288 */
289 public boolean isRetroactive() {
290 return retroactive;
291 }
292
293 public void setRetroactive(boolean retroactive) {
294 this.retroactive = retroactive;
295 }
296
297 public RemoveInfo createRemoveCommand() {
298 RemoveInfo command = new RemoveInfo(getConsumerId());
299 command.setResponseRequired(isResponseRequired());
300 return command;
301 }
302
303 /**
304 * The broker will avoid dispatching to a lower priority consumer if there
305 * are other higher priority consumers available to dispatch to. This allows
306 * letting the broker to have an affinity to higher priority consumers.
307 * Default priority is 0.
308 *
309 * @openwire:property version=1
310 */
311 public byte getPriority() {
312 return priority;
313 }
314
315 public void setPriority(byte priority) {
316 this.priority = priority;
317 }
318
319 /**
320 * The route of brokers the command has moved through.
321 *
322 * @openwire:property version=1 cache=true
323 */
324 public BrokerId[] getBrokerPath() {
325 return brokerPath;
326 }
327
328 public void setBrokerPath(BrokerId[] brokerPath) {
329 this.brokerPath = brokerPath;
330 }
331
332 /**
333 * A transient additional predicate that can be used it inject additional
334 * predicates into the selector on the fly. Handy if if say a Security
335 * Broker interceptor wants to filter out messages based on security level
336 * of the consumer.
337 *
338 * @openwire:property version=1
339 */
340 public BooleanExpression getAdditionalPredicate() {
341 return additionalPredicate;
342 }
343
344 public void setAdditionalPredicate(BooleanExpression additionalPredicate) {
345 this.additionalPredicate = additionalPredicate;
346 }
347
348 public Response visit(CommandVisitor visitor) throws Exception {
349 return visitor.processAddConsumer(this);
350 }
351
352 /**
353 * @openwire:property version=1
354 * @return Returns the networkSubscription.
355 */
356 public boolean isNetworkSubscription() {
357 return networkSubscription;
358 }
359
360 /**
361 * @param networkSubscription The networkSubscription to set.
362 */
363 public void setNetworkSubscription(boolean networkSubscription) {
364 this.networkSubscription = networkSubscription;
365 }
366
367 /**
368 * @openwire:property version=1
369 * @return Returns the optimizedAcknowledge.
370 */
371 public boolean isOptimizedAcknowledge() {
372 return optimizedAcknowledge;
373 }
374
375 /**
376 * @param optimizedAcknowledge The optimizedAcknowledge to set.
377 */
378 public void setOptimizedAcknowledge(boolean optimizedAcknowledge) {
379 this.optimizedAcknowledge = optimizedAcknowledge;
380 }
381
382 /**
383 * @return Returns the currentPrefetchSize.
384 */
385 public int getCurrentPrefetchSize() {
386 return currentPrefetchSize;
387 }
388
389 /**
390 * @param currentPrefetchSize The currentPrefetchSize to set.
391 */
392 public void setCurrentPrefetchSize(int currentPrefetchSize) {
393 this.currentPrefetchSize = currentPrefetchSize;
394 }
395
396 /**
397 * The broker may be able to optimize it's processing or provides better QOS
398 * if it knows the consumer will not be sending ranged acks.
399 *
400 * @return true if the consumer will not send range acks.
401 * @openwire:property version=1
402 */
403 public boolean isNoRangeAcks() {
404 return noRangeAcks;
405 }
406
407 public void setNoRangeAcks(boolean noRangeAcks) {
408 this.noRangeAcks = noRangeAcks;
409 }
410
411 public synchronized void addNetworkConsumerId(ConsumerId networkConsumerId) {
412 if (networkConsumerIds == null) {
413 networkConsumerIds = new ArrayList<ConsumerId>();
414 }
415 networkConsumerIds.add(networkConsumerId);
416 }
417
418 public synchronized void removeNetworkConsumerId(ConsumerId networkConsumerId) {
419 if (networkConsumerIds != null) {
420 networkConsumerIds.remove(networkConsumerId);
421 if (networkConsumerIds.isEmpty()) {
422 networkConsumerIds=null;
423 }
424 }
425 }
426
427 public synchronized boolean isNetworkConsumersEmpty() {
428 return networkConsumerIds == null || networkConsumerIds.isEmpty();
429 }
430
431 public synchronized List<ConsumerId> getNetworkConsumerIds(){
432 List<ConsumerId> result = new ArrayList<ConsumerId>();
433 if (networkConsumerIds != null) {
434 result.addAll(networkConsumerIds);
435 }
436 return result;
437 }
438
439 /**
440 * Tracks the original subscription id that causes a subscription to
441 * percolate through a network when networkTTL > 1. Tracking the original
442 * subscription allows duplicate suppression.
443 *
444 * @return array of the current subscription path
445 * @openwire:property version=4
446 */
447 public ConsumerId[] getNetworkConsumerPath() {
448 ConsumerId[] result = null;
449 if (networkConsumerIds != null) {
450 result = networkConsumerIds.toArray(new ConsumerId[0]);
451 }
452 return result;
453 }
454
455 public void setNetworkConsumerPath(ConsumerId[] consumerPath) {
456 if (consumerPath != null) {
457 for (int i=0; i<consumerPath.length; i++) {
458 addNetworkConsumerId(consumerPath[i]);
459 }
460 }
461 }
462
463 public void setLastDeliveredSequenceId(long lastDeliveredSequenceId) {
464 this.lastDeliveredSequenceId = lastDeliveredSequenceId;
465 }
466
467 public long getLastDeliveredSequenceId() {
468 return lastDeliveredSequenceId;
469 }
470
471 }