001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.failover;
018
019 import java.io.BufferedReader;
020 import java.io.FileReader;
021 import java.io.IOException;
022 import java.io.InputStreamReader;
023 import java.io.InterruptedIOException;
024 import java.net.InetAddress;
025 import java.net.MalformedURLException;
026 import java.net.URI;
027 import java.net.URL;
028 import java.util.ArrayList;
029 import java.util.Collections;
030 import java.util.HashSet;
031 import java.util.Iterator;
032 import java.util.LinkedHashMap;
033 import java.util.List;
034 import java.util.Map;
035 import java.util.StringTokenizer;
036 import java.util.concurrent.CopyOnWriteArrayList;
037 import java.util.concurrent.atomic.AtomicReference;
038
039 import org.apache.activemq.broker.SslContext;
040 import org.apache.activemq.command.Command;
041 import org.apache.activemq.command.ConnectionControl;
042 import org.apache.activemq.command.ConnectionId;
043 import org.apache.activemq.command.RemoveInfo;
044 import org.apache.activemq.command.Response;
045 import org.apache.activemq.state.ConnectionStateTracker;
046 import org.apache.activemq.state.Tracked;
047 import org.apache.activemq.thread.DefaultThreadPools;
048 import org.apache.activemq.thread.Task;
049 import org.apache.activemq.thread.TaskRunner;
050 import org.apache.activemq.transport.CompositeTransport;
051 import org.apache.activemq.transport.DefaultTransportListener;
052 import org.apache.activemq.transport.FutureResponse;
053 import org.apache.activemq.transport.ResponseCallback;
054 import org.apache.activemq.transport.Transport;
055 import org.apache.activemq.transport.TransportFactory;
056 import org.apache.activemq.transport.TransportListener;
057 import org.apache.activemq.util.IOExceptionSupport;
058 import org.apache.activemq.util.ServiceSupport;
059 import org.slf4j.Logger;
060 import org.slf4j.LoggerFactory;
061
062 /**
063 * A Transport that is made reliable by being able to fail over to another
064 * transport when a transport failure is detected.
065 */
066 public class FailoverTransport implements CompositeTransport {
067
private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class);
// Initial value, in milliseconds, of both initialReconnectDelay and reconnectDelay.
private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10;
// Sentinel meaning "no limit"; default for timeout and for both reconnect-attempt limits.
private static final int INFINITE = -1;
// Listener supplied through setTransportListener(); null until it has been set.
private TransportListener transportListener;
// Set by stop(); checked by the reconnect logic and by oneway() to abandon work.
private boolean disposed;
private boolean connected;
// Configured broker URIs.
private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>();
// When non-empty, getConnectList() returns this list instead of a list derived from uris.
private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>();

// Held around (re)connect state changes; oneway() waits on it and doReconnect() notifies it.
private final Object reconnectMutex = new Object();
// Held while the reconnect task decides to reconnect and while the backup pool is used.
private final Object backupMutex = new Object();
// Waited on for the reconnect delay; notified by stop() to cut the delay short.
private final Object sleepMutex = new Object();
// Notified by setTransportListener(); doReconnect() waits on it for the listener to appear.
private final Object listenerMutex = new Object();
private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
// Commands awaiting a response, keyed by command id; replayed by restoreTransport().
// Accesses are expected to synchronize on the map itself.
private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>();

private URI connectedTransportURI;
// URI of the transport that failed last; getConnectList() moves it to the end of the list.
private URI failedConnectTransportURI;
private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>();
private final TaskRunner reconnectTask;
private boolean started;
private boolean initialized;
private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
// Upper bound, in milliseconds, applied to reconnectDelay by doDelay().
private long maxReconnectDelay = 1000 * 30;
// Factor by which doDelay() multiplies reconnectDelay when useExponentialBackOff is set.
private double backOffMultiplier = 2d;
// Maximum time, in milliseconds, oneway() waits for a connection; only values > 0 are enforced.
private long timeout = INFINITE;
private boolean useExponentialBackOff = true;
// Whether getConnectList() shuffles the candidate URIs.
private boolean randomize = true;
private int maxReconnectAttempts = INFINITE;
// Used instead of maxReconnectAttempts before the first connection, unless INFINITE.
private int startupMaxReconnectAttempts = INFINITE;
private int connectFailures;
// Current delay, in milliseconds, between connection attempts.
private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
// Set once the attempt limit is reached; oneway() reports it to callers.
private Exception connectionFailure;
private boolean firstConnection = true;
// optionally always have a backup created
private boolean backup = false;
private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>();
private int backupPoolSize = 1;
private boolean trackMessages = false;
private boolean trackTransactionProducers = true;
private int maxCacheSize = 128 * 1024;
// Placeholder listener installed on a transport by disposeTransport() before it is disposed.
private final TransportListener disposedListener = new DefaultTransportListener() {
};
// Listener installed on the underlying transport; forwards to transportListener.
private final TransportListener myTransportListener = createTransportListener();
private boolean updateURIsSupported = true;
private boolean reconnectSupported = true;
// remember for reconnect thread
private SslContext brokerSslContext;
private String updateURIsURL = null;
private boolean rebalanceUpdateURIs = true;
// Set by reconnect(true); cleared by doReconnect() once the rebalance has been handled.
private boolean doRebalance = false;
private boolean connectedToPriority = false;

private boolean priorityBackup = false;
// URIs registered through setPriorityURIs().
private ArrayList<URI> priorityList = new ArrayList<URI>();
// Set by buildBackups() when a backup to a priority URI has been established.
private boolean priorityBackupAvailable = false;
124
/**
 * Creates the transport, captures the current SSL context for later use by the
 * reconnect task, enables transaction tracking and registers the asynchronous
 * reconnect task. The task does nothing until the transport has been started.
 *
 * @throws InterruptedIOException declared by the original signature
 */
public FailoverTransport() throws InterruptedIOException {
    brokerSslContext = SslContext.getCurrentSslContext();
    stateTracker.setTrackTransactions(true);
    // Set up a task that is used to reconnect the connection asynchronously.
    reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
        public boolean iterate() {
            if (!started) {
                return false;
            }
            boolean result = false;
            boolean buildBackup = true;
            synchronized (backupMutex) {
                if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) {
                    result = doReconnect();
                    buildBackup = false;
                    connectedToPriority = isPriority(connectedTransportURI);
                }
            }
            if (buildBackup) {
                buildBackups();
                if (priorityBackup && !connectedToPriority) {
                    // Not on a priority broker yet: wait, then schedule another pass.
                    try {
                        doDelay();
                        if (reconnectTask == null) {
                            return true;
                        }
                        reconnectTask.wakeup();
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to the task runner instead of dropping it.
                        Thread.currentThread().interrupt();
                        LOG.debug("Reconnect task has been interrupted.", e);
                    }
                }
            } else {
                // A (re)connect was attempted in this pass; build backups on the next iteration.
                try {
                    if (reconnectTask == null) {
                        return true;
                    }
                    reconnectTask.wakeup();
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the task runner instead of dropping it.
                    Thread.currentThread().interrupt();
                    LOG.debug("Reconnect task has been interrupted.", e);
                }
            }
            return result;
        }

    }, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
}
173
174 TransportListener createTransportListener() {
175 return new TransportListener() {
176 public void onCommand(Object o) {
177 Command command = (Command) o;
178 if (command == null) {
179 return;
180 }
181 if (command.isResponse()) {
182 Object object = null;
183 synchronized (requestMap) {
184 object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
185 }
186 if (object != null && object.getClass() == Tracked.class) {
187 ((Tracked) object).onResponses(command);
188 }
189 }
190 if (!initialized) {
191 initialized = true;
192 }
193
194 if (command.isConnectionControl()) {
195 handleConnectionControl((ConnectionControl) command);
196 }
197 if (transportListener != null) {
198 transportListener.onCommand(command);
199 }
200 }
201
202 public void onException(IOException error) {
203 try {
204 handleTransportFailure(error);
205 } catch (InterruptedException e) {
206 Thread.currentThread().interrupt();
207 transportListener.onException(new InterruptedIOException());
208 }
209 }
210
211 public void transportInterupted() {
212 if (transportListener != null) {
213 transportListener.transportInterupted();
214 }
215 }
216
217 public void transportResumed() {
218 if (transportListener != null) {
219 transportListener.transportResumed();
220 }
221 }
222 };
223 }
224
225 public final void disposeTransport(Transport transport) {
226 transport.setTransportListener(disposedListener);
227 ServiceSupport.dispose(transport);
228 }
229
230 public final void handleTransportFailure(IOException e) throws InterruptedException {
231 if (LOG.isTraceEnabled()) {
232 LOG.trace(this + " handleTransportFailure: " + e);
233 }
234 Transport transport = connectedTransport.getAndSet(null);
235 if (transport == null) {
236 // sync with possible in progress reconnect
237 synchronized (reconnectMutex) {
238 transport = connectedTransport.getAndSet(null);
239 }
240 }
241 if (transport != null) {
242
243 disposeTransport(transport);
244
245 boolean reconnectOk = false;
246 synchronized (reconnectMutex) {
247 if (canReconnect()) {
248 reconnectOk = true;
249 }
250 LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed, reason: " + e
251 + (reconnectOk ? "," : ", not") +" attempting to automatically reconnect");
252
253 initialized = false;
254 failedConnectTransportURI = connectedTransportURI;
255 connectedTransportURI = null;
256 connected = false;
257
258 // notify before any reconnect attempt so ack state can be whacked
259 if (transportListener != null) {
260 transportListener.transportInterupted();
261 }
262
263 if (reconnectOk) {
264 updated.remove(failedConnectTransportURI);
265 reconnectTask.wakeup();
266 } else {
267 propagateFailureToExceptionListener(e);
268 }
269 }
270 }
271 }
272
273 private boolean canReconnect() {
274 return started && 0 != calculateReconnectAttemptLimit();
275 }
276
277 public final void handleConnectionControl(ConnectionControl control) {
278 String reconnectStr = control.getReconnectTo();
279 if (reconnectStr != null) {
280 reconnectStr = reconnectStr.trim();
281 if (reconnectStr.length() > 0) {
282 try {
283 URI uri = new URI(reconnectStr);
284 if (isReconnectSupported()) {
285 reconnect(uri);
286 LOG.info("Reconnected to: " + uri);
287 }
288 } catch (Exception e) {
289 LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e);
290 }
291 }
292 }
293 processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers());
294 }
295
296 private final void processNewTransports(boolean rebalance, String newTransports) {
297 if (newTransports != null) {
298 newTransports = newTransports.trim();
299 if (newTransports.length() > 0 && isUpdateURIsSupported()) {
300 List<URI> list = new ArrayList<URI>();
301 StringTokenizer tokenizer = new StringTokenizer(newTransports, ",");
302 while (tokenizer.hasMoreTokens()) {
303 String str = tokenizer.nextToken();
304 try {
305 URI uri = new URI(str);
306 list.add(uri);
307 } catch (Exception e) {
308 LOG.error("Failed to parse broker address: " + str, e);
309 }
310 }
311 if (list.isEmpty() == false) {
312 try {
313 updateURIs(rebalance, list.toArray(new URI[list.size()]));
314 } catch (IOException e) {
315 LOG.error("Failed to update transport URI's from: " + newTransports, e);
316 }
317 }
318 }
319 }
320 }
321
322 public void start() throws Exception {
323 synchronized (reconnectMutex) {
324 if (LOG.isDebugEnabled()) {
325 LOG.debug("Started " + this);
326 }
327 if (started) {
328 return;
329 }
330 started = true;
331 stateTracker.setMaxCacheSize(getMaxCacheSize());
332 stateTracker.setTrackMessages(isTrackMessages());
333 stateTracker.setTrackTransactionProducers(isTrackTransactionProducers());
334 if (connectedTransport.get() != null) {
335 stateTracker.restore(connectedTransport.get());
336 } else {
337 reconnect(false);
338 }
339 }
340 }
341
342 public void stop() throws Exception {
343 Transport transportToStop = null;
344 synchronized (reconnectMutex) {
345 if (LOG.isDebugEnabled()) {
346 LOG.debug("Stopped " + this);
347 }
348 if (!started) {
349 return;
350 }
351 started = false;
352 disposed = true;
353 connected = false;
354 for (BackupTransport t : backups) {
355 t.setDisposed(true);
356 }
357 backups.clear();
358
359 if (connectedTransport.get() != null) {
360 transportToStop = connectedTransport.getAndSet(null);
361 }
362 reconnectMutex.notifyAll();
363 }
364 synchronized (sleepMutex) {
365 sleepMutex.notifyAll();
366 }
367 reconnectTask.shutdown();
368 if (transportToStop != null) {
369 transportToStop.stop();
370 }
371 }
372
373 public long getInitialReconnectDelay() {
374 return initialReconnectDelay;
375 }
376
377 public void setInitialReconnectDelay(long initialReconnectDelay) {
378 this.initialReconnectDelay = initialReconnectDelay;
379 }
380
381 public long getMaxReconnectDelay() {
382 return maxReconnectDelay;
383 }
384
385 public void setMaxReconnectDelay(long maxReconnectDelay) {
386 this.maxReconnectDelay = maxReconnectDelay;
387 }
388
389 public long getReconnectDelay() {
390 return reconnectDelay;
391 }
392
393 public void setReconnectDelay(long reconnectDelay) {
394 this.reconnectDelay = reconnectDelay;
395 }
396
397 public double getReconnectDelayExponent() {
398 return backOffMultiplier;
399 }
400
401 public void setReconnectDelayExponent(double reconnectDelayExponent) {
402 this.backOffMultiplier = reconnectDelayExponent;
403 }
404
405 public Transport getConnectedTransport() {
406 return connectedTransport.get();
407 }
408
409 public URI getConnectedTransportURI() {
410 return connectedTransportURI;
411 }
412
413 public int getMaxReconnectAttempts() {
414 return maxReconnectAttempts;
415 }
416
417 public void setMaxReconnectAttempts(int maxReconnectAttempts) {
418 this.maxReconnectAttempts = maxReconnectAttempts;
419 }
420
421 public int getStartupMaxReconnectAttempts() {
422 return this.startupMaxReconnectAttempts;
423 }
424
425 public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) {
426 this.startupMaxReconnectAttempts = startupMaxReconnectAttempts;
427 }
428
429 public long getTimeout() {
430 return timeout;
431 }
432
433 public void setTimeout(long timeout) {
434 this.timeout = timeout;
435 }
436
437 /**
438 * @return Returns the randomize.
439 */
440 public boolean isRandomize() {
441 return randomize;
442 }
443
444 /**
445 * @param randomize The randomize to set.
446 */
447 public void setRandomize(boolean randomize) {
448 this.randomize = randomize;
449 }
450
451 public boolean isBackup() {
452 return backup;
453 }
454
455 public void setBackup(boolean backup) {
456 this.backup = backup;
457 }
458
459 public int getBackupPoolSize() {
460 return backupPoolSize;
461 }
462
463 public void setBackupPoolSize(int backupPoolSize) {
464 this.backupPoolSize = backupPoolSize;
465 }
466
467 public int getCurrentBackups() {
468 return this.backups.size();
469 }
470
471 public boolean isTrackMessages() {
472 return trackMessages;
473 }
474
475 public void setTrackMessages(boolean trackMessages) {
476 this.trackMessages = trackMessages;
477 }
478
479 public boolean isTrackTransactionProducers() {
480 return this.trackTransactionProducers;
481 }
482
483 public void setTrackTransactionProducers(boolean trackTransactionProducers) {
484 this.trackTransactionProducers = trackTransactionProducers;
485 }
486
487 public int getMaxCacheSize() {
488 return maxCacheSize;
489 }
490
491 public void setMaxCacheSize(int maxCacheSize) {
492 this.maxCacheSize = maxCacheSize;
493 }
494
495 public boolean isPriorityBackup() {
496 return priorityBackup;
497 }
498
499 public void setPriorityBackup(boolean priorityBackup) {
500 this.priorityBackup = priorityBackup;
501 }
502
503 public void setPriorityURIs(String priorityURIs) {
504 StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ",");
505 while (tokenizer.hasMoreTokens()) {
506 String str = tokenizer.nextToken();
507 try {
508 URI uri = new URI(str);
509 priorityList.add(uri);
510 } catch (Exception e) {
511 LOG.error("Failed to parse broker address: " + str, e);
512 }
513 }
514 }
515
516 public void oneway(Object o) throws IOException {
517
518 Command command = (Command) o;
519 Exception error = null;
520 try {
521
522 synchronized (reconnectMutex) {
523
524 if (command != null && connectedTransport.get() == null) {
525 if (command.isShutdownInfo()) {
526 // Skipping send of ShutdownInfo command when not connected.
527 return;
528 } else if (command instanceof RemoveInfo || command.isMessageAck()) {
529 // Simulate response to RemoveInfo command or MessageAck (as it will be stale)
530 stateTracker.track(command);
531 if (command.isResponseRequired()) {
532 Response response = new Response();
533 response.setCorrelationId(command.getCommandId());
534 myTransportListener.onCommand(response);
535 }
536 return;
537 }
538 }
539
540 // Keep trying until the message is sent.
541 for (int i = 0; !disposed; i++) {
542 try {
543
544 // Wait for transport to be connected.
545 Transport transport = connectedTransport.get();
546 long start = System.currentTimeMillis();
547 boolean timedout = false;
548 while (transport == null && !disposed && connectionFailure == null
549 && !Thread.currentThread().isInterrupted()) {
550 if (LOG.isTraceEnabled()) {
551 LOG.trace("Waiting for transport to reconnect..: " + command);
552 }
553 long end = System.currentTimeMillis();
554 if (timeout > 0 && (end - start > timeout)) {
555 timedout = true;
556 if (LOG.isInfoEnabled()) {
557 LOG.info("Failover timed out after " + (end - start) + "ms");
558 }
559 break;
560 }
561 try {
562 reconnectMutex.wait(100);
563 } catch (InterruptedException e) {
564 Thread.currentThread().interrupt();
565 if (LOG.isDebugEnabled()) {
566 LOG.debug("Interupted: " + e, e);
567 }
568 }
569 transport = connectedTransport.get();
570 }
571
572 if (transport == null) {
573 // Previous loop may have exited due to use being
574 // disposed.
575 if (disposed) {
576 error = new IOException("Transport disposed.");
577 } else if (connectionFailure != null) {
578 error = connectionFailure;
579 } else if (timedout == true) {
580 error = new IOException("Failover timeout of " + timeout + " ms reached.");
581 } else {
582 error = new IOException("Unexpected failure.");
583 }
584 break;
585 }
586
587 // If it was a request and it was not being tracked by
588 // the state tracker,
589 // then hold it in the requestMap so that we can replay
590 // it later.
591 Tracked tracked = stateTracker.track(command);
592 synchronized (requestMap) {
593 if (tracked != null && tracked.isWaitingForResponse()) {
594 requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
595 } else if (tracked == null && command.isResponseRequired()) {
596 requestMap.put(Integer.valueOf(command.getCommandId()), command);
597 }
598 }
599
600 // Send the message.
601 try {
602 transport.oneway(command);
603 stateTracker.trackBack(command);
604 } catch (IOException e) {
605
606 // If the command was not tracked.. we will retry in
607 // this method
608 if (tracked == null) {
609
610 // since we will retry in this method.. take it
611 // out of the request
612 // map so that it is not sent 2 times on
613 // recovery
614 if (command.isResponseRequired()) {
615 requestMap.remove(Integer.valueOf(command.getCommandId()));
616 }
617
618 // Rethrow the exception so it will handled by
619 // the outer catch
620 throw e;
621 } else {
622 // Handle the error but allow the method to return since the
623 // tracked commands are replayed on reconnect.
624 if (LOG.isDebugEnabled()) {
625 LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
626 }
627 handleTransportFailure(e);
628 }
629 }
630
631 return;
632
633 } catch (IOException e) {
634 if (LOG.isDebugEnabled()) {
635 LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
636 }
637 handleTransportFailure(e);
638 }
639 }
640 }
641 } catch (InterruptedException e) {
642 // Some one may be trying to stop our thread.
643 Thread.currentThread().interrupt();
644 throw new InterruptedIOException();
645 }
646
647 if (!disposed) {
648 if (error != null) {
649 if (error instanceof IOException) {
650 throw (IOException) error;
651 }
652 throw IOExceptionSupport.create(error);
653 }
654 }
655 }
656
657 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
658 throw new AssertionError("Unsupported Method");
659 }
660
661 public Object request(Object command) throws IOException {
662 throw new AssertionError("Unsupported Method");
663 }
664
665 public Object request(Object command, int timeout) throws IOException {
666 throw new AssertionError("Unsupported Method");
667 }
668
669 public void add(boolean rebalance, URI u[]) {
670 boolean newURI = false;
671 for (URI uri : u) {
672 if (!contains(uri)) {
673 uris.add(uri);
674 newURI = true;
675 }
676 }
677 if (newURI) {
678 reconnect(rebalance);
679 }
680 }
681
682 public void remove(boolean rebalance, URI u[]) {
683 for (URI uri : u) {
684 uris.remove(uri);
685 }
686 // rebalance is automatic if any connected to removed/stopped broker
687 }
688
689 public void add(boolean rebalance, String u) {
690 try {
691 URI newURI = new URI(u);
692 if (contains(newURI) == false) {
693 uris.add(newURI);
694 reconnect(rebalance);
695 }
696
697 } catch (Exception e) {
698 LOG.error("Failed to parse URI: " + u);
699 }
700 }
701
702 public void reconnect(boolean rebalance) {
703 synchronized (reconnectMutex) {
704 if (started) {
705 if (rebalance) {
706 doRebalance = true;
707 }
708 LOG.debug("Waking up reconnect task");
709 try {
710 reconnectTask.wakeup();
711 } catch (InterruptedException e) {
712 Thread.currentThread().interrupt();
713 }
714 } else {
715 LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
716 }
717 }
718 }
719
720 private List<URI> getConnectList() {
721 if (!updated.isEmpty()) {
722 if (failedConnectTransportURI != null) {
723 boolean removed = updated.remove(failedConnectTransportURI);
724 if (removed) {
725 updated.add(failedConnectTransportURI);
726 }
727 }
728 return updated;
729 }
730 ArrayList<URI> l = new ArrayList<URI>(uris);
731 boolean removed = false;
732 if (failedConnectTransportURI != null) {
733 removed = l.remove(failedConnectTransportURI);
734 }
735 if (randomize) {
736 // Randomly, reorder the list by random swapping
737 for (int i = 0; i < l.size(); i++) {
738 int p = (int) (Math.random() * 100 % l.size());
739 URI t = l.get(p);
740 l.set(p, l.get(i));
741 l.set(i, t);
742 }
743 }
744 if (removed) {
745 l.add(failedConnectTransportURI);
746 }
747 if (LOG.isDebugEnabled()) {
748 LOG.debug("urlList connectionList:" + l + ", from: " + uris);
749 }
750 return l;
751 }
752
753 public TransportListener getTransportListener() {
754 return transportListener;
755 }
756
757 public void setTransportListener(TransportListener commandListener) {
758 synchronized (listenerMutex) {
759 this.transportListener = commandListener;
760 listenerMutex.notifyAll();
761 }
762 }
763
764 public <T> T narrow(Class<T> target) {
765
766 if (target.isAssignableFrom(getClass())) {
767 return target.cast(this);
768 }
769 Transport transport = connectedTransport.get();
770 if (transport != null) {
771 return transport.narrow(target);
772 }
773 return null;
774
775 }
776
777 protected void restoreTransport(Transport t) throws Exception, IOException {
778 t.start();
779 // send information to the broker - informing it we are an ft client
780 ConnectionControl cc = new ConnectionControl();
781 cc.setFaultTolerant(true);
782 t.oneway(cc);
783 stateTracker.restore(t);
784 Map<Integer, Command> tmpMap = null;
785 synchronized (requestMap) {
786 tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
787 }
788 for (Command command : tmpMap.values()) {
789 if (LOG.isTraceEnabled()) {
790 LOG.trace("restore requestMap, replay: " + command);
791 }
792 t.oneway(command);
793 }
794 }
795
796 public boolean isUseExponentialBackOff() {
797 return useExponentialBackOff;
798 }
799
800 public void setUseExponentialBackOff(boolean useExponentialBackOff) {
801 this.useExponentialBackOff = useExponentialBackOff;
802 }
803
804 @Override
805 public String toString() {
806 return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString();
807 }
808
809 public String getRemoteAddress() {
810 Transport transport = connectedTransport.get();
811 if (transport != null) {
812 return transport.getRemoteAddress();
813 }
814 return null;
815 }
816
817 public boolean isFaultTolerant() {
818 return true;
819 }
820
821 private void doUpdateURIsFromDisk() {
822 // If updateURIsURL is specified, read the file and add any new
823 // transport URI's to this FailOverTransport.
824 // Note: Could track file timestamp to avoid unnecessary reading.
825 String fileURL = getUpdateURIsURL();
826 if (fileURL != null) {
827 BufferedReader in = null;
828 String newUris = null;
829 StringBuffer buffer = new StringBuffer();
830
831 try {
832 in = new BufferedReader(getURLStream(fileURL));
833 while (true) {
834 String line = in.readLine();
835 if (line == null) {
836 break;
837 }
838 buffer.append(line);
839 }
840 newUris = buffer.toString();
841 } catch (IOException ioe) {
842 LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
843 } finally {
844 if (in != null) {
845 try {
846 in.close();
847 } catch (IOException ioe) {
848 // ignore
849 }
850 }
851 }
852
853 processNewTransports(isRebalanceUpdateURIs(), newUris);
854 }
855 }
856
857 final boolean doReconnect() {
858 Exception failure = null;
859 synchronized (reconnectMutex) {
860
861 // First ensure we are up to date.
862 doUpdateURIsFromDisk();
863
864 if (disposed || connectionFailure != null) {
865 reconnectMutex.notifyAll();
866 }
867 if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) {
868 return false;
869 } else {
870 List<URI> connectList = getConnectList();
871 if (connectList.isEmpty()) {
872 failure = new IOException("No uris available to connect to.");
873 } else {
874 if (doRebalance) {
875 if (connectList.get(0).equals(connectedTransportURI)) {
876 // already connected to first in the list, no need to rebalance
877 doRebalance = false;
878 return false;
879 } else {
880 if (LOG.isDebugEnabled()) {
881 LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
882 }
883 try {
884 Transport transport = this.connectedTransport.getAndSet(null);
885 if (transport != null) {
886 disposeTransport(transport);
887 }
888 } catch (Exception e) {
889 if (LOG.isDebugEnabled()) {
890 LOG.debug("Caught an exception stopping existing transport for rebalance", e);
891 }
892 }
893 }
894 doRebalance = false;
895 }
896
897 resetReconnectDelay();
898
899 Transport transport = null;
900 URI uri = null;
901
902 // If we have a backup already waiting lets try it.
903 synchronized (backupMutex) {
904 if ((priorityBackup || backup) && !backups.isEmpty()) {
905 ArrayList<BackupTransport> l = new ArrayList<BackupTransport>(backups);
906 if (randomize) {
907 Collections.shuffle(l);
908 }
909 BackupTransport bt = l.remove(0);
910 backups.remove(bt);
911 transport = bt.getTransport();
912 uri = bt.getUri();
913 if (priorityBackup && priorityBackupAvailable) {
914 Transport old = this.connectedTransport.getAndSet(null);
915 if (transport != null) {
916 disposeTransport(old);
917 }
918 priorityBackupAvailable = false;
919 }
920 }
921 }
922
923 // Sleep for the reconnectDelay if there's no backup and we aren't trying
924 // for the first time, or we were disposed for some reason.
925 if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) {
926 synchronized (sleepMutex) {
927 if (LOG.isDebugEnabled()) {
928 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
929 }
930 try {
931 sleepMutex.wait(reconnectDelay);
932 } catch (InterruptedException e) {
933 Thread.currentThread().interrupt();
934 }
935 }
936 }
937
938 Iterator<URI> iter = connectList.iterator();
939 while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) {
940
941 try {
942 SslContext.setCurrentSslContext(brokerSslContext);
943
944 // We could be starting with a backup and if so we wait to grab a
945 // URI from the pool until next time around.
946 if (transport == null) {
947 uri = iter.next();
948 transport = TransportFactory.compositeConnect(uri);
949 }
950
951 if (LOG.isDebugEnabled()) {
952 LOG.debug("Attempting " + connectFailures + "th connect to: " + uri);
953 }
954 transport.setTransportListener(myTransportListener);
955 transport.start();
956
957 if (started && !firstConnection) {
958 restoreTransport(transport);
959 }
960
961 if (LOG.isDebugEnabled()) {
962 LOG.debug("Connection established");
963 }
964 reconnectDelay = initialReconnectDelay;
965 connectedTransportURI = uri;
966 connectedTransport.set(transport);
967 reconnectMutex.notifyAll();
968 connectFailures = 0;
969
970 // Make sure on initial startup, that the transportListener
971 // has been initialized for this instance.
972 synchronized (listenerMutex) {
973 if (transportListener == null) {
974 try {
975 // if it isn't set after 2secs - it probably never will be
976 listenerMutex.wait(2000);
977 } catch (InterruptedException ex) {
978 }
979 }
980 }
981
982 if (transportListener != null) {
983 transportListener.transportResumed();
984 } else {
985 if (LOG.isDebugEnabled()) {
986 LOG.debug("transport resumed by transport listener not set");
987 }
988 }
989
990 if (firstConnection) {
991 firstConnection = false;
992 LOG.info("Successfully connected to " + uri);
993 } else {
994 LOG.info("Successfully reconnected to " + uri);
995 }
996
997 connected = true;
998 return false;
999 } catch (Exception e) {
1000 failure = e;
1001 if (LOG.isDebugEnabled()) {
1002 LOG.debug("Connect fail to: " + uri + ", reason: " + e);
1003 }
1004 if (transport != null) {
1005 try {
1006 transport.stop();
1007 transport = null;
1008 } catch (Exception ee) {
1009 if (LOG.isDebugEnabled()) {
1010 LOG.debug("Stop of failed transport: " + transport +
1011 " failed with reason: " + ee);
1012 }
1013 }
1014 }
1015 } finally {
1016 SslContext.setCurrentSslContext(null);
1017 }
1018 }
1019 }
1020 }
1021
1022 int reconnectLimit = calculateReconnectAttemptLimit();
1023
1024 connectFailures++;
1025 if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) {
1026 LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)");
1027 connectionFailure = failure;
1028
1029 // Make sure on initial startup, that the transportListener has been
1030 // initialized for this instance.
1031 synchronized (listenerMutex) {
1032 if (transportListener == null) {
1033 try {
1034 listenerMutex.wait(2000);
1035 } catch (InterruptedException ex) {
1036 }
1037 }
1038 }
1039
1040 propagateFailureToExceptionListener(connectionFailure);
1041 return false;
1042 }
1043 }
1044
1045 if (!disposed) {
1046 doDelay();
1047 }
1048
1049 return !disposed;
1050 }
1051
1052 private void doDelay() {
1053 if (reconnectDelay > 0) {
1054 synchronized (sleepMutex) {
1055 if (LOG.isDebugEnabled()) {
1056 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection");
1057 }
1058 try {
1059 sleepMutex.wait(reconnectDelay);
1060 } catch (InterruptedException e) {
1061 Thread.currentThread().interrupt();
1062 }
1063 }
1064 }
1065
1066 if (useExponentialBackOff) {
1067 // Exponential increment of reconnect delay.
1068 reconnectDelay *= backOffMultiplier;
1069 if (reconnectDelay > maxReconnectDelay) {
1070 reconnectDelay = maxReconnectDelay;
1071 }
1072 }
1073 }
1074
1075 private void resetReconnectDelay() {
1076 if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
1077 reconnectDelay = initialReconnectDelay;
1078 }
1079 }
1080
1081 /*
1082 * called with reconnectMutex held
1083 */
1084 private void propagateFailureToExceptionListener(Exception exception) {
1085 if (transportListener != null) {
1086 if (exception instanceof IOException) {
1087 transportListener.onException((IOException)exception);
1088 } else {
1089 transportListener.onException(IOExceptionSupport.create(exception));
1090 }
1091 }
1092 reconnectMutex.notifyAll();
1093 }
1094
1095 private int calculateReconnectAttemptLimit() {
1096 int maxReconnectValue = this.maxReconnectAttempts;
1097 if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) {
1098 maxReconnectValue = this.startupMaxReconnectAttempts;
1099 }
1100 return maxReconnectValue;
1101 }
1102
1103 final boolean buildBackups() {
1104 synchronized (backupMutex) {
1105 if (!disposed && (backup || priorityBackup) && backups.size() < backupPoolSize) {
1106 ArrayList<URI> backupList = new ArrayList<URI>(priorityList);
1107 List<URI> connectList = getConnectList();
1108 for (URI uri: connectList) {
1109 if (!backupList.contains(uri)) {
1110 backupList.add(uri);
1111 }
1112 }
1113 // removed disposed backups
1114 List<BackupTransport> disposedList = new ArrayList<BackupTransport>();
1115 for (BackupTransport bt : backups) {
1116 if (bt.isDisposed()) {
1117 disposedList.add(bt);
1118 }
1119 }
1120 backups.removeAll(disposedList);
1121 disposedList.clear();
1122 for (Iterator<URI> iter = backupList.iterator(); iter.hasNext() && backups.size() < backupPoolSize; ) {
1123 URI uri = iter.next();
1124 if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
1125 try {
1126 SslContext.setCurrentSslContext(brokerSslContext);
1127 BackupTransport bt = new BackupTransport(this);
1128 bt.setUri(uri);
1129 if (!backups.contains(bt)) {
1130 Transport t = TransportFactory.compositeConnect(uri);
1131 t.setTransportListener(bt);
1132 t.start();
1133 bt.setTransport(t);
1134 backups.add(bt);
1135 if (priorityBackup && isPriority(uri)) {
1136 priorityBackupAvailable = true;
1137 }
1138 }
1139 } catch (Exception e) {
1140 LOG.debug("Failed to build backup ", e);
1141 } finally {
1142 SslContext.setCurrentSslContext(null);
1143 }
1144 }
1145 }
1146 }
1147 }
1148 return false;
1149 }
1150
1151 protected boolean isPriority(URI uri) {
1152 if (!priorityList.isEmpty()) {
1153 return priorityList.contains(uri);
1154 }
1155 return uris.indexOf(uri) == 0;
1156 }
1157
1158 public boolean isDisposed() {
1159 return disposed;
1160 }
1161
1162 public boolean isConnected() {
1163 return connected;
1164 }
1165
1166 public void reconnect(URI uri) throws IOException {
1167 add(true, new URI[]{uri});
1168 }
1169
1170 public boolean isReconnectSupported() {
1171 return this.reconnectSupported;
1172 }
1173
1174 public void setReconnectSupported(boolean value) {
1175 this.reconnectSupported = value;
1176 }
1177
1178 public boolean isUpdateURIsSupported() {
1179 return this.updateURIsSupported;
1180 }
1181
1182 public void setUpdateURIsSupported(boolean value) {
1183 this.updateURIsSupported = value;
1184 }
1185
1186 public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
1187 if (isUpdateURIsSupported()) {
1188 HashSet<URI> copy = new HashSet<URI>(this.updated);
1189 updated.clear();
1190 if (updatedURIs != null && updatedURIs.length > 0) {
1191 for (URI uri : updatedURIs) {
1192 if (uri != null && !updated.contains(uri)) {
1193 updated.add(uri);
1194 }
1195 }
1196 if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(new HashSet<URI>(updated))) {
1197 buildBackups();
1198 synchronized (reconnectMutex) {
1199 reconnect(rebalance);
1200 }
1201 }
1202 }
1203 }
1204 }
1205
1206 /**
1207 * @return the updateURIsURL
1208 */
1209 public String getUpdateURIsURL() {
1210 return this.updateURIsURL;
1211 }
1212
1213 /**
1214 * @param updateURIsURL the updateURIsURL to set
1215 */
1216 public void setUpdateURIsURL(String updateURIsURL) {
1217 this.updateURIsURL = updateURIsURL;
1218 }
1219
1220 /**
1221 * @return the rebalanceUpdateURIs
1222 */
1223 public boolean isRebalanceUpdateURIs() {
1224 return this.rebalanceUpdateURIs;
1225 }
1226
1227 /**
1228 * @param rebalanceUpdateURIs the rebalanceUpdateURIs to set
1229 */
1230 public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) {
1231 this.rebalanceUpdateURIs = rebalanceUpdateURIs;
1232 }
1233
1234 public int getReceiveCounter() {
1235 Transport transport = connectedTransport.get();
1236 if (transport == null) {
1237 return 0;
1238 }
1239 return transport.getReceiveCounter();
1240 }
1241
1242 public int getConnectFailures() {
1243 return connectFailures;
1244 }
1245
1246 public void connectionInterruptProcessingComplete(ConnectionId connectionId) {
1247 synchronized (reconnectMutex) {
1248 stateTracker.connectionInterruptProcessingComplete(this, connectionId);
1249 }
1250 }
1251
1252 public ConnectionStateTracker getStateTracker() {
1253 return stateTracker;
1254 }
1255
1256 private boolean contains(URI newURI) {
1257 boolean result = false;
1258 for (URI uri : uris) {
1259 if (newURI.getPort() == uri.getPort()) {
1260 InetAddress newAddr = null;
1261 InetAddress addr = null;
1262 try {
1263 newAddr = InetAddress.getByName(newURI.getHost());
1264 addr = InetAddress.getByName(uri.getHost());
1265 } catch(IOException e) {
1266
1267 if (newAddr == null) {
1268 LOG.error("Failed to Lookup INetAddress for URI[ " + newURI + " ] : " + e);
1269 } else {
1270 LOG.error("Failed to Lookup INetAddress for URI[ " + uri + " ] : " + e);
1271 }
1272
1273 if (newURI.getHost().equalsIgnoreCase(uri.getHost())) {
1274 result = true;
1275 break;
1276 } else {
1277 continue;
1278 }
1279 }
1280
1281 if (addr.equals(newAddr)) {
1282 result = true;
1283 break;
1284 }
1285 }
1286 }
1287
1288 return result;
1289 }
1290
1291 private InputStreamReader getURLStream(String path) throws IOException {
1292 InputStreamReader result = null;
1293 URL url = null;
1294 try {
1295 url = new URL(path);
1296 result = new InputStreamReader(url.openStream());
1297 } catch (MalformedURLException e) {
1298 // ignore - it could be a path to a a local file
1299 }
1300 if (result == null) {
1301 result = new FileReader(path);
1302 }
1303 return result;
1304 }
1305 }