001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network;
018
019 import java.util.List;
020 import org.apache.activemq.broker.region.Subscription;
021 import org.apache.activemq.command.BrokerId;
022 import org.apache.activemq.command.ConsumerInfo;
023 import org.apache.activemq.command.Message;
024 import org.apache.activemq.command.NetworkBridgeFilter;
025 import org.apache.activemq.filter.MessageEvaluationContext;
026 import org.slf4j.Logger;
027 import org.slf4j.LoggerFactory;
028
029 /**
030 * implement conditional behaviour for queue consumers,
031 * allows replaying back to origin if no consumers are present on the local broker
032 * after a configurable delay, irrespective of the networkTTL
033 * Also allows rate limiting of messages through the network, useful for static includes
034 *
035 * @org.apache.xbean.XBean
036 */
037
038 public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilterFactory {
039 boolean replayWhenNoConsumers = false;
040 int replayDelay = 0;
041 int rateLimit = 0;
042 int rateDuration = 1000;
043
044 @Override
045 public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int networkTimeToLive) {
046 ConditionalNetworkBridgeFilter filter = new ConditionalNetworkBridgeFilter();
047 filter.setNetworkBrokerId(remoteBrokerPath[0]);
048 filter.setNetworkTTL(networkTimeToLive);
049 filter.setAllowReplayWhenNoConsumers(isReplayWhenNoConsumers());
050 filter.setRateLimit(getRateLimit());
051 filter.setRateDuration(getRateDuration());
052 filter.setReplayDelay(getReplayDelay());
053 return filter;
054 }
055
056 public void setReplayWhenNoConsumers(boolean replayWhenNoConsumers) {
057 this.replayWhenNoConsumers = replayWhenNoConsumers;
058 }
059
060 public boolean isReplayWhenNoConsumers() {
061 return replayWhenNoConsumers;
062 }
063
064 public void setRateLimit(int rateLimit) {
065 this.rateLimit = rateLimit;
066 }
067
068 public int getRateLimit() {
069 return rateLimit;
070 }
071
072 public int getRateDuration() {
073 return rateDuration;
074 }
075
076 public void setRateDuration(int rateDuration) {
077 this.rateDuration = rateDuration;
078 }
079
080 public int getReplayDelay() {
081 return replayDelay;
082 }
083
084 public void setReplayDelay(int replayDelay) {
085 this.replayDelay = replayDelay;
086 }
087
088 private static class ConditionalNetworkBridgeFilter extends NetworkBridgeFilter {
089 final static Logger LOG = LoggerFactory.getLogger(ConditionalNetworkBridgeFilter.class);
090 private int rateLimit;
091 private int rateDuration = 1000;
092 private boolean allowReplayWhenNoConsumers = true;
093 private int replayDelay = 1000;
094
095 private int matchCount;
096 private long rateDurationEnd;
097
098 @Override
099 protected boolean matchesForwardingFilter(Message message, final MessageEvaluationContext mec) {
100 boolean match = true;
101 if (mec.getDestination().isQueue()) {
102 if (contains(message.getBrokerPath(), networkBrokerId)) {
103 // potential replay back to origin
104 match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
105
106 if (match && LOG.isTraceEnabled()) {
107 LOG.trace("Replaying [" + message.getMessageId() +"] for [" + message.getDestination() +"] back to origin in the absence of a local consumer");
108 }
109 }
110
111 if (match && rateLimitExceeded()) {
112 if (LOG.isTraceEnabled()) {
113 LOG.trace("Throttled network consumer rejecting [" + message.getMessageId() + "] for [" + message.getDestination() + " " + matchCount + ">" + rateLimit + "/" + rateDuration);
114 }
115 match = false;
116 }
117
118 } else {
119 // use existing logic for topics
120 match = super.matchesForwardingFilter(message, mec);
121 }
122
123 return match;
124 }
125
126 private boolean hasNotJustArrived(Message message) {
127 return replayDelay ==0 || (message.getBrokerInTime() + replayDelay < System.currentTimeMillis());
128 }
129
130 private boolean hasNoLocalConsumers(final Message message, final MessageEvaluationContext mec) {
131 List<Subscription> consumers = mec.getMessageReference().getRegionDestination().getConsumers();
132 for (Subscription sub : consumers) {
133 if (!sub.getConsumerInfo().isNetworkSubscription() && !sub.getConsumerInfo().isBrowser()) {
134 if (LOG.isTraceEnabled()) {
135 LOG.trace("Not replaying [" + message.getMessageId() + "] for [" + message.getDestination() +"] to origin due to existing local consumer: " + sub.getConsumerInfo());
136 }
137 return false;
138 }
139 }
140 return true;
141 }
142
143 private boolean rateLimitExceeded() {
144 if (rateLimit == 0) {
145 return false;
146 }
147
148 if (rateDurationEnd < System.currentTimeMillis()) {
149 rateDurationEnd = System.currentTimeMillis() + rateDuration;
150 matchCount = 0;
151 }
152 return ++matchCount > rateLimit;
153 }
154
155 public void setReplayDelay(int replayDelay) {
156 this.replayDelay = replayDelay;
157 }
158
159 public void setRateLimit(int rateLimit) {
160 this.rateLimit = rateLimit;
161 }
162
163 public void setRateDuration(int rateDuration) {
164 this.rateDuration = rateDuration;
165 }
166
167 public void setAllowReplayWhenNoConsumers(boolean allowReplayWhenNoConsumers) {
168 this.allowReplayWhenNoConsumers = allowReplayWhenNoConsumers;
169 }
170 }
171 }