001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.LinkedList;
022 import java.util.List;
023 import java.util.concurrent.CancellationException;
024 import java.util.concurrent.ConcurrentHashMap;
025 import java.util.concurrent.CopyOnWriteArrayList;
026 import java.util.concurrent.Future;
027 import java.util.concurrent.locks.ReentrantReadWriteLock;
028
029 import org.apache.activemq.broker.BrokerService;
030 import org.apache.activemq.broker.ConnectionContext;
031 import org.apache.activemq.broker.ProducerBrokerExchange;
032 import org.apache.activemq.broker.region.policy.DispatchPolicy;
033 import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
034 import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
035 import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
036 import org.apache.activemq.broker.util.InsertionCountList;
037 import org.apache.activemq.command.ActiveMQDestination;
038 import org.apache.activemq.command.ExceptionResponse;
039 import org.apache.activemq.command.Message;
040 import org.apache.activemq.command.MessageAck;
041 import org.apache.activemq.command.MessageId;
042 import org.apache.activemq.command.ProducerAck;
043 import org.apache.activemq.command.ProducerInfo;
044 import org.apache.activemq.command.Response;
045 import org.apache.activemq.command.SubscriptionInfo;
046 import org.apache.activemq.filter.MessageEvaluationContext;
047 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
048 import org.apache.activemq.store.MessageRecoveryListener;
049 import org.apache.activemq.store.TopicMessageStore;
050 import org.apache.activemq.thread.Task;
051 import org.apache.activemq.thread.TaskRunner;
052 import org.apache.activemq.thread.TaskRunnerFactory;
053 import org.apache.activemq.transaction.Synchronization;
054 import org.apache.activemq.util.SubscriptionKey;
055 import org.slf4j.Logger;
056 import org.slf4j.LoggerFactory;
057
058 /**
059 * The Topic is a destination that sends a copy of a message to every active
060 * Subscription registered.
061 */
062 public class Topic extends BaseDestination implements Task {
063 protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
064 private final TopicMessageStore topicStore;
065 protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
066 private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
067 private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
068 private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
069 private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
070 private final TaskRunner taskRunner;
071 private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
072 private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
073 public void run() {
074 try {
075 Topic.this.taskRunner.wakeup();
076 } catch (InterruptedException e) {
077 }
078 };
079 };
080
081 public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
082 DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
083 super(brokerService, store, destination, parentStats);
084 this.topicStore = store;
085 // set default subscription recovery policy
086 subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
087 this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
088 }
089
090 @Override
091 public void initialize() throws Exception {
092 super.initialize();
093 if (store != null) {
094 // AMQ-2586: Better to leave this stat at zero than to give the user
095 // misleading metrics.
096 // int messageCount = store.getMessageCount();
097 // destinationStatistics.getMessages().setCount(messageCount);
098 }
099 }
100
101 public List<Subscription> getConsumers() {
102 synchronized (consumers) {
103 return new ArrayList<Subscription>(consumers);
104 }
105 }
106
107 public boolean lock(MessageReference node, LockOwner sub) {
108 return true;
109 }
110
111 public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
112
113 super.addSubscription(context, sub);
114
115 if (!sub.getConsumerInfo().isDurable()) {
116
117 // Do a retroactive recovery if needed.
118 if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {
119
120 // synchronize with dispatch method so that no new messages are sent
121 // while we are recovering a subscription to avoid out of order messages.
122 dispatchLock.writeLock().lock();
123 try {
124 synchronized (consumers) {
125 sub.add(context, this);
126 consumers.add(sub);
127 }
128 subscriptionRecoveryPolicy.recover(context, this, sub);
129 } finally {
130 dispatchLock.writeLock().unlock();
131 }
132
133 } else {
134 synchronized (consumers) {
135 sub.add(context, this);
136 consumers.add(sub);
137 }
138 }
139 } else {
140 DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
141 sub.add(context, this);
142 if(dsub.isActive()) {
143 synchronized (consumers) {
144 boolean hasSubscription = false;
145
146 if(consumers.size()==0) {
147 hasSubscription = false;
148 } else {
149 for(Subscription currentSub : consumers) {
150 if(currentSub.getConsumerInfo().isDurable()) {
151 DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
152 if(dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
153 hasSubscription = true;
154 break;
155 }
156 }
157 }
158 }
159
160 if(!hasSubscription)
161 consumers.add(sub);
162 }
163 }
164 durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
165 }
166 }
167
168 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
169 throws Exception {
170 if (!sub.getConsumerInfo().isDurable()) {
171 super.removeSubscription(context, sub, lastDeliveredSequenceId);
172 synchronized (consumers) {
173 consumers.remove(sub);
174 }
175 }
176 sub.remove(context, this);
177 }
178
179 public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
180 if (topicStore != null) {
181 topicStore.deleteSubscription(key.clientId, key.subscriptionName);
182 DurableTopicSubscription removed = durableSubcribers.remove(key);
183 if (removed != null) {
184 destinationStatistics.getConsumers().decrement();
185 // deactivate and remove
186 removed.deactivate(false);
187 consumers.remove(removed);
188 }
189 }
190 }
191
192 public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
193 // synchronize with dispatch method so that no new messages are sent
194 // while we are recovering a subscription to avoid out of order messages.
195 dispatchLock.writeLock().lock();
196 try {
197
198 if (topicStore == null) {
199 return;
200 }
201
202 // Recover the durable subscription.
203 String clientId = subscription.getSubscriptionKey().getClientId();
204 String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
205 String selector = subscription.getConsumerInfo().getSelector();
206 SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
207 if (info != null) {
208 // Check to see if selector changed.
209 String s1 = info.getSelector();
210 if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
211 // Need to delete the subscription
212 topicStore.deleteSubscription(clientId, subscriptionName);
213 info = null;
214 } else {
215 synchronized (consumers) {
216 consumers.add(subscription);
217 }
218 }
219 }
220
221 // Do we need to create the subscription?
222 if (info == null) {
223 info = new SubscriptionInfo();
224 info.setClientId(clientId);
225 info.setSelector(selector);
226 info.setSubscriptionName(subscriptionName);
227 info.setDestination(getActiveMQDestination());
228 // This destination is an actual destination id.
229 info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
230 // This destination might be a pattern
231 synchronized (consumers) {
232 consumers.add(subscription);
233 topicStore.addSubsciption(info, subscription.getConsumerInfo().isRetroactive());
234 }
235 }
236
237 final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
238 msgContext.setDestination(destination);
239 if (subscription.isRecoveryRequired()) {
240 topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
241 public boolean recoverMessage(Message message) throws Exception {
242 message.setRegionDestination(Topic.this);
243 try {
244 msgContext.setMessageReference(message);
245 if (subscription.matches(message, msgContext)) {
246 subscription.add(message);
247 }
248 } catch (IOException e) {
249 LOG.error("Failed to recover this message " + message);
250 }
251 return true;
252 }
253
254 public boolean recoverMessageReference(MessageId messageReference) throws Exception {
255 throw new RuntimeException("Should not be called.");
256 }
257
258 public boolean hasSpace() {
259 return true;
260 }
261
262 public boolean isDuplicate(MessageId id) {
263 return false;
264 }
265 });
266 }
267 } finally {
268 dispatchLock.writeLock().unlock();
269 }
270 }
271
272 public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Exception {
273 synchronized (consumers) {
274 consumers.remove(sub);
275 }
276 sub.remove(context, this);
277 }
278
279 protected void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
280 if (subscription.getConsumerInfo().isRetroactive()) {
281 subscriptionRecoveryPolicy.recover(context, this, subscription);
282 }
283 }
284
285 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
286 final ConnectionContext context = producerExchange.getConnectionContext();
287
288 final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
289 final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
290 && !context.isInRecoveryMode();
291
292 // There is delay between the client sending it and it arriving at the
293 // destination.. it may have expired.
294 if (message.isExpired()) {
295 broker.messageExpired(context, message, null);
296 getDestinationStatistics().getExpired().increment();
297 if (sendProducerAck) {
298 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
299 context.getConnection().dispatchAsync(ack);
300 }
301 return;
302 }
303
304 if (memoryUsage.isFull()) {
305 isFull(context, memoryUsage);
306 fastProducer(context, producerInfo);
307
308 if (isProducerFlowControl() && context.isProducerFlowControl()) {
309
310 if (warnOnProducerFlowControl) {
311 warnOnProducerFlowControl = false;
312 LOG.info(memoryUsage + ", Usage Manager memory limit reached for "
313 + getActiveMQDestination().getQualifiedName()
314 + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
315 + " See http://activemq.apache.org/producer-flow-control.html for more info");
316 }
317
318 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
319 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
320 + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
321 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
322 + " See http://activemq.apache.org/producer-flow-control.html for more info");
323 }
324
325 // We can avoid blocking due to low usage if the producer is
326 // sending
327 // a sync message or
328 // if it is using a producer window
329 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
330 synchronized (messagesWaitingForSpace) {
331 messagesWaitingForSpace.add(new Runnable() {
332 public void run() {
333 try {
334
335 // While waiting for space to free up... the
336 // message may have expired.
337 if (message.isExpired()) {
338 broker.messageExpired(context, message, null);
339 getDestinationStatistics().getExpired().increment();
340 } else {
341 doMessageSend(producerExchange, message);
342 }
343
344 if (sendProducerAck) {
345 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
346 .getSize());
347 context.getConnection().dispatchAsync(ack);
348 } else {
349 Response response = new Response();
350 response.setCorrelationId(message.getCommandId());
351 context.getConnection().dispatchAsync(response);
352 }
353
354 } catch (Exception e) {
355 if (!sendProducerAck && !context.isInRecoveryMode()) {
356 ExceptionResponse response = new ExceptionResponse(e);
357 response.setCorrelationId(message.getCommandId());
358 context.getConnection().dispatchAsync(response);
359 }
360 }
361
362 }
363 });
364
365 registerCallbackForNotFullNotification();
366 context.setDontSendReponse(true);
367 return;
368 }
369
370 } else {
371 // Producer flow control cannot be used, so we have do the
372 // flow
373 // control at the broker
374 // by blocking this thread until there is space available.
375
376 if (memoryUsage.isFull()) {
377 if (context.isInTransaction()) {
378
379 int count = 0;
380 while (!memoryUsage.waitForSpace(1000)) {
381 if (context.getStopping().get()) {
382 throw new IOException("Connection closed, send aborted.");
383 }
384 if (count > 2 && context.isInTransaction()) {
385 count = 0;
386 int size = context.getTransaction().size();
387 LOG.warn("Waiting for space to send transacted message - transaction elements = "
388 + size + " need more space to commit. Message = " + message);
389 }
390 }
391 } else {
392 waitForSpace(
393 context,
394 memoryUsage,
395 "Usage Manager Memory Usage limit reached. Stopping producer ("
396 + message.getProducerId()
397 + ") to prevent flooding "
398 + getActiveMQDestination().getQualifiedName()
399 + "."
400 + " See http://activemq.apache.org/producer-flow-control.html for more info");
401 }
402 }
403
404 // The usage manager could have delayed us by the time
405 // we unblock the message could have expired..
406 if (message.isExpired()) {
407 getDestinationStatistics().getExpired().increment();
408 if (LOG.isDebugEnabled()) {
409 LOG.debug("Expired message: " + message);
410 }
411 return;
412 }
413 }
414 }
415 }
416
417 doMessageSend(producerExchange, message);
418 messageDelivered(context, message);
419 if (sendProducerAck) {
420 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
421 context.getConnection().dispatchAsync(ack);
422 }
423 }
424
425 /**
426 * do send the message - this needs to be synchronized to ensure messages
427 * are stored AND dispatched in the right order
428 *
429 * @param producerExchange
430 * @param message
431 * @throws IOException
432 * @throws Exception
433 */
434 synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
435 throws IOException, Exception {
436 final ConnectionContext context = producerExchange.getConnectionContext();
437 message.setRegionDestination(this);
438 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
439 Future<Object> result = null;
440
441 if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
442 if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
443 final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
444 + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
445 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
446 + " See http://activemq.apache.org/producer-flow-control.html for more info";
447 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
448 throw new javax.jms.ResourceAllocationException(logMessage);
449 }
450
451 waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
452 }
453 result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
454 }
455
456 message.incrementReferenceCount();
457
458 if (context.isInTransaction()) {
459 context.getTransaction().addSynchronization(new Synchronization() {
460 @Override
461 public void afterCommit() throws Exception {
462 // It could take while before we receive the commit
463 // operation.. by that time the message could have
464 // expired..
465 if (broker.isExpired(message)) {
466 getDestinationStatistics().getExpired().increment();
467 broker.messageExpired(context, message, null);
468 message.decrementReferenceCount();
469 return;
470 }
471 try {
472 dispatch(context, message);
473 } finally {
474 message.decrementReferenceCount();
475 }
476 }
477 });
478
479 } else {
480 try {
481 dispatch(context, message);
482 } finally {
483 message.decrementReferenceCount();
484 }
485 }
486
487 if (result != null && !result.isCancelled()) {
488 try {
489 result.get();
490 } catch (CancellationException e) {
491 // ignore - the task has been cancelled if the message
492 // has already been deleted
493 }
494 }
495 }
496
497 private boolean canOptimizeOutPersistence() {
498 return durableSubcribers.size() == 0;
499 }
500
501 @Override
502 public String toString() {
503 return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
504 }
505
506 public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
507 final MessageReference node) throws IOException {
508 if (topicStore != null && node.isPersistent()) {
509 DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
510 SubscriptionKey key = dsub.getSubscriptionKey();
511 topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), ack);
512 }
513 messageConsumed(context, node);
514 }
515
516 public void gc() {
517 }
518
519 public Message loadMessage(MessageId messageId) throws IOException {
520 return topicStore != null ? topicStore.getMessage(messageId) : null;
521 }
522
523 public void start() throws Exception {
524 this.subscriptionRecoveryPolicy.start();
525 if (memoryUsage != null) {
526 memoryUsage.start();
527 }
528
529 if (getExpireMessagesPeriod() > 0) {
530 scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
531 }
532 }
533
534 public void stop() throws Exception {
535 if (taskRunner != null) {
536 taskRunner.shutdown();
537 }
538 this.subscriptionRecoveryPolicy.stop();
539 if (memoryUsage != null) {
540 memoryUsage.stop();
541 }
542 if (this.topicStore != null) {
543 this.topicStore.stop();
544 }
545
546 scheduler.cancel(expireMessagesTask);
547 }
548
549 public Message[] browse() {
550 final List<Message> result = new ArrayList<Message>();
551 doBrowse(result, getMaxBrowsePageSize());
552 return result.toArray(new Message[result.size()]);
553 }
554
555 private void doBrowse(final List<Message> browseList, final int max) {
556 try {
557 if (topicStore != null) {
558 final List<Message> toExpire = new ArrayList<Message>();
559 topicStore.recover(new MessageRecoveryListener() {
560 public boolean recoverMessage(Message message) throws Exception {
561 if (message.isExpired()) {
562 toExpire.add(message);
563 }
564 browseList.add(message);
565 return true;
566 }
567
568 public boolean recoverMessageReference(MessageId messageReference) throws Exception {
569 return true;
570 }
571
572 public boolean hasSpace() {
573 return browseList.size() < max;
574 }
575
576 public boolean isDuplicate(MessageId id) {
577 return false;
578 }
579 });
580 final ConnectionContext connectionContext = createConnectionContext();
581 for (Message message : toExpire) {
582 for (DurableTopicSubscription sub : durableSubcribers.values()) {
583 if (!sub.isActive()) {
584 messageExpired(connectionContext, sub, message);
585 }
586 }
587 }
588 Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
589 if (msgs != null) {
590 for (int i = 0; i < msgs.length && browseList.size() < max; i++) {
591 browseList.add(msgs[i]);
592 }
593 }
594 }
595 } catch (Throwable e) {
596 LOG.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e);
597 }
598 }
599
600 public boolean iterate() {
601 synchronized (messagesWaitingForSpace) {
602 while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
603 Runnable op = messagesWaitingForSpace.removeFirst();
604 op.run();
605 }
606
607 if (!messagesWaitingForSpace.isEmpty()) {
608 registerCallbackForNotFullNotification();
609 }
610 }
611 return false;
612 }
613
614 private void registerCallbackForNotFullNotification() {
615 // If the usage manager is not full, then the task will not
616 // get called..
617 if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
618 // so call it directly here.
619 sendMessagesWaitingForSpaceTask.run();
620 }
621 }
622
623 // Properties
624 // -------------------------------------------------------------------------
625
626 public DispatchPolicy getDispatchPolicy() {
627 return dispatchPolicy;
628 }
629
630 public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
631 this.dispatchPolicy = dispatchPolicy;
632 }
633
634 public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
635 return subscriptionRecoveryPolicy;
636 }
637
638 public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
639 this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
640 }
641
642 // Implementation methods
643 // -------------------------------------------------------------------------
644
645 public final void wakeup() {
646 }
647
648 protected void dispatch(final ConnectionContext context, Message message) throws Exception {
649 // AMQ-2586: Better to leave this stat at zero than to give the user
650 // misleading metrics.
651 // destinationStatistics.getMessages().increment();
652 destinationStatistics.getEnqueues().increment();
653 MessageEvaluationContext msgContext = null;
654
655 dispatchLock.readLock().lock();
656 try {
657 if (!subscriptionRecoveryPolicy.add(context, message)) {
658 return;
659 }
660 synchronized (consumers) {
661 if (consumers.isEmpty()) {
662 onMessageWithNoConsumers(context, message);
663 return;
664 }
665 }
666 msgContext = context.getMessageEvaluationContext();
667 msgContext.setDestination(destination);
668 msgContext.setMessageReference(message);
669 if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
670 onMessageWithNoConsumers(context, message);
671 }
672
673 } finally {
674 dispatchLock.readLock().unlock();
675 if (msgContext != null) {
676 msgContext.clear();
677 }
678 }
679 }
680
681 private final Runnable expireMessagesTask = new Runnable() {
682 public void run() {
683 List<Message> browsedMessages = new InsertionCountList<Message>();
684 doBrowse(browsedMessages, getMaxExpirePageSize());
685 }
686 };
687
688 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
689 broker.messageExpired(context, reference, subs);
690 // AMQ-2586: Better to leave this stat at zero than to give the user
691 // misleading metrics.
692 // destinationStatistics.getMessages().decrement();
693 destinationStatistics.getEnqueues().decrement();
694 destinationStatistics.getExpired().increment();
695 MessageAck ack = new MessageAck();
696 ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
697 ack.setDestination(destination);
698 ack.setMessageID(reference.getMessageId());
699 try {
700 if (subs instanceof DurableTopicSubscription) {
701 ((DurableTopicSubscription)subs).removePending(reference);
702 }
703 acknowledge(context, subs, ack, reference);
704 } catch (Exception e) {
705 LOG.error("Failed to remove expired Message from the store ", e);
706 }
707 }
708
709 @Override
710 protected Logger getLog() {
711 return LOG;
712 }
713
714 protected boolean isOptimizeStorage(){
715 boolean result = false;
716
717 if (isDoOptimzeMessageStorage() && durableSubcribers.isEmpty()==false){
718 result = true;
719 for (DurableTopicSubscription s : durableSubcribers.values()) {
720 if (s.isActive()== false){
721 result = false;
722 break;
723 }
724 if (s.getPrefetchSize()==0){
725 result = false;
726 break;
727 }
728 if (s.isSlowConsumer()){
729 result = false;
730 break;
731 }
732 if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
733 result = false;
734 break;
735 }
736 }
737 }
738 return result;
739 }
740 }