001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.activemq.transport.discovery.multicast;
019
020 import java.io.IOException;
021 import java.net.DatagramPacket;
022 import java.net.InetAddress;
023 import java.net.InetSocketAddress;
024 import java.net.MulticastSocket;
025 import java.net.NetworkInterface;
026 import java.net.SocketAddress;
027 import java.net.SocketTimeoutException;
028 import java.net.URI;
029 import java.util.Iterator;
030 import java.util.Map;
031 import java.util.concurrent.ConcurrentHashMap;
032 import java.util.concurrent.ExecutorService;
033 import java.util.concurrent.LinkedBlockingQueue;
034 import java.util.concurrent.ThreadFactory;
035 import java.util.concurrent.ThreadPoolExecutor;
036 import java.util.concurrent.TimeUnit;
037 import java.util.concurrent.atomic.AtomicBoolean;
038
039 import org.apache.activemq.command.DiscoveryEvent;
040 import org.apache.activemq.transport.discovery.DiscoveryAgent;
041 import org.apache.activemq.transport.discovery.DiscoveryListener;
042 import org.slf4j.Logger;
043 import org.slf4j.LoggerFactory;
044
045 /**
046 * A {@link DiscoveryAgent} using a multicast address and heartbeat packets
047 * encoded using any wireformat, but openwire by default.
048 *
049 *
050 */
051 public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
052
053 public static final String DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
054 public static final String DEFAULT_HOST_STR = "default";
055 public static final String DEFAULT_HOST_IP = System.getProperty("activemq.partition.discovery", "239.255.2.3");
056 public static final int DEFAULT_PORT = 6155;
057
058 private static final Logger LOG = LoggerFactory.getLogger(MulticastDiscoveryAgent.class);
059 private static final String TYPE_SUFFIX = "ActiveMQ-4.";
060 private static final String ALIVE = "alive.";
061 private static final String DEAD = "dead.";
062 private static final String DELIMITER = "%";
063 private static final int BUFF_SIZE = 8192;
064 private static final int DEFAULT_IDLE_TIME = 500;
065 private static final int HEARTBEAT_MISS_BEFORE_DEATH = 10;
066
067 private long initialReconnectDelay = 1000 * 5;
068 private long maxReconnectDelay = 1000 * 30;
069 private long backOffMultiplier = 2;
070 private boolean useExponentialBackOff;
071 private int maxReconnectAttempts;
072
073 private int timeToLive = 1;
074 private boolean loopBackMode;
075 private Map<String, RemoteBrokerData> brokersByService = new ConcurrentHashMap<String, RemoteBrokerData>();
076 private String group = "default";
077 private URI discoveryURI;
078 private InetAddress inetAddress;
079 private SocketAddress sockAddress;
080 private DiscoveryListener discoveryListener;
081 private String selfService;
082 private MulticastSocket mcast;
083 private Thread runner;
084 private long keepAliveInterval = DEFAULT_IDLE_TIME;
085 private String mcInterface;
086 private String mcNetworkInterface;
087 private String mcJoinNetworkInterface;
088 private long lastAdvertizeTime;
089 private AtomicBoolean started = new AtomicBoolean(false);
090 private boolean reportAdvertizeFailed = true;
091 private ExecutorService executor = null;
092
093 class RemoteBrokerData {
094 final String brokerName;
095 final String service;
096 long lastHeartBeat;
097 long recoveryTime;
098 int failureCount;
099 boolean failed;
100
101 public RemoteBrokerData(String brokerName, String service) {
102 this.brokerName = brokerName;
103 this.service = service;
104 this.lastHeartBeat = System.currentTimeMillis();
105 }
106
107 public synchronized void updateHeartBeat() {
108 lastHeartBeat = System.currentTimeMillis();
109
110 // Consider that the broker recovery has succeeded if it has not
111 // failed in 60 seconds.
112 if (!failed && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
113 if (LOG.isDebugEnabled()) {
114 LOG.debug("I now think that the " + service + " service has recovered.");
115 }
116 failureCount = 0;
117 recoveryTime = 0;
118 }
119 }
120
121 public synchronized long getLastHeartBeat() {
122 return lastHeartBeat;
123 }
124
125 public synchronized boolean markFailed() {
126 if (!failed) {
127 failed = true;
128 failureCount++;
129
130 long reconnectDelay;
131 if (!useExponentialBackOff) {
132 reconnectDelay = initialReconnectDelay;
133 } else {
134 reconnectDelay = (long)Math.pow(backOffMultiplier, failureCount);
135 if (reconnectDelay > maxReconnectDelay) {
136 reconnectDelay = maxReconnectDelay;
137 }
138 }
139
140 if (LOG.isDebugEnabled()) {
141 LOG.debug("Remote failure of " + service + " while still receiving multicast advertisements. Advertising events will be suppressed for " + reconnectDelay
142 + " ms, the current failure count is: " + failureCount);
143 }
144
145 recoveryTime = System.currentTimeMillis() + reconnectDelay;
146 return true;
147 }
148 return false;
149 }
150
151 /**
152 * @return true if this broker is marked failed and it is now the right
153 * time to start recovery.
154 */
155 public synchronized boolean doRecovery() {
156 if (!failed) {
157 return false;
158 }
159
160 // Are we done trying to recover this guy?
161 if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
162 if (LOG.isDebugEnabled()) {
163 LOG.debug("Max reconnect attempts of the " + service + " service has been reached.");
164 }
165 return false;
166 }
167
168 // Is it not yet time?
169 if (System.currentTimeMillis() < recoveryTime) {
170 return false;
171 }
172
173 if (LOG.isDebugEnabled()) {
174 LOG.debug("Resuming event advertisement of the " + service + " service.");
175 }
176 failed = false;
177 return true;
178 }
179
180 public boolean isFailed() {
181 return failed;
182 }
183 }
184
185 /**
186 * Set the discovery listener
187 *
188 * @param listener
189 */
190 public void setDiscoveryListener(DiscoveryListener listener) {
191 this.discoveryListener = listener;
192 }
193
194 /**
195 * register a service
196 */
197 public void registerService(String name) throws IOException {
198 this.selfService = name;
199 if (started.get()) {
200 doAdvertizeSelf();
201 }
202 }
203
204 /**
205 * @return Returns the loopBackMode.
206 */
207 public boolean isLoopBackMode() {
208 return loopBackMode;
209 }
210
211 /**
212 * @param loopBackMode The loopBackMode to set.
213 */
214 public void setLoopBackMode(boolean loopBackMode) {
215 this.loopBackMode = loopBackMode;
216 }
217
218 /**
219 * @return Returns the timeToLive.
220 */
221 public int getTimeToLive() {
222 return timeToLive;
223 }
224
225 /**
226 * @param timeToLive The timeToLive to set.
227 */
228 public void setTimeToLive(int timeToLive) {
229 this.timeToLive = timeToLive;
230 }
231
232 /**
233 * @return the discoveryURI
234 */
235 public URI getDiscoveryURI() {
236 return discoveryURI;
237 }
238
239 /**
240 * Set the discoveryURI
241 *
242 * @param discoveryURI
243 */
244 public void setDiscoveryURI(URI discoveryURI) {
245 this.discoveryURI = discoveryURI;
246 }
247
248 public long getKeepAliveInterval() {
249 return keepAliveInterval;
250 }
251
252 public void setKeepAliveInterval(long keepAliveInterval) {
253 this.keepAliveInterval = keepAliveInterval;
254 }
255
256 public void setInterface(String mcInterface) {
257 this.mcInterface = mcInterface;
258 }
259
260 public void setNetworkInterface(String mcNetworkInterface) {
261 this.mcNetworkInterface = mcNetworkInterface;
262 }
263
264 public void setJoinNetworkInterface(String mcJoinNetwrokInterface) {
265 this.mcJoinNetworkInterface = mcJoinNetwrokInterface;
266 }
267
268 /**
269 * start the discovery agent
270 *
271 * @throws Exception
272 */
273 public void start() throws Exception {
274
275 if (started.compareAndSet(false, true)) {
276
277 if (group == null || group.length() == 0) {
278 throw new IOException("You must specify a group to discover");
279 }
280 String type = getType();
281 if (!type.endsWith(".")) {
282 LOG.warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
283 type += ".";
284 }
285
286 if (discoveryURI == null) {
287 discoveryURI = new URI(DEFAULT_DISCOVERY_URI_STRING);
288 }
289
290 if (LOG.isTraceEnabled())
291 LOG.trace("start - discoveryURI = " + discoveryURI);
292
293 String myHost = discoveryURI.getHost();
294 int myPort = discoveryURI.getPort();
295
296 if( DEFAULT_HOST_STR.equals(myHost) )
297 myHost = DEFAULT_HOST_IP;
298
299 if(myPort < 0 )
300 myPort = DEFAULT_PORT;
301
302 if (LOG.isTraceEnabled()) {
303 LOG.trace("start - myHost = " + myHost);
304 LOG.trace("start - myPort = " + myPort);
305 LOG.trace("start - group = " + group );
306 LOG.trace("start - interface = " + mcInterface );
307 LOG.trace("start - network interface = " + mcNetworkInterface );
308 LOG.trace("start - join network interface = " + mcJoinNetworkInterface );
309 }
310
311 this.inetAddress = InetAddress.getByName(myHost);
312 this.sockAddress = new InetSocketAddress(this.inetAddress, myPort);
313 mcast = new MulticastSocket(myPort);
314 mcast.setLoopbackMode(loopBackMode);
315 mcast.setTimeToLive(getTimeToLive());
316 if (mcJoinNetworkInterface != null) {
317 mcast.joinGroup(sockAddress, NetworkInterface.getByName(mcJoinNetworkInterface));
318 }
319 else {
320 mcast.joinGroup(inetAddress);
321 }
322 mcast.setSoTimeout((int)keepAliveInterval);
323 if (mcInterface != null) {
324 mcast.setInterface(InetAddress.getByName(mcInterface));
325 }
326 if (mcNetworkInterface != null) {
327 mcast.setNetworkInterface(NetworkInterface.getByName(mcNetworkInterface));
328 }
329 runner = new Thread(this);
330 runner.setName(this.toString() + ":" + runner.getName());
331 runner.setDaemon(true);
332 runner.start();
333 doAdvertizeSelf();
334 }
335 }
336
337 /**
338 * stop the channel
339 *
340 * @throws Exception
341 */
342 public void stop() throws Exception {
343 if (started.compareAndSet(true, false)) {
344 doAdvertizeSelf();
345 if (mcast != null) {
346 mcast.close();
347 }
348 if (runner != null) {
349 runner.interrupt();
350 }
351 getExecutor().shutdownNow();
352 }
353 }
354
355 public String getType() {
356 return group + "." + TYPE_SUFFIX;
357 }
358
359 public void run() {
360 byte[] buf = new byte[BUFF_SIZE];
361 DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
362 while (started.get()) {
363 doTimeKeepingServices();
364 try {
365 mcast.receive(packet);
366 if (packet.getLength() > 0) {
367 String str = new String(packet.getData(), packet.getOffset(), packet.getLength());
368 processData(str);
369 }
370 } catch (SocketTimeoutException se) {
371 // ignore
372 } catch (IOException e) {
373 if (started.get()) {
374 LOG.error("failed to process packet: " + e);
375 }
376 }
377 }
378 }
379
380 private void processData(String str) {
381 if (discoveryListener != null) {
382 if (str.startsWith(getType())) {
383 String payload = str.substring(getType().length());
384 if (payload.startsWith(ALIVE)) {
385 String brokerName = getBrokerName(payload.substring(ALIVE.length()));
386 String service = payload.substring(ALIVE.length() + brokerName.length() + 2);
387 processAlive(brokerName, service);
388 } else {
389 String brokerName = getBrokerName(payload.substring(DEAD.length()));
390 String service = payload.substring(DEAD.length() + brokerName.length() + 2);
391 processDead(service);
392 }
393 }
394 }
395 }
396
397 private void doTimeKeepingServices() {
398 if (started.get()) {
399 long currentTime = System.currentTimeMillis();
400 if (currentTime < lastAdvertizeTime || ((currentTime - keepAliveInterval) > lastAdvertizeTime)) {
401 doAdvertizeSelf();
402 lastAdvertizeTime = currentTime;
403 }
404 doExpireOldServices();
405 }
406 }
407
408 private void doAdvertizeSelf() {
409 if (selfService != null) {
410 String payload = getType();
411 payload += started.get() ? ALIVE : DEAD;
412 payload += DELIMITER + "localhost" + DELIMITER;
413 payload += selfService;
414 try {
415 byte[] data = payload.getBytes();
416 DatagramPacket packet = new DatagramPacket(data, 0, data.length, sockAddress);
417 mcast.send(packet);
418 } catch (IOException e) {
419 // If a send fails, chances are all subsequent sends will fail
420 // too.. No need to keep reporting the
421 // same error over and over.
422 if (reportAdvertizeFailed) {
423 reportAdvertizeFailed = false;
424 LOG.error("Failed to advertise our service: " + payload, e);
425 if ("Operation not permitted".equals(e.getMessage())) {
426 LOG.error("The 'Operation not permitted' error has been know to be caused by improper firewall/network setup. "
427 + "Please make sure that the OS is properly configured to allow multicast traffic over: " + mcast.getLocalAddress());
428 }
429 }
430 }
431 }
432 }
433
434 private void processAlive(String brokerName, String service) {
435 if (selfService == null || !service.equals(selfService)) {
436 RemoteBrokerData data = brokersByService.get(service);
437 if (data == null) {
438 data = new RemoteBrokerData(brokerName, service);
439 brokersByService.put(service, data);
440 fireServiceAddEvent(data);
441 doAdvertizeSelf();
442 } else {
443 data.updateHeartBeat();
444 if (data.doRecovery()) {
445 fireServiceAddEvent(data);
446 }
447 }
448 }
449 }
450
451 private void processDead(String service) {
452 if (!service.equals(selfService)) {
453 RemoteBrokerData data = brokersByService.remove(service);
454 if (data != null && !data.isFailed()) {
455 fireServiceRemovedEvent(data);
456 }
457 }
458 }
459
460 private void doExpireOldServices() {
461 long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH);
462 for (Iterator<RemoteBrokerData> i = brokersByService.values().iterator(); i.hasNext();) {
463 RemoteBrokerData data = i.next();
464 if (data.getLastHeartBeat() < expireTime) {
465 processDead(data.service);
466 }
467 }
468 }
469
470 private String getBrokerName(String str) {
471 String result = null;
472 int start = str.indexOf(DELIMITER);
473 if (start >= 0) {
474 int end = str.indexOf(DELIMITER, start + 1);
475 result = str.substring(start + 1, end);
476 }
477 return result;
478 }
479
480 public void serviceFailed(DiscoveryEvent event) throws IOException {
481 RemoteBrokerData data = brokersByService.get(event.getServiceName());
482 if (data != null && data.markFailed()) {
483 fireServiceRemovedEvent(data);
484 }
485 }
486
487 private void fireServiceRemovedEvent(RemoteBrokerData data) {
488 if (discoveryListener != null && started.get()) {
489 final DiscoveryEvent event = new DiscoveryEvent(data.service);
490 event.setBrokerName(data.brokerName);
491
492 // Have the listener process the event async so that
493 // he does not block this thread since we are doing time sensitive
494 // processing of events.
495 getExecutor().execute(new Runnable() {
496 public void run() {
497 DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
498 if (discoveryListener != null) {
499 discoveryListener.onServiceRemove(event);
500 }
501 }
502 });
503 }
504 }
505
506 private void fireServiceAddEvent(RemoteBrokerData data) {
507 if (discoveryListener != null && started.get()) {
508 final DiscoveryEvent event = new DiscoveryEvent(data.service);
509 event.setBrokerName(data.brokerName);
510
511 // Have the listener process the event async so that
512 // he does not block this thread since we are doing time sensitive
513 // processing of events.
514 getExecutor().execute(new Runnable() {
515 public void run() {
516 DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
517 if (discoveryListener != null) {
518 discoveryListener.onServiceAdd(event);
519 }
520 }
521 });
522 }
523 }
524
525 private ExecutorService getExecutor() {
526 if (executor == null) {
527 final String threadName = "Notifier-" + this.toString();
528 executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
529 public Thread newThread(Runnable runable) {
530 Thread t = new Thread(runable, threadName);
531 t.setDaemon(true);
532 return t;
533 }
534 });
535 }
536 return executor;
537 }
538
539 public long getBackOffMultiplier() {
540 return backOffMultiplier;
541 }
542
543 public void setBackOffMultiplier(long backOffMultiplier) {
544 this.backOffMultiplier = backOffMultiplier;
545 }
546
547 public long getInitialReconnectDelay() {
548 return initialReconnectDelay;
549 }
550
551 public void setInitialReconnectDelay(long initialReconnectDelay) {
552 this.initialReconnectDelay = initialReconnectDelay;
553 }
554
555 public int getMaxReconnectAttempts() {
556 return maxReconnectAttempts;
557 }
558
559 public void setMaxReconnectAttempts(int maxReconnectAttempts) {
560 this.maxReconnectAttempts = maxReconnectAttempts;
561 }
562
563 public long getMaxReconnectDelay() {
564 return maxReconnectDelay;
565 }
566
567 public void setMaxReconnectDelay(long maxReconnectDelay) {
568 this.maxReconnectDelay = maxReconnectDelay;
569 }
570
571 public boolean isUseExponentialBackOff() {
572 return useExponentialBackOff;
573 }
574
575 public void setUseExponentialBackOff(boolean useExponentialBackOff) {
576 this.useExponentialBackOff = useExponentialBackOff;
577 }
578
579 public void setGroup(String group) {
580 this.group = group;
581 }
582
583 @Override
584 public String toString() {
585 return "MulticastDiscoveryAgent-"
586 + (selfService != null ? "advertise:" + selfService : "listener:" + this.discoveryListener);
587 }
588 }