001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.util.List;
021 import javax.jms.ResourceAllocationException;
022 import org.apache.activemq.advisory.AdvisorySupport;
023 import org.apache.activemq.broker.Broker;
024 import org.apache.activemq.broker.BrokerService;
025 import org.apache.activemq.broker.ConnectionContext;
026 import org.apache.activemq.broker.ProducerBrokerExchange;
027 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
028 import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
029 import org.apache.activemq.command.ActiveMQDestination;
030 import org.apache.activemq.command.ActiveMQTopic;
031 import org.apache.activemq.command.Message;
032 import org.apache.activemq.command.MessageDispatchNotification;
033 import org.apache.activemq.command.ProducerInfo;
034 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
035 import org.apache.activemq.security.SecurityContext;
036 import org.apache.activemq.state.ProducerState;
037 import org.apache.activemq.store.MessageStore;
038 import org.apache.activemq.thread.Scheduler;
039 import org.apache.activemq.usage.MemoryUsage;
040 import org.apache.activemq.usage.SystemUsage;
041 import org.apache.activemq.usage.Usage;
042 import org.slf4j.Logger;
043
044 /**
045 *
046 */
047 public abstract class BaseDestination implements Destination {
048 /**
049 * The maximum number of messages to page in to the destination from
050 * persistent storage
051 */
052 public static final int MAX_PAGE_SIZE = 200;
053 public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
054 public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
055 public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
056 public static final int MAX_PRODUCERS_TO_AUDIT = 64;
057 public static final int MAX_AUDIT_DEPTH = 2048;
058
059 protected final ActiveMQDestination destination;
060 protected final Broker broker;
061 protected final MessageStore store;
062 protected SystemUsage systemUsage;
063 protected MemoryUsage memoryUsage;
064 private boolean producerFlowControl = true;
065 private boolean alwaysRetroactive = false;
066 protected boolean warnOnProducerFlowControl = true;
067 protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
068
069 private int maxProducersToAudit = 1024;
070 private int maxAuditDepth = 2048;
071 private boolean enableAudit = true;
072 private int maxPageSize = MAX_PAGE_SIZE;
073 private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE;
074 private boolean useCache = true;
075 private int minimumMessageSize = 1024;
076 private boolean lazyDispatch = false;
077 private boolean advisoryForSlowConsumers;
078 private boolean advisdoryForFastProducers;
079 private boolean advisoryForDiscardingMessages;
080 private boolean advisoryWhenFull;
081 private boolean advisoryForDelivery;
082 private boolean advisoryForConsumed;
083 private boolean sendAdvisoryIfNoConsumers;
084 protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
085 protected final BrokerService brokerService;
086 protected final Broker regionBroker;
087 protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
088 protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
089 private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
090 protected int cursorMemoryHighWaterMark = 70;
091 protected int storeUsageHighWaterMark = 100;
092 private SlowConsumerStrategy slowConsumerStrategy;
093 private boolean prioritizedMessages;
094 private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
095 private boolean gcIfInactive;
096 private boolean gcWithNetworkConsumers;
097 private long lastActiveTime=0l;
098 private boolean reduceMemoryFootprint = false;
099 protected final Scheduler scheduler;
100 private boolean disposed = false;
101 private boolean doOptimzeMessageStorage = true;
102 /*
103 * percentage of in-flight messages above which optimize message store is disabled
104 */
105 private int optimizeMessageStoreInFlightLimit = 10;
106
107 /**
108 * @param brokerService
109 * @param store
110 * @param destination
111 * @param parentStats
112 * @throws Exception
113 */
114 public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
115 this.brokerService = brokerService;
116 this.broker = brokerService.getBroker();
117 this.store = store;
118 this.destination = destination;
119 // let's copy the enabled property from the parent DestinationStatistics
120 this.destinationStatistics.setEnabled(parentStats.isEnabled());
121 this.destinationStatistics.setParent(parentStats);
122 this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString());
123 this.memoryUsage = this.systemUsage.getMemoryUsage();
124 this.memoryUsage.setUsagePortion(1.0f);
125 this.regionBroker = brokerService.getRegionBroker();
126 this.scheduler = brokerService.getBroker().getScheduler();
127 }
128
129 /**
130 * initialize the destination
131 *
132 * @throws Exception
133 */
134 public void initialize() throws Exception {
135 // Let the store know what usage manager we are using so that he can
136 // flush messages to disk when usage gets high.
137 if (store != null) {
138 store.setMemoryUsage(this.memoryUsage);
139 }
140 }
141
142 /**
143 * @return the producerFlowControl
144 */
145 public boolean isProducerFlowControl() {
146 return producerFlowControl;
147 }
148
149 /**
150 * @param producerFlowControl the producerFlowControl to set
151 */
152 public void setProducerFlowControl(boolean producerFlowControl) {
153 this.producerFlowControl = producerFlowControl;
154 }
155
156 public boolean isAlwaysRetroactive() {
157 return alwaysRetroactive;
158 }
159
160 public void setAlwaysRetroactive(boolean alwaysRetroactive) {
161 this.alwaysRetroactive = alwaysRetroactive;
162 }
163
164 /**
165 * Set's the interval at which warnings about producers being blocked by
166 * resource usage will be triggered. Values of 0 or less will disable
167 * warnings
168 *
169 * @param blockedProducerWarningInterval the interval at which warning about
170 * blocked producers will be triggered.
171 */
172 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
173 this.blockedProducerWarningInterval = blockedProducerWarningInterval;
174 }
175
176 /**
177 *
178 * @return the interval at which warning about blocked producers will be
179 * triggered.
180 */
181 public long getBlockedProducerWarningInterval() {
182 return blockedProducerWarningInterval;
183 }
184
185 /**
186 * @return the maxProducersToAudit
187 */
188 public int getMaxProducersToAudit() {
189 return maxProducersToAudit;
190 }
191
192 /**
193 * @param maxProducersToAudit the maxProducersToAudit to set
194 */
195 public void setMaxProducersToAudit(int maxProducersToAudit) {
196 this.maxProducersToAudit = maxProducersToAudit;
197 }
198
199 /**
200 * @return the maxAuditDepth
201 */
202 public int getMaxAuditDepth() {
203 return maxAuditDepth;
204 }
205
206 /**
207 * @param maxAuditDepth the maxAuditDepth to set
208 */
209 public void setMaxAuditDepth(int maxAuditDepth) {
210 this.maxAuditDepth = maxAuditDepth;
211 }
212
213 /**
214 * @return the enableAudit
215 */
216 public boolean isEnableAudit() {
217 return enableAudit;
218 }
219
220 /**
221 * @param enableAudit the enableAudit to set
222 */
223 public void setEnableAudit(boolean enableAudit) {
224 this.enableAudit = enableAudit;
225 }
226
227 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
228 destinationStatistics.getProducers().increment();
229 this.lastActiveTime=0l;
230 }
231
232 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
233 destinationStatistics.getProducers().decrement();
234 }
235
236 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{
237 destinationStatistics.getConsumers().increment();
238 this.lastActiveTime=0l;
239 }
240
241 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{
242 destinationStatistics.getConsumers().decrement();
243 }
244
245
246 public final MemoryUsage getMemoryUsage() {
247 return memoryUsage;
248 }
249
250 public DestinationStatistics getDestinationStatistics() {
251 return destinationStatistics;
252 }
253
254 public ActiveMQDestination getActiveMQDestination() {
255 return destination;
256 }
257
258 public final String getName() {
259 return getActiveMQDestination().getPhysicalName();
260 }
261
262 public final MessageStore getMessageStore() {
263 return store;
264 }
265
266 public boolean isActive() {
267 boolean isActive = destinationStatistics.getConsumers().getCount() != 0 ||
268 destinationStatistics.getProducers().getCount() != 0;
269 if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() != 0) {
270 isActive = hasRegularConsumers(getConsumers());
271 }
272 return isActive;
273 }
274
275 public int getMaxPageSize() {
276 return maxPageSize;
277 }
278
279 public void setMaxPageSize(int maxPageSize) {
280 this.maxPageSize = maxPageSize;
281 }
282
283 public int getMaxBrowsePageSize() {
284 return this.maxBrowsePageSize;
285 }
286
287 public void setMaxBrowsePageSize(int maxPageSize) {
288 this.maxBrowsePageSize = maxPageSize;
289 }
290
291 public int getMaxExpirePageSize() {
292 return this.maxExpirePageSize;
293 }
294
295 public void setMaxExpirePageSize(int maxPageSize) {
296 this.maxExpirePageSize = maxPageSize;
297 }
298
299 public void setExpireMessagesPeriod(long expireMessagesPeriod) {
300 this.expireMessagesPeriod = expireMessagesPeriod;
301 }
302
303 public long getExpireMessagesPeriod() {
304 return expireMessagesPeriod;
305 }
306
307 public boolean isUseCache() {
308 return useCache;
309 }
310
311 public void setUseCache(boolean useCache) {
312 this.useCache = useCache;
313 }
314
315 public int getMinimumMessageSize() {
316 return minimumMessageSize;
317 }
318
319 public void setMinimumMessageSize(int minimumMessageSize) {
320 this.minimumMessageSize = minimumMessageSize;
321 }
322
323 public boolean isLazyDispatch() {
324 return lazyDispatch;
325 }
326
327 public void setLazyDispatch(boolean lazyDispatch) {
328 this.lazyDispatch = lazyDispatch;
329 }
330
331 protected long getDestinationSequenceId() {
332 return regionBroker.getBrokerSequenceId();
333 }
334
335 /**
336 * @return the advisoryForSlowConsumers
337 */
338 public boolean isAdvisoryForSlowConsumers() {
339 return advisoryForSlowConsumers;
340 }
341
342 /**
343 * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
344 */
345 public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
346 this.advisoryForSlowConsumers = advisoryForSlowConsumers;
347 }
348
349 /**
350 * @return the advisoryForDiscardingMessages
351 */
352 public boolean isAdvisoryForDiscardingMessages() {
353 return advisoryForDiscardingMessages;
354 }
355
356 /**
357 * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to
358 * set
359 */
360 public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
361 this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
362 }
363
364 /**
365 * @return the advisoryWhenFull
366 */
367 public boolean isAdvisoryWhenFull() {
368 return advisoryWhenFull;
369 }
370
371 /**
372 * @param advisoryWhenFull the advisoryWhenFull to set
373 */
374 public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
375 this.advisoryWhenFull = advisoryWhenFull;
376 }
377
378 /**
379 * @return the advisoryForDelivery
380 */
381 public boolean isAdvisoryForDelivery() {
382 return advisoryForDelivery;
383 }
384
385 /**
386 * @param advisoryForDelivery the advisoryForDelivery to set
387 */
388 public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
389 this.advisoryForDelivery = advisoryForDelivery;
390 }
391
392 /**
393 * @return the advisoryForConsumed
394 */
395 public boolean isAdvisoryForConsumed() {
396 return advisoryForConsumed;
397 }
398
399 /**
400 * @param advisoryForConsumed the advisoryForConsumed to set
401 */
402 public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
403 this.advisoryForConsumed = advisoryForConsumed;
404 }
405
406 /**
407 * @return the advisdoryForFastProducers
408 */
409 public boolean isAdvisdoryForFastProducers() {
410 return advisdoryForFastProducers;
411 }
412
413 /**
414 * @param advisdoryForFastProducers the advisdoryForFastProducers to set
415 */
416 public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
417 this.advisdoryForFastProducers = advisdoryForFastProducers;
418 }
419
420 public boolean isSendAdvisoryIfNoConsumers() {
421 return sendAdvisoryIfNoConsumers;
422 }
423
424 public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
425 this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
426 }
427
428 /**
429 * @return the dead letter strategy
430 */
431 public DeadLetterStrategy getDeadLetterStrategy() {
432 return deadLetterStrategy;
433 }
434
435 /**
436 * set the dead letter strategy
437 *
438 * @param deadLetterStrategy
439 */
440 public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
441 this.deadLetterStrategy = deadLetterStrategy;
442 }
443
444 public int getCursorMemoryHighWaterMark() {
445 return this.cursorMemoryHighWaterMark;
446 }
447
448 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
449 this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
450 }
451
452 /**
453 * called when message is consumed
454 *
455 * @param context
456 * @param messageReference
457 */
458 public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
459 if (advisoryForConsumed) {
460 broker.messageConsumed(context, messageReference);
461 }
462 }
463
464 /**
465 * Called when message is delivered to the broker
466 *
467 * @param context
468 * @param messageReference
469 */
470 public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
471 if (advisoryForDelivery) {
472 broker.messageDelivered(context, messageReference);
473 }
474 }
475
476 /**
477 * Called when a message is discarded - e.g. running low on memory This will
478 * happen only if the policy is enabled - e.g. non durable topics
479 *
480 * @param context
481 * @param messageReference
482 */
483 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
484 if (advisoryForDiscardingMessages) {
485 broker.messageDiscarded(context, sub, messageReference);
486 }
487 }
488
489 /**
490 * Called when there is a slow consumer
491 *
492 * @param context
493 * @param subs
494 */
495 public void slowConsumer(ConnectionContext context, Subscription subs) {
496 if (advisoryForSlowConsumers) {
497 broker.slowConsumer(context, this, subs);
498 }
499 if (slowConsumerStrategy != null) {
500 slowConsumerStrategy.slowConsumer(context, subs);
501 }
502 }
503
504 /**
505 * Called to notify a producer is too fast
506 *
507 * @param context
508 * @param producerInfo
509 */
510 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
511 if (advisdoryForFastProducers) {
512 broker.fastProducer(context, producerInfo);
513 }
514 }
515
516 /**
517 * Called when a Usage reaches a limit
518 *
519 * @param context
520 * @param usage
521 */
522 public void isFull(ConnectionContext context, Usage<?> usage) {
523 if (advisoryWhenFull) {
524 broker.isFull(context, this, usage);
525 }
526 }
527
528 public void dispose(ConnectionContext context) throws IOException {
529 if (this.store != null) {
530 this.store.removeAllMessages(context);
531 this.store.dispose(context);
532 }
533 this.destinationStatistics.setParent(null);
534 this.memoryUsage.stop();
535 this.disposed = true;
536 }
537
538 public boolean isDisposed() {
539 return this.disposed;
540 }
541
542 /**
543 * Provides a hook to allow messages with no consumer to be processed in
544 * some way - such as to send to a dead letter queue or something..
545 */
546 protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
547 if (!msg.isPersistent()) {
548 if (isSendAdvisoryIfNoConsumers()) {
549 // allow messages with no consumers to be dispatched to a dead
550 // letter queue
551 if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
552
553 Message message = msg.copy();
554 // The original destination and transaction id do not get
555 // filled when the message is first sent,
556 // it is only populated if the message is routed to another
557 // destination like the DLQ
558 if (message.getOriginalDestination() != null) {
559 message.setOriginalDestination(message.getDestination());
560 }
561 if (message.getOriginalTransactionId() != null) {
562 message.setOriginalTransactionId(message.getTransactionId());
563 }
564
565 ActiveMQTopic advisoryTopic;
566 if (destination.isQueue()) {
567 advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
568 } else {
569 advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
570 }
571 message.setDestination(advisoryTopic);
572 message.setTransactionId(null);
573
574 // Disable flow control for this since since we don't want
575 // to block.
576 boolean originalFlowControl = context.isProducerFlowControl();
577 try {
578 context.setProducerFlowControl(false);
579 ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
580 producerExchange.setMutable(false);
581 producerExchange.setConnectionContext(context);
582 producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
583 context.getBroker().send(producerExchange, message);
584 } finally {
585 context.setProducerFlowControl(originalFlowControl);
586 }
587
588 }
589 }
590 }
591 }
592
593 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
594 }
595
596 public final int getStoreUsageHighWaterMark() {
597 return this.storeUsageHighWaterMark;
598 }
599
600 public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
601 this.storeUsageHighWaterMark = storeUsageHighWaterMark;
602 }
603
604 protected final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
605 waitForSpace(context, usage, 100, warning);
606 }
607
608 protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
609 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
610 getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: " + usage + ": " + warning);
611 throw new ResourceAllocationException(warning);
612 }
613 if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
614 if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
615 getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: " + usage + ": " + warning);
616 throw new ResourceAllocationException(warning);
617 }
618 } else {
619 long start = System.currentTimeMillis();
620 long nextWarn = start;
621 while (!usage.waitForSpace(1000, highWaterMark)) {
622 if (context.getStopping().get()) {
623 throw new IOException("Connection closed, send aborted.");
624 }
625
626 long now = System.currentTimeMillis();
627 if (now >= nextWarn) {
628 getLog().info("" + usage + ": " + warning + " (blocking for: " + (now - start) / 1000 + "s)");
629 nextWarn = now + blockedProducerWarningInterval;
630 }
631 }
632 }
633 }
634
635 protected abstract Logger getLog();
636
637 public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
638 this.slowConsumerStrategy = slowConsumerStrategy;
639 }
640
641 public SlowConsumerStrategy getSlowConsumerStrategy() {
642 return this.slowConsumerStrategy;
643 }
644
645
646 public boolean isPrioritizedMessages() {
647 return this.prioritizedMessages;
648 }
649
650 public void setPrioritizedMessages(boolean prioritizedMessages) {
651 this.prioritizedMessages = prioritizedMessages;
652 if (store != null) {
653 store.setPrioritizedMessages(prioritizedMessages);
654 }
655 }
656
657 /**
658 * @return the inactiveTimoutBeforeGC
659 */
660 public long getInactiveTimoutBeforeGC() {
661 return this.inactiveTimoutBeforeGC;
662 }
663
664 /**
665 * @param inactiveTimoutBeforeGC the inactiveTimoutBeforeGC to set
666 */
667 public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
668 this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC;
669 }
670
671 /**
672 * @return the gcIfInactive
673 */
674 public boolean isGcIfInactive() {
675 return this.gcIfInactive;
676 }
677
678 /**
679 * @param gcIfInactive the gcIfInactive to set
680 */
681 public void setGcIfInactive(boolean gcIfInactive) {
682 this.gcIfInactive = gcIfInactive;
683 }
684
685 /**
686 * Indicate if it is ok to gc destinations that have only network consumers
687 * @param gcWithNetworkConsumers
688 */
689 public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
690 this.gcWithNetworkConsumers = gcWithNetworkConsumers;
691 }
692
693 public boolean isGcWithNetworkConsumers() {
694 return gcWithNetworkConsumers;
695 }
696
697 public void markForGC(long timeStamp) {
698 if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
699 && destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) {
700 this.lastActiveTime = timeStamp;
701 }
702 }
703
704 public boolean canGC() {
705 boolean result = false;
706 if (isGcIfInactive()&& this.lastActiveTime != 0l) {
707 if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimoutBeforeGC()) {
708 result = true;
709 }
710 }
711 return result;
712 }
713
714 public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
715 this.reduceMemoryFootprint = reduceMemoryFootprint;
716 }
717
718 protected boolean isReduceMemoryFootprint() {
719 return this.reduceMemoryFootprint;
720 }
721
722 public boolean isDoOptimzeMessageStorage() {
723 return doOptimzeMessageStorage;
724 }
725
726 public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
727 this.doOptimzeMessageStorage = doOptimzeMessageStorage;
728 }
729
730 public int getOptimizeMessageStoreInFlightLimit() {
731 return optimizeMessageStoreInFlightLimit;
732 }
733
734 public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) {
735 this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
736 }
737
738
739 public abstract List<Subscription> getConsumers();
740
741 protected boolean hasRegularConsumers(List<Subscription> consumers) {
742 boolean hasRegularConsumers = false;
743 for (Subscription subscription: consumers) {
744 if (!subscription.getConsumerInfo().isNetworkSubscription()) {
745 hasRegularConsumers = true;
746 break;
747 }
748 }
749 return hasRegularConsumers;
750 }
751
752 protected ConnectionContext createConnectionContext() {
753 ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
754 answer.setBroker(this.broker);
755 answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
756 answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
757 return answer;
758 }
759 }