001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region.policy;
018
019 import java.util.HashMap;
020 import java.util.Map;
021 import java.util.Map.Entry;
022 import java.util.concurrent.ConcurrentHashMap;
023 import java.util.concurrent.atomic.AtomicBoolean;
024
025 import org.apache.activemq.broker.Broker;
026 import org.apache.activemq.broker.Connection;
027 import org.apache.activemq.broker.ConnectionContext;
028 import org.apache.activemq.broker.region.Subscription;
029 import org.apache.activemq.command.ConsumerControl;
030 import org.apache.activemq.thread.Scheduler;
031 import org.apache.activemq.transport.InactivityIOException;
032 import org.slf4j.Logger;
033 import org.slf4j.LoggerFactory;
034
035 /**
036 * Abort slow consumers when they reach the configured threshold of slowness, default is slow for 30 seconds
037 *
038 * @org.apache.xbean.XBean
039 */
040 public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable {
041
042 private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerStrategy.class);
043
044 private String name = "AbortSlowConsumerStrategy@" + hashCode();
045 private Scheduler scheduler;
046 private Broker broker;
047 private final AtomicBoolean taskStarted = new AtomicBoolean(false);
048 private final Map<Subscription, SlowConsumerEntry> slowConsumers = new ConcurrentHashMap<Subscription, SlowConsumerEntry>();
049
050 private long maxSlowCount = -1;
051 private long maxSlowDuration = 30*1000;
052 private long checkPeriod = 30*1000;
053 private boolean abortConnection = false;
054
055 public void setBrokerService(Broker broker) {
056 this.scheduler = broker.getScheduler();
057 this.broker = broker;
058 }
059
060 public void slowConsumer(ConnectionContext context, Subscription subs) {
061 if (maxSlowCount < 0 && maxSlowDuration < 0) {
062 // nothing to do
063 LOG.info("no limits set, slowConsumer strategy has nothing to do");
064 return;
065 }
066
067 if (taskStarted.compareAndSet(false, true)) {
068 scheduler.executePeriodically(this, checkPeriod);
069 }
070
071 if (!slowConsumers.containsKey(subs)) {
072 slowConsumers.put(subs, new SlowConsumerEntry(context));
073 } else if (maxSlowCount > 0) {
074 slowConsumers.get(subs).slow();
075 }
076 }
077
078 public void run() {
079 if (maxSlowDuration > 0) {
080 // mark
081 for (SlowConsumerEntry entry : slowConsumers.values()) {
082 entry.mark();
083 }
084 }
085
086 HashMap<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
087 for (Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
088 if (entry.getKey().isSlowConsumer()) {
089 if (maxSlowDuration > 0 && (entry.getValue().markCount * checkPeriod > maxSlowDuration)
090 || maxSlowCount > 0 && entry.getValue().slowCount > maxSlowCount) {
091 toAbort.put(entry.getKey(), entry.getValue());
092 slowConsumers.remove(entry.getKey());
093 }
094 } else {
095 LOG.info("sub: " + entry.getKey().getConsumerInfo().getConsumerId() + " is no longer slow");
096 slowConsumers.remove(entry.getKey());
097 }
098 }
099
100 abortSubscription(toAbort, abortConnection);
101 }
102
103 private void abortSubscription(Map<Subscription, SlowConsumerEntry> toAbort, boolean abortSubscriberConnection) {
104 for (final Entry<Subscription, SlowConsumerEntry> entry : toAbort.entrySet()) {
105 ConnectionContext connectionContext = entry.getValue().context;
106 if (connectionContext!= null) {
107 try {
108 LOG.info("aborting "
109 + (abortSubscriberConnection ? "connection" : "consumer")
110 + ", slow consumer: " + entry.getKey().getConsumerInfo().getConsumerId());
111
112 final Connection connection = connectionContext.getConnection();
113 if (connection != null) {
114 if (abortSubscriberConnection) {
115 scheduler.executeAfterDelay(new Runnable() {
116 public void run() {
117 connection.serviceException(new InactivityIOException("Consumer was slow too often (>"
118 + maxSlowCount + ") or too long (>"
119 + maxSlowDuration + "): " + entry.getKey().getConsumerInfo().getConsumerId()));
120 }}, 0l);
121 } else {
122 // just abort the consumer by telling it to stop
123 ConsumerControl stopConsumer = new ConsumerControl();
124 stopConsumer.setConsumerId(entry.getKey().getConsumerInfo().getConsumerId());
125 stopConsumer.setClose(true);
126 connection.dispatchAsync(stopConsumer);
127 }
128 } else {
129 LOG.debug("slowConsumer abort ignored, no connection in context:" + connectionContext);
130 }
131 } catch (Exception e) {
132 LOG.info("exception on stopping "
133 + (abortSubscriberConnection ? "connection" : "consumer")
134 + " to abort slow consumer: " + entry.getKey(), e);
135 }
136 }
137 }
138 }
139
140
141 public void abortConsumer(Subscription sub, boolean abortSubscriberConnection) {
142 if (sub != null) {
143 SlowConsumerEntry entry = slowConsumers.remove(sub);
144 if (entry != null) {
145 Map toAbort = new HashMap<Subscription, SlowConsumerEntry>();
146 toAbort.put(sub, entry);
147 abortSubscription(toAbort, abortSubscriberConnection);
148 } else {
149 LOG.warn("cannot abort subscription as it no longer exists in the map of slow consumers: " + sub);
150 }
151 }
152 }
153
154
155 public long getMaxSlowCount() {
156 return maxSlowCount;
157 }
158
159 /**
160 * number of times a subscription can be deemed slow before triggering abort
161 * effect depends on dispatch rate as slow determination is done on dispatch
162 */
163 public void setMaxSlowCount(long maxSlowCount) {
164 this.maxSlowCount = maxSlowCount;
165 }
166
167 public long getMaxSlowDuration() {
168 return maxSlowDuration;
169 }
170
171 /**
172 * time in milliseconds that a sub can remain slow before triggering
173 * an abort.
174 * @param maxSlowDuration
175 */
176 public void setMaxSlowDuration(long maxSlowDuration) {
177 this.maxSlowDuration = maxSlowDuration;
178 }
179
180 public long getCheckPeriod() {
181 return checkPeriod;
182 }
183
184 /**
185 * time in milliseconds between checks for slow subscriptions
186 * @param checkPeriod
187 */
188 public void setCheckPeriod(long checkPeriod) {
189 this.checkPeriod = checkPeriod;
190 }
191
192 public boolean isAbortConnection() {
193 return abortConnection;
194 }
195
196 /**
197 * abort the consumers connection rather than sending a stop command to the remote consumer
198 * @param abortConnection
199 */
200 public void setAbortConnection(boolean abortConnection) {
201 this.abortConnection = abortConnection;
202 }
203
204 public void setName(String name) {
205 this.name = name;
206 }
207
208 public String getName() {
209 return name;
210 }
211
212 public Map<Subscription, SlowConsumerEntry> getSlowConsumers() {
213 return slowConsumers;
214 }
215 }