001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.activemq.transport.mqtt;
019
import java.io.IOException;
import java.util.Timer;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.thread.SchedulerTimerTask;
import org.apache.activemq.transport.AbstractInactivityMonitor;
import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
039
040 public class MQTTInactivityMonitor extends TransportFilter {
041
042 private static final Logger LOG = LoggerFactory.getLogger(MQTTInactivityMonitor.class);
043
044 private static ThreadPoolExecutor ASYNC_TASKS;
045 private static int CHECKER_COUNTER;
046 private static long DEFAULT_CHECK_TIME_MILLS = 30000;
047 private static Timer READ_CHECK_TIMER;
048
049 private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
050
051 private final AtomicBoolean commandSent = new AtomicBoolean(false);
052 private final AtomicBoolean inSend = new AtomicBoolean(false);
053 private final AtomicBoolean failed = new AtomicBoolean(false);
054
055 private final AtomicBoolean commandReceived = new AtomicBoolean(true);
056 private final AtomicBoolean inReceive = new AtomicBoolean(false);
057 private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
058
059 private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
060 private SchedulerTimerTask readCheckerTask;
061
062 private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
063 private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
064 private boolean keepAliveResponseRequired;
065 private MQTTProtocolConverter protocolConverter;
066
067
068 private final Runnable readChecker = new Runnable() {
069 long lastRunTime;
070
071 public void run() {
072 long now = System.currentTimeMillis();
073 long elapsed = (now - lastRunTime);
074
075 if (lastRunTime != 0 && LOG.isDebugEnabled()) {
076 LOG.debug("" + elapsed + " ms elapsed since last read check.");
077 }
078
079 // Perhaps the timer executed a read check late.. and then executes
080 // the next read check on time which causes the time elapsed between
081 // read checks to be small..
082
083 // If less than 90% of the read check Time elapsed then abort this readcheck.
084 if (!allowReadCheck(elapsed)) { // FUNKY qdox bug does not allow me to inline this expression.
085 LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
086 return;
087 }
088
089 lastRunTime = now;
090 readCheck();
091 }
092 };
093
094 private boolean allowReadCheck(long elapsed) {
095 return elapsed > (readCheckTime * 9 / 10);
096 }
097
098
099 public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) {
100 super(next);
101 }
102
103 public void start() throws Exception {
104 next.start();
105 startMonitorThread();
106 }
107
108 public void stop() throws Exception {
109 stopMonitorThread();
110 next.stop();
111 }
112
113
114 final void readCheck() {
115 int currentCounter = next.getReceiveCounter();
116 int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
117 if (inReceive.get() || currentCounter != previousCounter) {
118 if (LOG.isTraceEnabled()) {
119 LOG.trace("A receive is in progress");
120 }
121 return;
122 }
123 if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
124 if (LOG.isDebugEnabled()) {
125 LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
126 }
127 ASYNC_TASKS.execute(new Runnable() {
128 public void run() {
129 if (protocolConverter != null) {
130 protocolConverter.onTransportError();
131 }
132 onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
133 }
134
135 ;
136 });
137 } else {
138 if (LOG.isTraceEnabled()) {
139 LOG.trace("Message received since last read check, resetting flag: ");
140 }
141 }
142 commandReceived.set(false);
143 }
144
145
146 public void onCommand(Object command) {
147 commandReceived.set(true);
148 inReceive.set(true);
149 try {
150 if (command.getClass() == KeepAliveInfo.class) {
151 KeepAliveInfo info = (KeepAliveInfo) command;
152 if (info.isResponseRequired()) {
153 sendLock.readLock().lock();
154 try {
155 info.setResponseRequired(false);
156 oneway(info);
157 } catch (IOException e) {
158 onException(e);
159 } finally {
160 sendLock.readLock().unlock();
161 }
162 }
163 } else {
164 transportListener.onCommand(command);
165 }
166 } finally {
167 inReceive.set(false);
168 }
169 }
170
171 public void oneway(Object o) throws IOException {
172 // To prevent the inactivity monitor from sending a message while we
173 // are performing a send we take a read lock. The inactivity monitor
174 // sends its Heart-beat commands under a write lock. This means that
175 // the MutexTransport is still responsible for synchronizing sends
176 this.sendLock.readLock().lock();
177 inSend.set(true);
178 try {
179 doOnewaySend(o);
180 } finally {
181 commandSent.set(true);
182 inSend.set(false);
183 this.sendLock.readLock().unlock();
184 }
185 }
186
187 // Must be called under lock, either read or write on sendLock.
188 private void doOnewaySend(Object command) throws IOException {
189 if (failed.get()) {
190 throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress());
191 }
192 next.oneway(command);
193 }
194
195 public void onException(IOException error) {
196 if (failed.compareAndSet(false, true)) {
197 stopMonitorThread();
198 transportListener.onException(error);
199 }
200 }
201
202
203 public long getReadCheckTime() {
204 return readCheckTime;
205 }
206
207 public void setReadCheckTime(long readCheckTime) {
208 this.readCheckTime = readCheckTime;
209 }
210
211
212 public long getInitialDelayTime() {
213 return initialDelayTime;
214 }
215
216 public void setInitialDelayTime(long initialDelayTime) {
217 this.initialDelayTime = initialDelayTime;
218 }
219
220 public boolean isKeepAliveResponseRequired() {
221 return this.keepAliveResponseRequired;
222 }
223
224 public void setKeepAliveResponseRequired(boolean value) {
225 this.keepAliveResponseRequired = value;
226 }
227
228 public boolean isMonitorStarted() {
229 return this.monitorStarted.get();
230 }
231
232 public void setProtocolConverter(MQTTProtocolConverter protocolConverter) {
233 this.protocolConverter = protocolConverter;
234 }
235
236 public MQTTProtocolConverter getProtocolConverter() {
237 return protocolConverter;
238 }
239
240 synchronized void startMonitorThread() {
241 if (monitorStarted.get()) {
242 return;
243 }
244
245
246 if (readCheckTime > 0) {
247 readCheckerTask = new SchedulerTimerTask(readChecker);
248 }
249
250
251 if (readCheckTime > 0) {
252 monitorStarted.set(true);
253 synchronized (AbstractInactivityMonitor.class) {
254 if (CHECKER_COUNTER == 0) {
255 ASYNC_TASKS = createExecutor();
256 READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true);
257 }
258 CHECKER_COUNTER++;
259 if (readCheckTime > 0) {
260 READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
261 }
262 }
263 }
264 }
265
266
267 synchronized void stopMonitorThread() {
268 if (monitorStarted.compareAndSet(true, false)) {
269 if (readCheckerTask != null) {
270 readCheckerTask.cancel();
271 }
272
273 synchronized (AbstractInactivityMonitor.class) {
274 READ_CHECK_TIMER.purge();
275 CHECKER_COUNTER--;
276 if (CHECKER_COUNTER == 0) {
277 READ_CHECK_TIMER.cancel();
278 READ_CHECK_TIMER = null;
279 ASYNC_TASKS.shutdown();
280 ASYNC_TASKS = null;
281 }
282 }
283 }
284 }
285
286 private ThreadFactory factory = new ThreadFactory() {
287 public Thread newThread(Runnable runnable) {
288 Thread thread = new Thread(runnable, "MQTTInactivityMonitor Async Task: " + runnable);
289 thread.setDaemon(true);
290 return thread;
291 }
292 };
293
294 private ThreadPoolExecutor createExecutor() {
295 ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
296 exec.allowCoreThreadTimeOut(true);
297 return exec;
298 }
299 }
300