001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network.jms;
018
019 import java.util.Iterator;
020 import java.util.List;
021 import java.util.Map;
022 import java.util.concurrent.CopyOnWriteArrayList;
023 import java.util.concurrent.LinkedBlockingQueue;
024 import java.util.concurrent.ThreadFactory;
025 import java.util.concurrent.ThreadPoolExecutor;
026 import java.util.concurrent.TimeUnit;
027 import java.util.concurrent.atomic.AtomicBoolean;
028 import java.util.concurrent.atomic.AtomicReference;
029
030 import javax.jms.Connection;
031 import javax.jms.Destination;
032 import javax.jms.QueueConnection;
033
034 import org.apache.activemq.ActiveMQConnectionFactory;
035 import org.apache.activemq.Service;
036 import org.apache.activemq.broker.BrokerService;
037 import org.apache.activemq.util.LRUCache;
038 import org.slf4j.Logger;
039 import org.slf4j.LoggerFactory;
040 import org.springframework.jndi.JndiTemplate;
041
042 /**
043 * This bridge joins the gap between foreign JMS providers and ActiveMQ As some
044 * JMS providers are still only in compliance with JMS v1.0.1 , this bridge itself
045 * aimed to be in compliance with the JMS 1.0.2 specification.
046 */
047 public abstract class JmsConnector implements Service {
048
049 private static int nextId;
050 private static final Logger LOG = LoggerFactory.getLogger(JmsConnector.class);
051
052 protected JndiTemplate jndiLocalTemplate;
053 protected JndiTemplate jndiOutboundTemplate;
054 protected JmsMesageConvertor inboundMessageConvertor;
055 protected JmsMesageConvertor outboundMessageConvertor;
056 protected AtomicBoolean initialized = new AtomicBoolean(false);
057 protected AtomicBoolean localSideInitialized = new AtomicBoolean(false);
058 protected AtomicBoolean foreignSideInitialized = new AtomicBoolean(false);
059 protected AtomicBoolean started = new AtomicBoolean(false);
060 protected AtomicBoolean failed = new AtomicBoolean();
061 protected AtomicReference<Connection> foreignConnection = new AtomicReference<Connection>();
062 protected AtomicReference<Connection> localConnection = new AtomicReference<Connection>();
063 protected ActiveMQConnectionFactory embeddedConnectionFactory;
064 protected int replyToDestinationCacheSize = 10000;
065 protected String outboundUsername;
066 protected String outboundPassword;
067 protected String localUsername;
068 protected String localPassword;
069 protected String outboundClientId;
070 protected String localClientId;
071 protected LRUCache<Destination, DestinationBridge> replyToBridges = createLRUCache();
072
073 private ReconnectionPolicy policy = new ReconnectionPolicy();
074 protected ThreadPoolExecutor connectionSerivce;
075 private List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
076 private List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
077 private String name;
078
079 private static LRUCache<Destination, DestinationBridge> createLRUCache() {
080 return new LRUCache<Destination, DestinationBridge>() {
081 private static final long serialVersionUID = -7446792754185879286L;
082
083 protected boolean removeEldestEntry(Map.Entry<Destination, DestinationBridge> enty) {
084 if (size() > maxCacheSize) {
085 Iterator<Map.Entry<Destination, DestinationBridge>> iter = entrySet().iterator();
086 Map.Entry<Destination, DestinationBridge> lru = iter.next();
087 remove(lru.getKey());
088 DestinationBridge bridge = (DestinationBridge)lru.getValue();
089 try {
090 bridge.stop();
091 LOG.info("Expired bridge: " + bridge);
092 } catch (Exception e) {
093 LOG.warn("stopping expired bridge" + bridge + " caused an exception", e);
094 }
095 }
096 return false;
097 }
098 };
099 }
100
101 public boolean init() {
102 boolean result = initialized.compareAndSet(false, true);
103 if (result) {
104 if (jndiLocalTemplate == null) {
105 jndiLocalTemplate = new JndiTemplate();
106 }
107 if (jndiOutboundTemplate == null) {
108 jndiOutboundTemplate = new JndiTemplate();
109 }
110 if (inboundMessageConvertor == null) {
111 inboundMessageConvertor = new SimpleJmsMessageConvertor();
112 }
113 if (outboundMessageConvertor == null) {
114 outboundMessageConvertor = new SimpleJmsMessageConvertor();
115 }
116 replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize());
117
118 connectionSerivce = createExecutor();
119
120 // Subclasses can override this to customize their own it.
121 result = doConnectorInit();
122 }
123 return result;
124 }
125
126 protected boolean doConnectorInit() {
127
128 // We try to make a connection via a sync call first so that the
129 // JmsConnector is fully initialized before the start call returns
130 // in order to avoid missing any messages that are dispatched
131 // immediately after startup. If either side fails we queue an
132 // asynchronous task to manage the reconnect attempts.
133
134 try {
135 initializeLocalConnection();
136 localSideInitialized.set(true);
137 } catch(Exception e) {
138 // Queue up the task to attempt the local connection.
139 scheduleAsyncLocalConnectionReconnect();
140 }
141
142 try {
143 initializeForeignConnection();
144 foreignSideInitialized.set(true);
145 } catch(Exception e) {
146 // Queue up the task for the foreign connection now.
147 scheduleAsyncForeignConnectionReconnect();
148 }
149
150 return true;
151 }
152
153 public void start() throws Exception {
154 if (started.compareAndSet(false, true)) {
155 init();
156 for (DestinationBridge bridge : inboundBridges) {
157 bridge.start();
158 }
159 for (DestinationBridge bridge : outboundBridges) {
160 bridge.start();
161 }
162 LOG.info("JMS Connector " + getName() + " Started");
163 }
164 }
165
166 public void stop() throws Exception {
167 if (started.compareAndSet(true, false)) {
168
169 this.connectionSerivce.shutdown();
170
171 for (DestinationBridge bridge : inboundBridges) {
172 bridge.stop();
173 }
174 for (DestinationBridge bridge : outboundBridges) {
175 bridge.stop();
176 }
177 LOG.info("JMS Connector " + getName() + " Stopped");
178 }
179 }
180
181 public void clearBridges() {
182 inboundBridges.clear();
183 outboundBridges.clear();
184 replyToBridges.clear();
185 }
186
187 protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection);
188
189 /**
190 * One way to configure the local connection - this is called by The
191 * BrokerService when the Connector is embedded
192 *
193 * @param service
194 */
195 public void setBrokerService(BrokerService service) {
196 embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI());
197 }
198
199 public Connection getLocalConnection() {
200 return this.localConnection.get();
201 }
202
203 public Connection getForeignConnection() {
204 return this.foreignConnection.get();
205 }
206
207 /**
208 * @return Returns the jndiTemplate.
209 */
210 public JndiTemplate getJndiLocalTemplate() {
211 return jndiLocalTemplate;
212 }
213
214 /**
215 * @param jndiTemplate The jndiTemplate to set.
216 */
217 public void setJndiLocalTemplate(JndiTemplate jndiTemplate) {
218 this.jndiLocalTemplate = jndiTemplate;
219 }
220
221 /**
222 * @return Returns the jndiOutboundTemplate.
223 */
224 public JndiTemplate getJndiOutboundTemplate() {
225 return jndiOutboundTemplate;
226 }
227
228 /**
229 * @param jndiOutboundTemplate The jndiOutboundTemplate to set.
230 */
231 public void setJndiOutboundTemplate(JndiTemplate jndiOutboundTemplate) {
232 this.jndiOutboundTemplate = jndiOutboundTemplate;
233 }
234
235 /**
236 * @return Returns the inboundMessageConvertor.
237 */
238 public JmsMesageConvertor getInboundMessageConvertor() {
239 return inboundMessageConvertor;
240 }
241
242 /**
243 * @param inboundMessageConvertor The inboundMessageConvertor to set.
244 */
245 public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
246 this.inboundMessageConvertor = jmsMessageConvertor;
247 }
248
249 /**
250 * @return Returns the outboundMessageConvertor.
251 */
252 public JmsMesageConvertor getOutboundMessageConvertor() {
253 return outboundMessageConvertor;
254 }
255
256 /**
257 * @param outboundMessageConvertor The outboundMessageConvertor to set.
258 */
259 public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor) {
260 this.outboundMessageConvertor = outboundMessageConvertor;
261 }
262
263 /**
264 * @return Returns the replyToDestinationCacheSize.
265 */
266 public int getReplyToDestinationCacheSize() {
267 return replyToDestinationCacheSize;
268 }
269
270 /**
271 * @param replyToDestinationCacheSize The replyToDestinationCacheSize to set.
272 */
273 public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) {
274 this.replyToDestinationCacheSize = replyToDestinationCacheSize;
275 }
276
277 /**
278 * @return Returns the localPassword.
279 */
280 public String getLocalPassword() {
281 return localPassword;
282 }
283
284 /**
285 * @param localPassword The localPassword to set.
286 */
287 public void setLocalPassword(String localPassword) {
288 this.localPassword = localPassword;
289 }
290
291 /**
292 * @return Returns the localUsername.
293 */
294 public String getLocalUsername() {
295 return localUsername;
296 }
297
298 /**
299 * @param localUsername The localUsername to set.
300 */
301 public void setLocalUsername(String localUsername) {
302 this.localUsername = localUsername;
303 }
304
305 /**
306 * @return Returns the outboundPassword.
307 */
308 public String getOutboundPassword() {
309 return outboundPassword;
310 }
311
312 /**
313 * @param outboundPassword The outboundPassword to set.
314 */
315 public void setOutboundPassword(String outboundPassword) {
316 this.outboundPassword = outboundPassword;
317 }
318
319 /**
320 * @return Returns the outboundUsername.
321 */
322 public String getOutboundUsername() {
323 return outboundUsername;
324 }
325
326 /**
327 * @param outboundUsername The outboundUsername to set.
328 */
329 public void setOutboundUsername(String outboundUsername) {
330 this.outboundUsername = outboundUsername;
331 }
332
333 /**
334 * @return the outboundClientId
335 */
336 public String getOutboundClientId() {
337 return outboundClientId;
338 }
339
340 /**
341 * @param outboundClientId the outboundClientId to set
342 */
343 public void setOutboundClientId(String outboundClientId) {
344 this.outboundClientId = outboundClientId;
345 }
346
347 /**
348 * @return the localClientId
349 */
350 public String getLocalClientId() {
351 return localClientId;
352 }
353
354 /**
355 * @param localClientId the localClientId to set
356 */
357 public void setLocalClientId(String localClientId) {
358 this.localClientId = localClientId;
359 }
360
361 /**
362 * @return the currently configured reconnection policy.
363 */
364 public ReconnectionPolicy getReconnectionPolicy() {
365 return this.policy;
366 }
367
368 /**
369 * @param policy The new reconnection policy this {@link JmsConnector} should use.
370 */
371 public void setReconnectionPolicy(ReconnectionPolicy policy) {
372 this.policy = policy;
373 }
374
375 /**
376 * @return returns true if the {@link JmsConnector} is connected to both brokers.
377 */
378 public boolean isConnected() {
379 return localConnection.get() != null && foreignConnection.get() != null;
380 }
381
382 protected void addInboundBridge(DestinationBridge bridge) {
383 if (!inboundBridges.contains(bridge)) {
384 inboundBridges.add(bridge);
385 }
386 }
387
388 protected void addOutboundBridge(DestinationBridge bridge) {
389 if (!outboundBridges.contains(bridge)) {
390 outboundBridges.add(bridge);
391 }
392 }
393
394 protected void removeInboundBridge(DestinationBridge bridge) {
395 inboundBridges.remove(bridge);
396 }
397
398 protected void removeOutboundBridge(DestinationBridge bridge) {
399 outboundBridges.remove(bridge);
400 }
401
402 public String getName() {
403 if (name == null) {
404 name = "Connector:" + getNextId();
405 }
406 return name;
407 }
408
409 public void setName(String name) {
410 this.name = name;
411 }
412
413 private static synchronized int getNextId() {
414 return nextId++;
415 }
416
417 public boolean isFailed() {
418 return this.failed.get();
419 }
420
421 /**
422 * Performs the work of connection to the local side of the Connection.
423 * <p>
424 * This creates the initial connection to the local end of the {@link JmsConnector}
425 * and then sets up all the destination bridges with the information needed to bridge
426 * on the local side of the connection.
427 *
428 * @throws Exception if the connection cannot be established for any reason.
429 */
430 protected abstract void initializeLocalConnection() throws Exception;
431
432 /**
433 * Performs the work of connection to the foreign side of the Connection.
434 * <p>
435 * This creates the initial connection to the foreign end of the {@link JmsConnector}
436 * and then sets up all the destination bridges with the information needed to bridge
437 * on the foreign side of the connection.
438 *
439 * @throws Exception if the connection cannot be established for any reason.
440 */
441 protected abstract void initializeForeignConnection() throws Exception;
442
443 /**
444 * Callback method that the Destination bridges can use to report an exception to occurs
445 * during normal bridging operations.
446 *
447 * @param connection
448 * The connection that was in use when the failure occured.
449 */
450 void handleConnectionFailure(Connection connection) {
451
452 // Can happen if async exception listener kicks in at the same time.
453 if (connection == null || !this.started.get()) {
454 return;
455 }
456
457 LOG.info("JmsConnector handling loss of connection [" + connection.toString() + "]");
458
459 // TODO - How do we handle the re-wiring of replyToBridges in this case.
460 replyToBridges.clear();
461
462 if (this.foreignConnection.compareAndSet((QueueConnection)connection, null)) {
463
464 // Stop the inbound bridges when the foreign connection is dropped since
465 // the bridge has no consumer and needs to be restarted once a new connection
466 // to the foreign side is made.
467 for (DestinationBridge bridge : inboundBridges) {
468 try {
469 bridge.stop();
470 } catch(Exception e) {
471 }
472 }
473
474 // We got here first and cleared the connection, now we queue a reconnect.
475 this.connectionSerivce.execute(new Runnable() {
476
477 @Override
478 public void run() {
479 try {
480 doInitializeConnection(false);
481 } catch (Exception e) {
482 LOG.error("Failed to initialize forgein connection for the JMSConnector", e);
483 }
484 }
485 });
486
487 } else if (this.localConnection.compareAndSet((QueueConnection)connection, null)) {
488
489 // Stop the outbound bridges when the local connection is dropped since
490 // the bridge has no consumer and needs to be restarted once a new connection
491 // to the local side is made.
492 for (DestinationBridge bridge : outboundBridges) {
493 try {
494 bridge.stop();
495 } catch(Exception e) {
496 }
497 }
498
499 // We got here first and cleared the connection, now we queue a reconnect.
500 this.connectionSerivce.execute(new Runnable() {
501
502 @Override
503 public void run() {
504 try {
505 doInitializeConnection(true);
506 } catch (Exception e) {
507 LOG.error("Failed to initialize local connection for the JMSConnector", e);
508 }
509 }
510 });
511 }
512 }
513
514 private void scheduleAsyncLocalConnectionReconnect() {
515 this.connectionSerivce.execute(new Runnable() {
516 @Override
517 public void run() {
518 try {
519 doInitializeConnection(true);
520 } catch (Exception e) {
521 LOG.error("Failed to initialize local connection for the JMSConnector", e);
522 }
523 }
524 });
525 }
526
527 private void scheduleAsyncForeignConnectionReconnect() {
528 this.connectionSerivce.execute(new Runnable() {
529 @Override
530 public void run() {
531 try {
532 doInitializeConnection(false);
533 } catch (Exception e) {
534 LOG.error("Failed to initialize forgein connection for the JMSConnector", e);
535 }
536 }
537 });
538 }
539
540 private void doInitializeConnection(boolean local) throws Exception {
541
542 int attempt = 0;
543
544 final int maxRetries;
545 if (local) {
546 maxRetries = !localSideInitialized.get() ? policy.getMaxInitialConnectAttempts() :
547 policy.getMaxReconnectAttempts();
548 } else {
549 maxRetries = !foreignSideInitialized.get() ? policy.getMaxInitialConnectAttempts() :
550 policy.getMaxReconnectAttempts();
551 }
552
553 do
554 {
555 if (attempt > 0) {
556 try {
557 Thread.sleep(policy.getNextDelay(attempt));
558 } catch(InterruptedException e) {
559 }
560 }
561
562 if (connectionSerivce.isTerminating()) {
563 return;
564 }
565
566 try {
567
568 if (local) {
569 initializeLocalConnection();
570 localSideInitialized.set(true);
571 } else {
572 initializeForeignConnection();
573 foreignSideInitialized.set(true);
574 }
575
576 // Once we are connected we ensure all the bridges are started.
577 if (localConnection.get() != null && foreignConnection.get() != null) {
578 for (DestinationBridge bridge : inboundBridges) {
579 bridge.start();
580 }
581 for (DestinationBridge bridge : outboundBridges) {
582 bridge.start();
583 }
584 }
585
586 return;
587 } catch(Exception e) {
588 LOG.debug("Failed to establish initial " + (local ? "local" : "foriegn") +
589 " connection for JmsConnector [" + attempt + "]: " + e.getMessage());
590 }
591 }
592 while (maxRetries < ++attempt && !connectionSerivce.isTerminating());
593
594 this.failed.set(true);
595 }
596
597 private ThreadFactory factory = new ThreadFactory() {
598 public Thread newThread(Runnable runnable) {
599 Thread thread = new Thread(runnable, "JmsConnector Async Connection Task: ");
600 thread.setDaemon(true);
601 return thread;
602 }
603 };
604
605 private ThreadPoolExecutor createExecutor() {
606 ThreadPoolExecutor exec = new ThreadPoolExecutor(0, 2, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
607 exec.allowCoreThreadTimeOut(true);
608 return exec;
609 }
610 }