/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.util.LinkedList;
021 import java.util.concurrent.atomic.AtomicLong;
022 import javax.jms.JMSException;
023 import org.apache.activemq.ActiveMQMessageAudit;
024 import org.apache.activemq.broker.Broker;
025 import org.apache.activemq.broker.ConnectionContext;
026 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
027 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
028 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
029 import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
030 import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
031 import org.apache.activemq.command.ConsumerControl;
032 import org.apache.activemq.command.ConsumerInfo;
033 import org.apache.activemq.command.Message;
034 import org.apache.activemq.command.MessageAck;
035 import org.apache.activemq.command.MessageDispatch;
036 import org.apache.activemq.command.MessageDispatchNotification;
037 import org.apache.activemq.command.MessagePull;
038 import org.apache.activemq.command.Response;
039 import org.apache.activemq.transaction.Synchronization;
040 import org.apache.activemq.usage.SystemUsage;
041 import org.slf4j.Logger;
042 import org.slf4j.LoggerFactory;
043
044 public class TopicSubscription extends AbstractSubscription {
045
046 private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
047 private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
048
049 protected PendingMessageCursor matched;
050 protected final SystemUsage usageManager;
051 protected AtomicLong dispatchedCounter = new AtomicLong();
052
053 boolean singleDestination = true;
054 Destination destination;
055
056 private int maximumPendingMessages = -1;
057 private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
058 private int discarded;
059 private final Object matchedListMutex = new Object();
060 private final AtomicLong enqueueCounter = new AtomicLong(0);
061 private final AtomicLong dequeueCounter = new AtomicLong(0);
062 private int memoryUsageHighWaterMark = 95;
063 // allow duplicate suppression in a ring network of brokers
064 protected int maxProducersToAudit = 1024;
065 protected int maxAuditDepth = 1000;
066 protected boolean enableAudit = false;
067 protected ActiveMQMessageAudit audit;
068 protected boolean active = false;
069
070 public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
071 super(broker, context, info);
072 this.usageManager = usageManager;
073 String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
074 if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) {
075 this.matched = new VMPendingMessageCursor(false);
076 } else {
077 this.matched = new FilePendingMessageCursor(broker,matchedName,false);
078 }
079 }
080
081 public void init() throws Exception {
082 this.matched.setSystemUsage(usageManager);
083 this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
084 this.matched.start();
085 if (enableAudit) {
086 audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
087 }
088 this.active=true;
089 }
090
091 public void add(MessageReference node) throws Exception {
092 if (isDuplicate(node)) {
093 return;
094 }
095 enqueueCounter.incrementAndGet();
096 if (!isFull() && matched.isEmpty() && !isSlave()) {
097 // if maximumPendingMessages is set we will only discard messages which
098 // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
099 dispatch(node);
100 setSlowConsumer(false);
101 } else {
102 if ( info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize() ) {
103 //we are slow
104 if(!isSlowConsumer()) {
105 LOG.warn(toString() + ": has twice its prefetch limit pending, without an ack; it appears to be slow");
106 setSlowConsumer(true);
107 for (Destination dest: destinations) {
108 dest.slowConsumer(getContext(), this);
109 }
110 }
111 }
112 if (maximumPendingMessages != 0) {
113 boolean warnedAboutWait = false;
114 while (active) {
115 synchronized (matchedListMutex) {
116 while (matched.isFull()) {
117 if (getContext().getStopping().get()) {
118 LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: "
119 + node.getMessageId());
120 enqueueCounter.decrementAndGet();
121 return;
122 }
123 if (!warnedAboutWait) {
124 LOG.info(toString() + ": Pending message cursor [" + matched
125 + "] is full, temp usage ("
126 + +matched.getSystemUsage().getTempUsage().getPercentUsage()
127 + "%) or memory usage ("
128 + matched.getSystemUsage().getMemoryUsage().getPercentUsage()
129 + "%) limit reached, blocking message add() pending the release of resources.");
130 warnedAboutWait = true;
131 }
132 matchedListMutex.wait(20);
133 }
134 //Temporary storage could be full - so just try to add the message
135 //see https://issues.apache.org/activemq/browse/AMQ-2475
136 if (matched.tryAddMessageLast(node, 10)) {
137 break;
138 }
139 }
140 }
141 synchronized (matchedListMutex) {
142
143 // NOTE - be careful about the slaveBroker!
144 if (maximumPendingMessages > 0) {
145 // calculate the high water mark from which point we
146 // will eagerly evict expired messages
147 int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
148 if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
149 max = maximumPendingMessages;
150 }
151 if (!matched.isEmpty() && matched.size() > max) {
152 removeExpiredMessages();
153 }
154 // lets discard old messages as we are a slow consumer
155 while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
156 int pageInSize = matched.size() - maximumPendingMessages;
157 // only page in a 1000 at a time - else we could
158 // blow da memory
159 pageInSize = Math.max(1000, pageInSize);
160 LinkedList<MessageReference> list = null;
161 MessageReference[] oldMessages=null;
162 synchronized(matched){
163 list = matched.pageInList(pageInSize);
164 oldMessages = messageEvictionStrategy.evictMessages(list);
165 for (MessageReference ref : list) {
166 ref.decrementReferenceCount();
167 }
168 }
169 int messagesToEvict = 0;
170 if (oldMessages != null){
171 messagesToEvict = oldMessages.length;
172 for (int i = 0; i < messagesToEvict; i++) {
173 MessageReference oldMessage = oldMessages[i];
174 discard(oldMessage);
175 }
176 }
177 // lets avoid an infinite loop if we are given a bad
178 // eviction strategy
179 // for a bad strategy lets just not evict
180 if (messagesToEvict == 0) {
181 LOG.warn("No messages to evict returned for " + destination + " from eviction strategy: " + messageEvictionStrategy + " out of " + list.size() + " candidates");
182 break;
183 }
184 }
185 }
186 }
187 dispatchMatched();
188 }
189 }
190 }
191
192 private boolean isDuplicate(MessageReference node) {
193 boolean duplicate = false;
194 if (enableAudit && audit != null) {
195 duplicate = audit.isDuplicate(node);
196 if (LOG.isDebugEnabled()) {
197 if (duplicate) {
198 LOG.debug(this + ", ignoring duplicate add: " + node.getMessageId());
199 }
200 }
201 }
202 return duplicate;
203 }
204
205 /**
206 * Discard any expired messages from the matched list. Called from a
207 * synchronized block.
208 *
209 * @throws IOException
210 */
211 protected void removeExpiredMessages() throws IOException {
212 try {
213 matched.reset();
214 while (matched.hasNext()) {
215 MessageReference node = matched.next();
216 node.decrementReferenceCount();
217 if (broker.isExpired(node)) {
218 matched.remove();
219 dispatchedCounter.incrementAndGet();
220 node.decrementReferenceCount();
221 node.getRegionDestination().getDestinationStatistics().getExpired().increment();
222 broker.messageExpired(getContext(), node, this);
223 break;
224 }
225 }
226 } finally {
227 matched.release();
228 }
229 }
230
231 public void processMessageDispatchNotification(MessageDispatchNotification mdn) {
232 synchronized (matchedListMutex) {
233 try {
234 matched.reset();
235 while (matched.hasNext()) {
236 MessageReference node = matched.next();
237 node.decrementReferenceCount();
238 if (node.getMessageId().equals(mdn.getMessageId())) {
239 matched.remove();
240 dispatchedCounter.incrementAndGet();
241 node.decrementReferenceCount();
242 break;
243 }
244 }
245 } finally {
246 matched.release();
247 }
248 }
249 }
250
251 public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
252 // Handle the standard acknowledgment case.
253 if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
254 if (context.isInTransaction()) {
255 context.getTransaction().addSynchronization(new Synchronization() {
256
257 @Override
258 public void afterCommit() throws Exception {
259 synchronized (TopicSubscription.this) {
260 if (singleDestination && destination != null) {
261 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
262 }
263 }
264 dequeueCounter.addAndGet(ack.getMessageCount());
265 dispatchMatched();
266 }
267 });
268 } else {
269 if (singleDestination && destination != null) {
270 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
271 destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
272 }
273 dequeueCounter.addAndGet(ack.getMessageCount());
274 }
275 dispatchMatched();
276 return;
277 } else if (ack.isDeliveredAck()) {
278 // Message was delivered but not acknowledged: update pre-fetch
279 // counters.
280 // also. get these for a consumer expired message.
281 if (destination != null && !ack.isInTransaction()) {
282 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
283 destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
284 }
285 dequeueCounter.addAndGet(ack.getMessageCount());
286 dispatchMatched();
287 return;
288 } else if (ack.isRedeliveredAck()) {
289 // nothing to do atm
290 return;
291 }
292 throw new JMSException("Invalid acknowledgment: " + ack);
293 }
294
295 public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
296 // not supported for topics
297 return null;
298 }
299
300 public int getPendingQueueSize() {
301 return matched();
302 }
303
304 public int getDispatchedQueueSize() {
305 return (int)(dispatchedCounter.get() - dequeueCounter.get());
306 }
307
308 public int getMaximumPendingMessages() {
309 return maximumPendingMessages;
310 }
311
312 public long getDispatchedCounter() {
313 return dispatchedCounter.get();
314 }
315
316 public long getEnqueueCounter() {
317 return enqueueCounter.get();
318 }
319
320 public long getDequeueCounter() {
321 return dequeueCounter.get();
322 }
323
324 /**
325 * @return the number of messages discarded due to being a slow consumer
326 */
327 public int discarded() {
328 synchronized (matchedListMutex) {
329 return discarded;
330 }
331 }
332
333 /**
334 * @return the number of matched messages (messages targeted for the
335 * subscription but not yet able to be dispatched due to the
336 * prefetch buffer being full).
337 */
338 public int matched() {
339 synchronized (matchedListMutex) {
340 return matched.size();
341 }
342 }
343
344 /**
345 * Sets the maximum number of pending messages that can be matched against
346 * this consumer before old messages are discarded.
347 */
348 public void setMaximumPendingMessages(int maximumPendingMessages) {
349 this.maximumPendingMessages = maximumPendingMessages;
350 }
351
352 public MessageEvictionStrategy getMessageEvictionStrategy() {
353 return messageEvictionStrategy;
354 }
355
356 /**
357 * Sets the eviction strategy used to decide which message to evict when the
358 * slow consumer needs to discard messages
359 */
360 public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
361 this.messageEvictionStrategy = messageEvictionStrategy;
362 }
363
364 public int getMaxProducersToAudit() {
365 return maxProducersToAudit;
366 }
367
368 public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
369 this.maxProducersToAudit = maxProducersToAudit;
370 if (audit != null) {
371 audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
372 }
373 }
374
375 public int getMaxAuditDepth() {
376 return maxAuditDepth;
377 }
378
379 public synchronized void setMaxAuditDepth(int maxAuditDepth) {
380 this.maxAuditDepth = maxAuditDepth;
381 if (audit != null) {
382 audit.setAuditDepth(maxAuditDepth);
383 }
384 }
385
386 public boolean isEnableAudit() {
387 return enableAudit;
388 }
389
390 public synchronized void setEnableAudit(boolean enableAudit) {
391 this.enableAudit = enableAudit;
392 if (enableAudit && audit==null) {
393 audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
394 }
395 }
396
397 // Implementation methods
398 // -------------------------------------------------------------------------
399 public boolean isFull() {
400 return getDispatchedQueueSize() >= info.getPrefetchSize();
401 }
402
403 public int getInFlightSize() {
404 return getDispatchedQueueSize();
405 }
406
407
408 /**
409 * @return true when 60% or more room is left for dispatching messages
410 */
411 public boolean isLowWaterMark() {
412 return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
413 }
414
415 /**
416 * @return true when 10% or less room is left for dispatching messages
417 */
418 public boolean isHighWaterMark() {
419 return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
420 }
421
422 /**
423 * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
424 */
425 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
426 this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
427 }
428
429 /**
430 * @return the memoryUsageHighWaterMark
431 */
432 public int getMemoryUsageHighWaterMark() {
433 return this.memoryUsageHighWaterMark;
434 }
435
436 /**
437 * @return the usageManager
438 */
439 public SystemUsage getUsageManager() {
440 return this.usageManager;
441 }
442
443 /**
444 * @return the matched
445 */
446 public PendingMessageCursor getMatched() {
447 return this.matched;
448 }
449
450 /**
451 * @param matched the matched to set
452 */
453 public void setMatched(PendingMessageCursor matched) {
454 this.matched = matched;
455 }
456
457 /**
458 * inform the MessageConsumer on the client to change it's prefetch
459 *
460 * @param newPrefetch
461 */
462 public void updateConsumerPrefetch(int newPrefetch) {
463 if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
464 ConsumerControl cc = new ConsumerControl();
465 cc.setConsumerId(info.getConsumerId());
466 cc.setPrefetch(newPrefetch);
467 context.getConnection().dispatchAsync(cc);
468 }
469 }
470
471 private void dispatchMatched() throws IOException {
472 synchronized (matchedListMutex) {
473 if (!matched.isEmpty() && !isFull()) {
474 try {
475 matched.reset();
476
477 while (matched.hasNext() && !isFull()) {
478 MessageReference message = matched.next();
479 message.decrementReferenceCount();
480 matched.remove();
481 // Message may have been sitting in the matched list a
482 // while
483 // waiting for the consumer to ak the message.
484 if (message.isExpired()) {
485 discard(message);
486 continue; // just drop it.
487 }
488 dispatch(message);
489 }
490 } finally {
491 matched.release();
492 }
493 }
494 }
495 }
496
497 private void dispatch(final MessageReference node) throws IOException {
498 Message message = (Message)node;
499 node.incrementReferenceCount();
500 // Make sure we can dispatch a message.
501 MessageDispatch md = new MessageDispatch();
502 md.setMessage(message);
503 md.setConsumerId(info.getConsumerId());
504 md.setDestination(node.getRegionDestination().getActiveMQDestination());
505 dispatchedCounter.incrementAndGet();
506 // Keep track if this subscription is receiving messages from a single
507 // destination.
508 if (singleDestination) {
509 if (destination == null) {
510 destination = node.getRegionDestination();
511 } else {
512 if (destination != node.getRegionDestination()) {
513 singleDestination = false;
514 }
515 }
516 }
517 if (info.isDispatchAsync()) {
518 md.setTransmitCallback(new Runnable() {
519
520 public void run() {
521 node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
522 node.getRegionDestination().getDestinationStatistics().getInflight().increment();
523 node.decrementReferenceCount();
524 }
525 });
526 context.getConnection().dispatchAsync(md);
527 } else {
528 context.getConnection().dispatchSync(md);
529 node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
530 node.getRegionDestination().getDestinationStatistics().getInflight().increment();
531 node.decrementReferenceCount();
532 }
533 }
534
535 private void discard(MessageReference message) {
536 message.decrementReferenceCount();
537 matched.remove(message);
538 discarded++;
539 if(destination != null) {
540 destination.getDestinationStatistics().getDequeues().increment();
541 }
542 if (LOG.isDebugEnabled()) {
543 LOG.debug(this + ", discarding message " + message);
544 }
545 Destination dest = message.getRegionDestination();
546 if (dest != null) {
547 dest.messageDiscarded(getContext(), this, message);
548 }
549 broker.getRoot().sendToDeadLetterQueue(getContext(), message, this);
550 }
551
552 @Override
553 public String toString() {
554 return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
555 + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
556 }
557
558 public void destroy() {
559 this.active=false;
560 synchronized (matchedListMutex) {
561 try {
562 matched.destroy();
563 } catch (Exception e) {
564 LOG.warn("Failed to destroy cursor", e);
565 }
566 }
567 setSlowConsumer(false);
568 }
569
570 @Override
571 public int getPrefetchSize() {
572 return info.getPrefetchSize();
573 }
574
575 }