001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.discovery.simple;
018
019 import java.io.IOException;
020 import java.net.URI;
021 import java.util.concurrent.atomic.AtomicBoolean;
022
023 import org.apache.activemq.command.DiscoveryEvent;
024 import org.apache.activemq.thread.DefaultThreadPools;
025 import org.apache.activemq.transport.discovery.DiscoveryAgent;
026 import org.apache.activemq.transport.discovery.DiscoveryListener;
027 import org.slf4j.Logger;
028 import org.slf4j.LoggerFactory;
029
030 /**
031 * A simple DiscoveryAgent that allows static configuration of the discovered
032 * services.
033 *
034 *
035 */
036 public class SimpleDiscoveryAgent implements DiscoveryAgent {
037
038 private final static Logger LOG = LoggerFactory.getLogger(SimpleDiscoveryAgent.class);
039 private long initialReconnectDelay = 1000;
040 private long maxReconnectDelay = 1000 * 30;
041 private long backOffMultiplier = 2;
042 private boolean useExponentialBackOff=true;
043 private int maxReconnectAttempts;
044 private final Object sleepMutex = new Object();
045 private long minConnectTime = 5000;
046 private DiscoveryListener listener;
047 private String services[] = new String[] {};
048 private final AtomicBoolean running = new AtomicBoolean(false);
049
050 class SimpleDiscoveryEvent extends DiscoveryEvent {
051
052 private int connectFailures;
053 private long reconnectDelay = initialReconnectDelay;
054 private long connectTime = System.currentTimeMillis();
055 private AtomicBoolean failed = new AtomicBoolean(false);
056
057 public SimpleDiscoveryEvent(String service) {
058 super(service);
059 }
060
061 @Override
062 public String toString() {
063 return "[" + serviceName + ", failed:" + failed + ", connectionFailures:" + connectFailures + "]";
064 }
065 }
066
067 public void setDiscoveryListener(DiscoveryListener listener) {
068 this.listener = listener;
069 }
070
071 public void registerService(String name) throws IOException {
072 }
073
074 public void start() throws Exception {
075 running.set(true);
076 for (int i = 0; i < services.length; i++) {
077 listener.onServiceAdd(new SimpleDiscoveryEvent(services[i]));
078 }
079 }
080
081 public void stop() throws Exception {
082 running.set(false);
083 synchronized (sleepMutex) {
084 sleepMutex.notifyAll();
085 }
086 }
087
088 public String[] getServices() {
089 return services;
090 }
091
092 public void setServices(String services) {
093 this.services = services.split(",");
094 }
095
096 public void setServices(String services[]) {
097 this.services = services;
098 }
099
100 public void setServices(URI services[]) {
101 this.services = new String[services.length];
102 for (int i = 0; i < services.length; i++) {
103 this.services[i] = services[i].toString();
104 }
105 }
106
107 public void serviceFailed(DiscoveryEvent devent) throws IOException {
108
109 final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
110 if (event.failed.compareAndSet(false, true)) {
111
112 listener.onServiceRemove(event);
113 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
114 public void run() {
115
116 // We detect a failed connection attempt because the service
117 // fails right
118 // away.
119 if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
120 LOG.debug("Failure occurred soon after the discovery event was generated. It will be classified as a connection failure: "+event);
121
122 event.connectFailures++;
123
124 if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
125 LOG.warn("Reconnect attempts exceeded "+maxReconnectAttempts+" tries. Reconnecting has been disabled for: " + event);
126 return;
127 }
128
129 synchronized (sleepMutex) {
130 try {
131 if (!running.get()) {
132 LOG.debug("Reconnecting disabled: stopped");
133 return;
134 }
135
136 LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect.");
137 sleepMutex.wait(event.reconnectDelay);
138 } catch (InterruptedException ie) {
139 LOG.debug("Reconnecting disabled: " + ie);
140 Thread.currentThread().interrupt();
141 return;
142 }
143 }
144
145 if (!useExponentialBackOff) {
146 event.reconnectDelay = initialReconnectDelay;
147 } else {
148 // Exponential increment of reconnect delay.
149 event.reconnectDelay *= backOffMultiplier;
150 if (event.reconnectDelay > maxReconnectDelay) {
151 event.reconnectDelay = maxReconnectDelay;
152 }
153 }
154
155 } else {
156 event.connectFailures = 0;
157 event.reconnectDelay = initialReconnectDelay;
158 }
159
160 if (!running.get()) {
161 LOG.debug("Reconnecting disabled: stopped");
162 return;
163 }
164
165 event.connectTime = System.currentTimeMillis();
166 event.failed.set(false);
167 listener.onServiceAdd(event);
168 }
169 }, "Simple Discovery Agent");
170 }
171 }
172
173 public long getBackOffMultiplier() {
174 return backOffMultiplier;
175 }
176
177 public void setBackOffMultiplier(long backOffMultiplier) {
178 this.backOffMultiplier = backOffMultiplier;
179 }
180
181 public long getInitialReconnectDelay() {
182 return initialReconnectDelay;
183 }
184
185 public void setInitialReconnectDelay(long initialReconnectDelay) {
186 this.initialReconnectDelay = initialReconnectDelay;
187 }
188
189 public int getMaxReconnectAttempts() {
190 return maxReconnectAttempts;
191 }
192
193 public void setMaxReconnectAttempts(int maxReconnectAttempts) {
194 this.maxReconnectAttempts = maxReconnectAttempts;
195 }
196
197 public long getMaxReconnectDelay() {
198 return maxReconnectDelay;
199 }
200
201 public void setMaxReconnectDelay(long maxReconnectDelay) {
202 this.maxReconnectDelay = maxReconnectDelay;
203 }
204
205 public long getMinConnectTime() {
206 return minConnectTime;
207 }
208
209 public void setMinConnectTime(long minConnectTime) {
210 this.minConnectTime = minConnectTime;
211 }
212
213 public boolean isUseExponentialBackOff() {
214 return useExponentialBackOff;
215 }
216
217 public void setUseExponentialBackOff(boolean useExponentialBackOff) {
218 this.useExponentialBackOff = useExponentialBackOff;
219 }
220 }