001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker;
018
019 import java.util.Iterator;
020 import java.util.Set;
021 import java.util.concurrent.CopyOnWriteArraySet;
022 import java.util.concurrent.atomic.AtomicBoolean;
023
024 import org.apache.activemq.Service;
025 import org.apache.activemq.ThreadPriorities;
026 import org.slf4j.Logger;
027 import org.slf4j.LoggerFactory;
028
029 /**
030 * Used to provide information on the status of the Connection
031 *
032 *
033 */
034 public class TransportStatusDetector implements Service, Runnable {
035 private static final Logger LOG = LoggerFactory.getLogger(TransportStatusDetector.class);
036 private TransportConnector connector;
037 private Set<TransportConnection> collectionCandidates = new CopyOnWriteArraySet<TransportConnection>();
038 private AtomicBoolean started = new AtomicBoolean(false);
039 private Thread runner;
040 private int sweepInterval = 5000;
041
042 TransportStatusDetector(TransportConnector connector) {
043 this.connector = connector;
044 }
045
046 /**
047 * @return Returns the sweepInterval.
048 */
049 public int getSweepInterval() {
050 return sweepInterval;
051 }
052
053 /**
054 * The sweepInterval to set.
055 *
056 * @param sweepInterval
057 */
058 public void setSweepInterval(int sweepInterval) {
059 this.sweepInterval = sweepInterval;
060 }
061
062 protected void doCollection() {
063 for (Iterator<TransportConnection> i = collectionCandidates.iterator(); i.hasNext();) {
064 TransportConnection tc = i.next();
065 if (tc.isMarkedCandidate()) {
066 if (tc.isBlockedCandidate()) {
067 collectionCandidates.remove(tc);
068 doCollection(tc);
069 } else {
070 tc.doMark();
071 }
072 } else {
073 collectionCandidates.remove(tc);
074 }
075 }
076 }
077
078 protected void doSweep() {
079 for (Iterator i = connector.getConnections().iterator(); i.hasNext();) {
080 TransportConnection connection = (TransportConnection)i.next();
081 if (connection.isMarkedCandidate()) {
082 connection.doMark();
083 collectionCandidates.add(connection);
084 }
085 }
086 }
087
088 protected void doCollection(TransportConnection tc) {
089 LOG.warn("found a blocked client - stopping: " + tc);
090 try {
091 tc.stop();
092 } catch (Exception e) {
093 LOG.error("Error stopping " + tc, e);
094 }
095 }
096
097 public void run() {
098 while (started.get()) {
099 try {
100 doCollection();
101 doSweep();
102 Thread.sleep(sweepInterval);
103 } catch (Throwable e) {
104 LOG.error("failed to complete a sweep for blocked clients", e);
105 }
106 }
107 }
108
109 public void start() throws Exception {
110 if (started.compareAndSet(false, true)) {
111 runner = new Thread(this, "ActiveMQ Transport Status Monitor: " + connector);
112 runner.setDaemon(true);
113 runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT);
114 runner.start();
115 }
116 }
117
118 public void stop() throws Exception {
119 started.set(false);
120 if (runner != null) {
121 runner.join(getSweepInterval() * 5);
122 }
123 }
124 }