001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.fanout;
018
019 import java.io.IOException;
020 import java.io.InterruptedIOException;
021 import java.net.URI;
022 import java.util.ArrayList;
023 import java.util.Iterator;
024 import java.util.concurrent.ConcurrentHashMap;
025 import java.util.concurrent.atomic.AtomicInteger;
026
027 import org.apache.activemq.command.Command;
028 import org.apache.activemq.command.ConsumerInfo;
029 import org.apache.activemq.command.Message;
030 import org.apache.activemq.command.RemoveInfo;
031 import org.apache.activemq.command.Response;
032 import org.apache.activemq.state.ConnectionStateTracker;
033 import org.apache.activemq.thread.DefaultThreadPools;
034 import org.apache.activemq.thread.Task;
035 import org.apache.activemq.thread.TaskRunner;
036 import org.apache.activemq.transport.CompositeTransport;
037 import org.apache.activemq.transport.DefaultTransportListener;
038 import org.apache.activemq.transport.FutureResponse;
039 import org.apache.activemq.transport.ResponseCallback;
040 import org.apache.activemq.transport.Transport;
041 import org.apache.activemq.transport.TransportFactory;
042 import org.apache.activemq.transport.TransportListener;
043 import org.apache.activemq.util.IOExceptionSupport;
044 import org.apache.activemq.util.ServiceStopper;
045 import org.apache.activemq.util.ServiceSupport;
046 import org.slf4j.Logger;
047 import org.slf4j.LoggerFactory;
048
049 /**
050 * A Transport that fans out a connection to multiple brokers.
051 *
052 *
053 */
054 public class FanoutTransport implements CompositeTransport {
055
056 private static final Logger LOG = LoggerFactory.getLogger(FanoutTransport.class);
057
058 private TransportListener transportListener;
059 private boolean disposed;
060 private boolean connected;
061
062 private final Object reconnectMutex = new Object();
063 private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
064 private final ConcurrentHashMap<Integer, RequestCounter> requestMap = new ConcurrentHashMap<Integer, RequestCounter>();
065
066 private final TaskRunner reconnectTask;
067 private boolean started;
068
069 private final ArrayList<FanoutTransportHandler> transports = new ArrayList<FanoutTransportHandler>();
070 private int connectedCount;
071
072 private int minAckCount = 2;
073
074 private long initialReconnectDelay = 10;
075 private long maxReconnectDelay = 1000 * 30;
076 private long backOffMultiplier = 2;
077 private final boolean useExponentialBackOff = true;
078 private int maxReconnectAttempts;
079 private Exception connectionFailure;
080 private FanoutTransportHandler primary;
081 private boolean fanOutQueues = false;
082
083 static class RequestCounter {
084
085 final Command command;
086 final AtomicInteger ackCount;
087
088 RequestCounter(Command command, int count) {
089 this.command = command;
090 this.ackCount = new AtomicInteger(count);
091 }
092
093 @Override
094 public String toString() {
095 return command.getCommandId() + "=" + ackCount.get();
096 }
097 }
098
099 class FanoutTransportHandler extends DefaultTransportListener {
100
101 private final URI uri;
102 private Transport transport;
103
104 private int connectFailures;
105 private long reconnectDelay = initialReconnectDelay;
106 private long reconnectDate;
107
108 public FanoutTransportHandler(URI uri) {
109 this.uri = uri;
110 }
111
112 @Override
113 public void onCommand(Object o) {
114 Command command = (Command)o;
115 if (command.isResponse()) {
116 Integer id = new Integer(((Response)command).getCorrelationId());
117 RequestCounter rc = requestMap.get(id);
118 if (rc != null) {
119 if (rc.ackCount.decrementAndGet() <= 0) {
120 requestMap.remove(id);
121 transportListenerOnCommand(command);
122 }
123 } else {
124 transportListenerOnCommand(command);
125 }
126 } else {
127 transportListenerOnCommand(command);
128 }
129 }
130
131 @Override
132 public void onException(IOException error) {
133 try {
134 synchronized (reconnectMutex) {
135 if (transport == null || !transport.isConnected()) {
136 return;
137 }
138
139 LOG.debug("Transport failed, starting up reconnect task", error);
140
141 ServiceSupport.dispose(transport);
142 transport = null;
143 connectedCount--;
144 if (primary == this) {
145 primary = null;
146 }
147 reconnectTask.wakeup();
148 }
149 } catch (InterruptedException e) {
150 Thread.currentThread().interrupt();
151 if (transportListener != null) {
152 transportListener.onException(new InterruptedIOException());
153 }
154 }
155 }
156 }
157
158 public FanoutTransport() throws InterruptedIOException {
159 // Setup a task that is used to reconnect the a connection async.
160 reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
161 public boolean iterate() {
162 return doConnect();
163 }
164 }, "ActiveMQ Fanout Worker: " + System.identityHashCode(this));
165 }
166
167 /**
168 * @return
169 */
170 private boolean doConnect() {
171 long closestReconnectDate = 0;
172 synchronized (reconnectMutex) {
173
174 if (disposed || connectionFailure != null) {
175 reconnectMutex.notifyAll();
176 }
177
178 if (transports.size() == connectedCount || disposed || connectionFailure != null) {
179 return false;
180 } else {
181
182 if (transports.isEmpty()) {
183 // connectionFailure = new IOException("No uris available to
184 // connect to.");
185 } else {
186
187 // Try to connect them up.
188 Iterator<FanoutTransportHandler> iter = transports.iterator();
189 for (int i = 0; iter.hasNext() && !disposed; i++) {
190
191 long now = System.currentTimeMillis();
192
193 FanoutTransportHandler fanoutHandler = iter.next();
194 if (fanoutHandler.transport != null) {
195 continue;
196 }
197
198 // Are we waiting a little to try to reconnect this one?
199 if (fanoutHandler.reconnectDate != 0 && fanoutHandler.reconnectDate > now) {
200 if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
201 closestReconnectDate = fanoutHandler.reconnectDate;
202 }
203 continue;
204 }
205
206 URI uri = fanoutHandler.uri;
207 try {
208 LOG.debug("Stopped: " + this);
209 LOG.debug("Attempting connect to: " + uri);
210 Transport t = TransportFactory.compositeConnect(uri);
211 fanoutHandler.transport = t;
212 t.setTransportListener(fanoutHandler);
213 if (started) {
214 restoreTransport(fanoutHandler);
215 }
216 LOG.debug("Connection established");
217 fanoutHandler.reconnectDelay = initialReconnectDelay;
218 fanoutHandler.connectFailures = 0;
219 if (primary == null) {
220 primary = fanoutHandler;
221 }
222 connectedCount++;
223 } catch (Exception e) {
224 LOG.debug("Connect fail to: " + uri + ", reason: " + e);
225
226 if( fanoutHandler.transport !=null ) {
227 ServiceSupport.dispose(fanoutHandler.transport);
228 fanoutHandler.transport=null;
229 }
230
231 if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) {
232 LOG.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)");
233 connectionFailure = e;
234 reconnectMutex.notifyAll();
235 return false;
236 } else {
237
238 if (useExponentialBackOff) {
239 // Exponential increment of reconnect delay.
240 fanoutHandler.reconnectDelay *= backOffMultiplier;
241 if (fanoutHandler.reconnectDelay > maxReconnectDelay) {
242 fanoutHandler.reconnectDelay = maxReconnectDelay;
243 }
244 }
245
246 fanoutHandler.reconnectDate = now + fanoutHandler.reconnectDelay;
247
248 if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
249 closestReconnectDate = fanoutHandler.reconnectDate;
250 }
251 }
252 }
253 }
254 if (transports.size() == connectedCount || disposed) {
255 reconnectMutex.notifyAll();
256 return false;
257 }
258
259 }
260 }
261
262 }
263
264 try {
265 long reconnectDelay = closestReconnectDate - System.currentTimeMillis();
266 if (reconnectDelay > 0) {
267 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
268 Thread.sleep(reconnectDelay);
269 }
270 } catch (InterruptedException e1) {
271 Thread.currentThread().interrupt();
272 }
273 return true;
274 }
275
276 public void start() throws Exception {
277 synchronized (reconnectMutex) {
278 LOG.debug("Started.");
279 if (started) {
280 return;
281 }
282 started = true;
283 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
284 FanoutTransportHandler th = iter.next();
285 if (th.transport != null) {
286 restoreTransport(th);
287 }
288 }
289 connected=true;
290 }
291 }
292
293 public void stop() throws Exception {
294 synchronized (reconnectMutex) {
295 ServiceStopper ss = new ServiceStopper();
296
297 if (!started) {
298 return;
299 }
300 started = false;
301 disposed = true;
302 connected=false;
303
304 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
305 FanoutTransportHandler th = iter.next();
306 if (th.transport != null) {
307 ss.stop(th.transport);
308 }
309 }
310
311 LOG.debug("Stopped: " + this);
312 ss.throwFirstException();
313 }
314 reconnectTask.shutdown();
315 }
316
317 public int getMinAckCount() {
318 return minAckCount;
319 }
320
321 public void setMinAckCount(int minAckCount) {
322 this.minAckCount = minAckCount;
323 }
324
325 public long getInitialReconnectDelay() {
326 return initialReconnectDelay;
327 }
328
329 public void setInitialReconnectDelay(long initialReconnectDelay) {
330 this.initialReconnectDelay = initialReconnectDelay;
331 }
332
333 public long getMaxReconnectDelay() {
334 return maxReconnectDelay;
335 }
336
337 public void setMaxReconnectDelay(long maxReconnectDelay) {
338 this.maxReconnectDelay = maxReconnectDelay;
339 }
340
341 public long getReconnectDelayExponent() {
342 return backOffMultiplier;
343 }
344
345 public void setReconnectDelayExponent(long reconnectDelayExponent) {
346 this.backOffMultiplier = reconnectDelayExponent;
347 }
348
349 public int getMaxReconnectAttempts() {
350 return maxReconnectAttempts;
351 }
352
353 public void setMaxReconnectAttempts(int maxReconnectAttempts) {
354 this.maxReconnectAttempts = maxReconnectAttempts;
355 }
356
357 public void oneway(Object o) throws IOException {
358 final Command command = (Command)o;
359 try {
360 synchronized (reconnectMutex) {
361
362 // Wait for transport to be connected.
363 while (connectedCount < minAckCount && !disposed && connectionFailure == null) {
364 LOG.debug("Waiting for at least " + minAckCount + " transports to be connected.");
365 reconnectMutex.wait(1000);
366 }
367
368 // Still not fully connected.
369 if (connectedCount < minAckCount) {
370
371 Exception error;
372
373 // Throw the right kind of error..
374 if (disposed) {
375 error = new IOException("Transport disposed.");
376 } else if (connectionFailure != null) {
377 error = connectionFailure;
378 } else {
379 error = new IOException("Unexpected failure.");
380 }
381
382 if (error instanceof IOException) {
383 throw (IOException)error;
384 }
385 throw IOExceptionSupport.create(error);
386 }
387
388 // If it was a request and it was not being tracked by
389 // the state tracker,
390 // then hold it in the requestMap so that we can replay
391 // it later.
392 boolean fanout = isFanoutCommand(command);
393 if (stateTracker.track(command) == null && command.isResponseRequired()) {
394 int size = fanout ? minAckCount : 1;
395 requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
396 }
397
398 // Send the message.
399 if (fanout) {
400 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
401 FanoutTransportHandler th = iter.next();
402 if (th.transport != null) {
403 try {
404 th.transport.oneway(command);
405 } catch (IOException e) {
406 LOG.debug("Send attempt: failed.");
407 th.onException(e);
408 }
409 }
410 }
411 } else {
412 try {
413 primary.transport.oneway(command);
414 } catch (IOException e) {
415 LOG.debug("Send attempt: failed.");
416 primary.onException(e);
417 }
418 }
419
420 }
421 } catch (InterruptedException e) {
422 // Some one may be trying to stop our thread.
423 Thread.currentThread().interrupt();
424 throw new InterruptedIOException();
425 }
426 }
427
428 /**
429 * @param command
430 * @return
431 */
432 private boolean isFanoutCommand(Command command) {
433 if (command.isMessage()) {
434 if( fanOutQueues ) {
435 return true;
436 }
437 return ((Message)command).getDestination().isTopic();
438 }
439 if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE ||
440 command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
441 return false;
442 }
443 return true;
444 }
445
446 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
447 throw new AssertionError("Unsupported Method");
448 }
449
450 public Object request(Object command) throws IOException {
451 throw new AssertionError("Unsupported Method");
452 }
453
454 public Object request(Object command, int timeout) throws IOException {
455 throw new AssertionError("Unsupported Method");
456 }
457
458 public void reconnect() {
459 LOG.debug("Waking up reconnect task");
460 try {
461 reconnectTask.wakeup();
462 } catch (InterruptedException e) {
463 Thread.currentThread().interrupt();
464 }
465 }
466
467 public TransportListener getTransportListener() {
468 return transportListener;
469 }
470
471 public void setTransportListener(TransportListener commandListener) {
472 this.transportListener = commandListener;
473 }
474
475 public <T> T narrow(Class<T> target) {
476
477 if (target.isAssignableFrom(getClass())) {
478 return target.cast(this);
479 }
480
481 synchronized (reconnectMutex) {
482 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
483 FanoutTransportHandler th = iter.next();
484 if (th.transport != null) {
485 T rc = th.transport.narrow(target);
486 if (rc != null) {
487 return rc;
488 }
489 }
490 }
491 }
492
493 return null;
494
495 }
496
497 protected void restoreTransport(FanoutTransportHandler th) throws Exception, IOException {
498 th.transport.start();
499 stateTracker.setRestoreConsumers(th.transport == primary);
500 stateTracker.restore(th.transport);
501 for (Iterator<RequestCounter> iter2 = requestMap.values().iterator(); iter2.hasNext();) {
502 RequestCounter rc = iter2.next();
503 th.transport.oneway(rc.command);
504 }
505 }
506
507 public void add(boolean reblance,URI uris[]) {
508
509 synchronized (reconnectMutex) {
510 for (int i = 0; i < uris.length; i++) {
511 URI uri = uris[i];
512
513 boolean match = false;
514 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
515 FanoutTransportHandler th = iter.next();
516 if (th.uri.equals(uri)) {
517 match = true;
518 break;
519 }
520 }
521 if (!match) {
522 FanoutTransportHandler th = new FanoutTransportHandler(uri);
523 transports.add(th);
524 reconnect();
525 }
526 }
527 }
528
529 }
530
531 public void remove(boolean rebalance,URI uris[]) {
532
533 synchronized (reconnectMutex) {
534 for (int i = 0; i < uris.length; i++) {
535 URI uri = uris[i];
536
537 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
538 FanoutTransportHandler th = iter.next();
539 if (th.uri.equals(uri)) {
540 if (th.transport != null) {
541 ServiceSupport.dispose(th.transport);
542 connectedCount--;
543 }
544 iter.remove();
545 break;
546 }
547 }
548 }
549 }
550
551 }
552
553 public void reconnect(URI uri) throws IOException {
554 add(true,new URI[]{uri});
555
556 }
557
558 public boolean isReconnectSupported() {
559 return true;
560 }
561
562 public boolean isUpdateURIsSupported() {
563 return true;
564 }
565 public void updateURIs(boolean reblance,URI[] uris) throws IOException {
566 add(reblance,uris);
567 }
568
569
570 public String getRemoteAddress() {
571 if (primary != null) {
572 if (primary.transport != null) {
573 return primary.transport.getRemoteAddress();
574 }
575 }
576 return null;
577 }
578
579 protected void transportListenerOnCommand(Command command) {
580 if (transportListener != null) {
581 transportListener.onCommand(command);
582 }
583 }
584
585 public boolean isFaultTolerant() {
586 return true;
587 }
588
589 public boolean isFanOutQueues() {
590 return fanOutQueues;
591 }
592
593 public void setFanOutQueues(boolean fanOutQueues) {
594 this.fanOutQueues = fanOutQueues;
595 }
596
597 public boolean isDisposed() {
598 return disposed;
599 }
600
601
602 public boolean isConnected() {
603 return connected;
604 }
605
606 public int getReceiveCounter() {
607 int rc = 0;
608 synchronized (reconnectMutex) {
609 for (FanoutTransportHandler th : transports) {
610 if (th.transport != null) {
611 rc += th.transport.getReceiveCounter();
612 }
613 }
614 }
615 return rc;
616 }
617 }