001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport;
018
019 import java.io.IOException;
020 import java.util.Timer;
021 import java.util.concurrent.SynchronousQueue;
022 import java.util.concurrent.ThreadFactory;
023 import java.util.concurrent.ThreadPoolExecutor;
024 import java.util.concurrent.TimeUnit;
025 import java.util.concurrent.atomic.AtomicBoolean;
026 import java.util.concurrent.atomic.AtomicInteger;
027 import java.util.concurrent.locks.ReentrantReadWriteLock;
028
029 import org.apache.activemq.command.KeepAliveInfo;
030 import org.apache.activemq.command.WireFormatInfo;
031 import org.apache.activemq.thread.SchedulerTimerTask;
032 import org.apache.activemq.wireformat.WireFormat;
033 import org.slf4j.Logger;
034 import org.slf4j.LoggerFactory;
035
036 /**
037 * Used to make sure that commands are arriving periodically from the peer of
038 * the transport.
039 */
040 public abstract class AbstractInactivityMonitor extends TransportFilter {
041
042 private static final Logger LOG = LoggerFactory.getLogger(AbstractInactivityMonitor.class);
043
044 private static ThreadPoolExecutor ASYNC_TASKS;
045 private static int CHECKER_COUNTER;
046 private static long DEFAULT_CHECK_TIME_MILLS = 30000;
047 private static Timer READ_CHECK_TIMER;
048 private static Timer WRITE_CHECK_TIMER;
049
050 private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
051
052 private final AtomicBoolean commandSent = new AtomicBoolean(false);
053 private final AtomicBoolean inSend = new AtomicBoolean(false);
054 private final AtomicBoolean failed = new AtomicBoolean(false);
055
056 private final AtomicBoolean commandReceived = new AtomicBoolean(true);
057 private final AtomicBoolean inReceive = new AtomicBoolean(false);
058 private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
059
060 private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
061
062 private SchedulerTimerTask writeCheckerTask;
063 private SchedulerTimerTask readCheckerTask;
064
065 private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
066 private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS;
067 private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
068 private boolean useKeepAlive = true;
069 private boolean keepAliveResponseRequired;
070
071 protected WireFormat wireFormat;
072
073 private final Runnable readChecker = new Runnable() {
074 long lastRunTime;
075 public void run() {
076 long now = System.currentTimeMillis();
077 long elapsed = (now-lastRunTime);
078
079 if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
080 LOG.debug(""+elapsed+" ms elapsed since last read check.");
081 }
082
083 // Perhaps the timer executed a read check late.. and then executes
084 // the next read check on time which causes the time elapsed between
085 // read checks to be small..
086
087 // If less than 90% of the read check Time elapsed then abort this readcheck.
088 if( !allowReadCheck(elapsed) ) { // FUNKY qdox bug does not allow me to inline this expression.
089 LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
090 return;
091 }
092
093 lastRunTime = now;
094 readCheck();
095 }
096 };
097
098 private boolean allowReadCheck(long elapsed) {
099 return elapsed > (readCheckTime * 9 / 10);
100 }
101
102 private final Runnable writeChecker = new Runnable() {
103 long lastRunTime;
104 public void run() {
105 long now = System.currentTimeMillis();
106 if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
107 LOG.debug(this + " "+(now-lastRunTime)+" ms elapsed since last write check.");
108
109 }
110 lastRunTime = now;
111 writeCheck();
112 }
113 };
114
115 public AbstractInactivityMonitor(Transport next, WireFormat wireFormat) {
116 super(next);
117 this.wireFormat = wireFormat;
118 }
119
120 public void start() throws Exception {
121 next.start();
122 startMonitorThreads();
123 }
124
125 public void stop() throws Exception {
126 stopMonitorThreads();
127 next.stop();
128 }
129
130 final void writeCheck() {
131 if (inSend.get()) {
132 if (LOG.isTraceEnabled()) {
133 LOG.trace("A send is in progress");
134 }
135 return;
136 }
137
138 if (!commandSent.get() && useKeepAlive && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
139 if (LOG.isTraceEnabled()) {
140 LOG.trace(this + " no message sent since last write check, sending a KeepAliveInfo");
141 }
142 ASYNC_TASKS.execute(new Runnable() {
143 public void run() {
144 if (monitorStarted.get()) {
145 try {
146 // If we can't get the lock it means another write beat us into the
147 // send and we don't need to heart beat now.
148 if (sendLock.writeLock().tryLock()) {
149 KeepAliveInfo info = new KeepAliveInfo();
150 info.setResponseRequired(keepAliveResponseRequired);
151 doOnewaySend(info);
152 }
153 } catch (IOException e) {
154 onException(e);
155 } finally {
156 if (sendLock.writeLock().isHeldByCurrentThread()) {
157 sendLock.writeLock().unlock();
158 }
159 }
160 }
161 };
162 });
163 } else {
164 if (LOG.isTraceEnabled()) {
165 LOG.trace(this + " message sent since last write check, resetting flag");
166 }
167 }
168
169 commandSent.set(false);
170 }
171
172 final void readCheck() {
173 int currentCounter = next.getReceiveCounter();
174 int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
175 if (inReceive.get() || currentCounter!=previousCounter ) {
176 if (LOG.isTraceEnabled()) {
177 LOG.trace("A receive is in progress");
178 }
179 return;
180 }
181 if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
182 if (LOG.isDebugEnabled()) {
183 LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
184 }
185 ASYNC_TASKS.execute(new Runnable() {
186 public void run() {
187 onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
188 };
189 });
190 } else {
191 if (LOG.isTraceEnabled()) {
192 LOG.trace("Message received since last read check, resetting flag: ");
193 }
194 }
195 commandReceived.set(false);
196 }
197
198 protected abstract void processInboundWireFormatInfo(WireFormatInfo info) throws IOException;
199 protected abstract void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException;
200
201 public void onCommand(Object command) {
202 commandReceived.set(true);
203 inReceive.set(true);
204 try {
205 if (command.getClass() == KeepAliveInfo.class) {
206 KeepAliveInfo info = (KeepAliveInfo) command;
207 if (info.isResponseRequired()) {
208 sendLock.readLock().lock();
209 try {
210 info.setResponseRequired(false);
211 oneway(info);
212 } catch (IOException e) {
213 onException(e);
214 } finally {
215 sendLock.readLock().unlock();
216 }
217 }
218 } else {
219 if (command.getClass() == WireFormatInfo.class) {
220 synchronized (this) {
221 try {
222 processInboundWireFormatInfo((WireFormatInfo) command);
223 } catch (IOException e) {
224 onException(e);
225 }
226 }
227 }
228
229 transportListener.onCommand(command);
230 }
231 } finally {
232 inReceive.set(false);
233 }
234 }
235
236 public void oneway(Object o) throws IOException {
237 // To prevent the inactivity monitor from sending a message while we
238 // are performing a send we take a read lock. The inactivity monitor
239 // sends its Heart-beat commands under a write lock. This means that
240 // the MutexTransport is still responsible for synchronizing sends
241 this.sendLock.readLock().lock();
242 inSend.set(true);
243 try {
244 doOnewaySend(o);
245 } finally {
246 commandSent.set(true);
247 inSend.set(false);
248 this.sendLock.readLock().unlock();
249 }
250 }
251
252 // Must be called under lock, either read or write on sendLock.
253 private void doOnewaySend(Object command) throws IOException {
254 if( failed.get() ) {
255 throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress());
256 }
257 if (command.getClass() == WireFormatInfo.class) {
258 synchronized (this) {
259 processOutboundWireFormatInfo((WireFormatInfo) command);
260 }
261 }
262 next.oneway(command);
263 }
264
265 public void onException(IOException error) {
266 if (failed.compareAndSet(false, true)) {
267 stopMonitorThreads();
268 transportListener.onException(error);
269 }
270 }
271
272 public void setUseKeepAlive(boolean val) {
273 useKeepAlive = val;
274 }
275
276 public long getReadCheckTime() {
277 return readCheckTime;
278 }
279
280 public void setReadCheckTime(long readCheckTime) {
281 this.readCheckTime = readCheckTime;
282 }
283
284 public long getWriteCheckTime() {
285 return writeCheckTime;
286 }
287
288 public void setWriteCheckTime(long writeCheckTime) {
289 this.writeCheckTime = writeCheckTime;
290 }
291
292 public long getInitialDelayTime() {
293 return initialDelayTime;
294 }
295
296 public void setInitialDelayTime(long initialDelayTime) {
297 this.initialDelayTime = initialDelayTime;
298 }
299
300 public boolean isKeepAliveResponseRequired() {
301 return this.keepAliveResponseRequired;
302 }
303
304 public void setKeepAliveResponseRequired(boolean value) {
305 this.keepAliveResponseRequired = value;
306 }
307
308 public boolean isMonitorStarted() {
309 return this.monitorStarted.get();
310 }
311
312 protected synchronized void startMonitorThreads() throws IOException {
313 if (monitorStarted.get()) {
314 return;
315 }
316
317 if (!configuredOk()) {
318 return;
319 }
320
321 if (readCheckTime > 0) {
322 readCheckerTask = new SchedulerTimerTask(readChecker);
323 }
324
325 if (writeCheckTime > 0) {
326 writeCheckerTask = new SchedulerTimerTask(writeChecker);
327 }
328
329 if (writeCheckTime > 0 || readCheckTime > 0) {
330 monitorStarted.set(true);
331 synchronized(AbstractInactivityMonitor.class) {
332 if( CHECKER_COUNTER == 0 ) {
333 ASYNC_TASKS = createExecutor();
334 READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true);
335 WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true);
336 }
337 CHECKER_COUNTER++;
338 if (readCheckTime > 0) {
339 READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
340 }
341 if (writeCheckTime > 0) {
342 WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime);
343 }
344 }
345 }
346 }
347
348 abstract protected boolean configuredOk() throws IOException;
349
350 protected synchronized void stopMonitorThreads() {
351 if (monitorStarted.compareAndSet(true, false)) {
352 if (readCheckerTask != null) {
353 readCheckerTask.cancel();
354 }
355 if (writeCheckerTask != null) {
356 writeCheckerTask.cancel();
357 }
358 synchronized( AbstractInactivityMonitor.class ) {
359 WRITE_CHECK_TIMER.purge();
360 READ_CHECK_TIMER.purge();
361 CHECKER_COUNTER--;
362 if(CHECKER_COUNTER==0) {
363 WRITE_CHECK_TIMER.cancel();
364 READ_CHECK_TIMER.cancel();
365 WRITE_CHECK_TIMER = null;
366 READ_CHECK_TIMER = null;
367 ASYNC_TASKS.shutdown();
368 ASYNC_TASKS = null;
369 }
370 }
371 }
372 }
373
374 private ThreadFactory factory = new ThreadFactory() {
375 public Thread newThread(Runnable runnable) {
376 Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable);
377 thread.setDaemon(true);
378 return thread;
379 }
380 };
381
382 private ThreadPoolExecutor createExecutor() {
383 ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
384 exec.allowCoreThreadTimeOut(true);
385 return exec;
386 }
387 }