001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network.jms;
018
019 import javax.jms.Connection;
020 import javax.jms.Destination;
021 import javax.jms.ExceptionListener;
022 import javax.jms.JMSException;
023 import javax.jms.Queue;
024 import javax.jms.QueueConnection;
025 import javax.jms.QueueConnectionFactory;
026 import javax.jms.QueueSession;
027 import javax.jms.Session;
028 import javax.naming.NamingException;
029
030 import org.slf4j.Logger;
031 import org.slf4j.LoggerFactory;
032
033 /**
034 * A Bridge to other JMS Queue providers
035 *
036 * @org.apache.xbean.XBean
037 */
038 public class JmsQueueConnector extends JmsConnector {
039 private static final Logger LOG = LoggerFactory.getLogger(JmsQueueConnector.class);
040 private String outboundQueueConnectionFactoryName;
041 private String localConnectionFactoryName;
042 private QueueConnectionFactory outboundQueueConnectionFactory;
043 private QueueConnectionFactory localQueueConnectionFactory;
044 private InboundQueueBridge[] inboundQueueBridges;
045 private OutboundQueueBridge[] outboundQueueBridges;
046
047 /**
048 * @return Returns the inboundQueueBridges.
049 */
050 public InboundQueueBridge[] getInboundQueueBridges() {
051 return inboundQueueBridges;
052 }
053
054 /**
055 * @param inboundQueueBridges The inboundQueueBridges to set.
056 */
057 public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges) {
058 this.inboundQueueBridges = inboundQueueBridges;
059 }
060
061 /**
062 * @return Returns the outboundQueueBridges.
063 */
064 public OutboundQueueBridge[] getOutboundQueueBridges() {
065 return outboundQueueBridges;
066 }
067
068 /**
069 * @param outboundQueueBridges The outboundQueueBridges to set.
070 */
071 public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges) {
072 this.outboundQueueBridges = outboundQueueBridges;
073 }
074
075 /**
076 * @return Returns the localQueueConnectionFactory.
077 */
078 public QueueConnectionFactory getLocalQueueConnectionFactory() {
079 return localQueueConnectionFactory;
080 }
081
082 /**
083 * @param localQueueConnectionFactory The localQueueConnectionFactory to
084 * set.
085 */
086 public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory) {
087 this.localQueueConnectionFactory = localConnectionFactory;
088 }
089
090 /**
091 * @return Returns the outboundQueueConnectionFactory.
092 */
093 public QueueConnectionFactory getOutboundQueueConnectionFactory() {
094 return outboundQueueConnectionFactory;
095 }
096
097 /**
098 * @return Returns the outboundQueueConnectionFactoryName.
099 */
100 public String getOutboundQueueConnectionFactoryName() {
101 return outboundQueueConnectionFactoryName;
102 }
103
104 /**
105 * @param outboundQueueConnectionFactoryName The
106 * outboundQueueConnectionFactoryName to set.
107 */
108 public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName) {
109 this.outboundQueueConnectionFactoryName = foreignQueueConnectionFactoryName;
110 }
111
112 /**
113 * @return Returns the localConnectionFactoryName.
114 */
115 public String getLocalConnectionFactoryName() {
116 return localConnectionFactoryName;
117 }
118
119 /**
120 * @param localConnectionFactoryName The localConnectionFactoryName to set.
121 */
122 public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
123 this.localConnectionFactoryName = localConnectionFactoryName;
124 }
125
126 /**
127 * @return Returns the localQueueConnection.
128 */
129 public QueueConnection getLocalQueueConnection() {
130 return (QueueConnection) localConnection.get();
131 }
132
133 /**
134 * @param localQueueConnection The localQueueConnection to set.
135 */
136 public void setLocalQueueConnection(QueueConnection localQueueConnection) {
137 this.localConnection.set(localQueueConnection);
138 }
139
140 /**
141 * @return Returns the outboundQueueConnection.
142 */
143 public QueueConnection getOutboundQueueConnection() {
144 return (QueueConnection) foreignConnection.get();
145 }
146
147 /**
148 * @param outboundQueueConnection The outboundQueueConnection to set.
149 */
150 public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) {
151 this.foreignConnection.set(foreignQueueConnection);
152 }
153
154 /**
155 * @param outboundQueueConnectionFactory The outboundQueueConnectionFactory
156 * to set.
157 */
158 public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory) {
159 this.outboundQueueConnectionFactory = foreignQueueConnectionFactory;
160 }
161
162 @Override
163 protected void initializeForeignConnection() throws NamingException, JMSException {
164
165 final QueueConnection newConnection;
166
167 if (foreignConnection.get() == null) {
168 // get the connection factories
169 if (outboundQueueConnectionFactory == null) {
170 // look it up from JNDI
171 if (outboundQueueConnectionFactoryName != null) {
172 outboundQueueConnectionFactory = (QueueConnectionFactory)jndiOutboundTemplate
173 .lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class);
174 if (outboundUsername != null) {
175 newConnection = outboundQueueConnectionFactory
176 .createQueueConnection(outboundUsername, outboundPassword);
177 } else {
178 newConnection = outboundQueueConnectionFactory.createQueueConnection();
179 }
180 } else {
181 throw new JMSException("Cannot create foreignConnection - no information");
182 }
183 } else {
184 if (outboundUsername != null) {
185 newConnection = outboundQueueConnectionFactory
186 .createQueueConnection(outboundUsername, outboundPassword);
187 } else {
188 newConnection = outboundQueueConnectionFactory.createQueueConnection();
189 }
190 }
191 } else {
192 // Clear if for now in case something goes wrong during the init.
193 newConnection = (QueueConnection) foreignConnection.getAndSet(null);
194 }
195
196 if (outboundClientId != null && outboundClientId.length() > 0) {
197 newConnection.setClientID(getOutboundClientId());
198 }
199 newConnection.start();
200
201 outboundMessageConvertor.setConnection(newConnection);
202
203 // Configure the bridges with the new Outbound connection.
204 initializeInboundDestinationBridgesOutboundSide(newConnection);
205 initializeOutboundDestinationBridgesOutboundSide(newConnection);
206
207 // Register for any async error notifications now so we can reset in the
208 // case where there's not a lot of activity and a connection drops.
209 newConnection.setExceptionListener(new ExceptionListener() {
210 @Override
211 public void onException(JMSException exception) {
212 handleConnectionFailure(newConnection);
213 }
214 });
215
216 // At this point all looks good, so this our current connection now.
217 foreignConnection.set(newConnection);
218 }
219
220 @Override
221 protected void initializeLocalConnection() throws NamingException, JMSException {
222
223 final QueueConnection newConnection;
224
225 if (localConnection.get() == null) {
226 // get the connection factories
227 if (localQueueConnectionFactory == null) {
228 if (embeddedConnectionFactory == null) {
229 // look it up from JNDI
230 if (localConnectionFactoryName != null) {
231 localQueueConnectionFactory = (QueueConnectionFactory)jndiLocalTemplate
232 .lookup(localConnectionFactoryName, QueueConnectionFactory.class);
233 if (localUsername != null) {
234 newConnection = localQueueConnectionFactory
235 .createQueueConnection(localUsername, localPassword);
236 } else {
237 newConnection = localQueueConnectionFactory.createQueueConnection();
238 }
239 } else {
240 throw new JMSException("Cannot create localConnection - no information");
241 }
242 } else {
243 newConnection = embeddedConnectionFactory.createQueueConnection();
244 }
245 } else {
246 if (localUsername != null) {
247 newConnection = localQueueConnectionFactory.
248 createQueueConnection(localUsername, localPassword);
249 } else {
250 newConnection = localQueueConnectionFactory.createQueueConnection();
251 }
252 }
253
254 } else {
255 // Clear if for now in case something goes wrong during the init.
256 newConnection = (QueueConnection) localConnection.getAndSet(null);
257 }
258
259 if (localClientId != null && localClientId.length() > 0) {
260 newConnection.setClientID(getLocalClientId());
261 }
262 newConnection.start();
263
264 inboundMessageConvertor.setConnection(newConnection);
265
266 // Configure the bridges with the new Local connection.
267 initializeInboundDestinationBridgesLocalSide(newConnection);
268 initializeOutboundDestinationBridgesLocalSide(newConnection);
269
270 // Register for any async error notifications now so we can reset in the
271 // case where there's not a lot of activity and a connection drops.
272 newConnection.setExceptionListener(new ExceptionListener() {
273 @Override
274 public void onException(JMSException exception) {
275 handleConnectionFailure(newConnection);
276 }
277 });
278
279 // At this point all looks good, so this our current connection now.
280 localConnection.set(newConnection);
281 }
282
283 protected void initializeInboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException {
284 if (inboundQueueBridges != null) {
285 QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
286
287 for (InboundQueueBridge bridge : inboundQueueBridges) {
288 String queueName = bridge.getInboundQueueName();
289 Queue foreignQueue = createForeignQueue(outboundSession, queueName);
290 bridge.setConsumer(null);
291 bridge.setConsumerQueue(foreignQueue);
292 bridge.setConsumerConnection(connection);
293 bridge.setJmsConnector(this);
294 addInboundBridge(bridge);
295 }
296 outboundSession.close();
297 }
298 }
299
300 protected void initializeInboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException {
301 if (inboundQueueBridges != null) {
302 QueueSession localSession = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
303
304 for (InboundQueueBridge bridge : inboundQueueBridges) {
305 String localQueueName = bridge.getLocalQueueName();
306 Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
307 bridge.setProducerQueue(activemqQueue);
308 bridge.setProducerConnection(connection);
309 if (bridge.getJmsMessageConvertor() == null) {
310 bridge.setJmsMessageConvertor(getInboundMessageConvertor());
311 }
312 bridge.setJmsConnector(this);
313 addInboundBridge(bridge);
314 }
315 localSession.close();
316 }
317 }
318
319 protected void initializeOutboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException {
320 if (outboundQueueBridges != null) {
321 QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
322
323 for (OutboundQueueBridge bridge : outboundQueueBridges) {
324 String queueName = bridge.getOutboundQueueName();
325 Queue foreignQueue = createForeignQueue(outboundSession, queueName);
326 bridge.setProducerQueue(foreignQueue);
327 bridge.setProducerConnection(connection);
328 if (bridge.getJmsMessageConvertor() == null) {
329 bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
330 }
331 bridge.setJmsConnector(this);
332 addOutboundBridge(bridge);
333 }
334 outboundSession.close();
335 }
336 }
337
338 protected void initializeOutboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException {
339 if (outboundQueueBridges != null) {
340 QueueSession localSession =
341 connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
342
343 for (OutboundQueueBridge bridge : outboundQueueBridges) {
344 String localQueueName = bridge.getLocalQueueName();
345 Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
346 bridge.setConsumer(null);
347 bridge.setConsumerQueue(activemqQueue);
348 bridge.setConsumerConnection(connection);
349 bridge.setJmsConnector(this);
350 addOutboundBridge(bridge);
351 }
352 localSession.close();
353 }
354 }
355
356 protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
357 Connection replyToConsumerConnection) {
358 Queue replyToProducerQueue = (Queue)destination;
359 boolean isInbound = replyToProducerConnection.equals(localConnection.get());
360
361 if (isInbound) {
362 InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue);
363 if (bridge == null) {
364 bridge = new InboundQueueBridge() {
365 protected Destination processReplyToDestination(Destination destination) {
366 return null;
367 }
368 };
369 try {
370 QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
371 .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
372 Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
373 replyToConsumerSession.close();
374 bridge.setConsumerQueue(replyToConsumerQueue);
375 bridge.setProducerQueue(replyToProducerQueue);
376 bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
377 bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
378 bridge.setDoHandleReplyTo(false);
379 if (bridge.getJmsMessageConvertor() == null) {
380 bridge.setJmsMessageConvertor(getInboundMessageConvertor());
381 }
382 bridge.setJmsConnector(this);
383 bridge.start();
384 LOG.info("Created replyTo bridge for " + replyToProducerQueue);
385 } catch (Exception e) {
386 LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
387 return null;
388 }
389 replyToBridges.put(replyToProducerQueue, bridge);
390 }
391 return bridge.getConsumerQueue();
392 } else {
393 OutboundQueueBridge bridge = (OutboundQueueBridge)replyToBridges.get(replyToProducerQueue);
394 if (bridge == null) {
395 bridge = new OutboundQueueBridge() {
396 protected Destination processReplyToDestination(Destination destination) {
397 return null;
398 }
399 };
400 try {
401 QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
402 .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
403 Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
404 replyToConsumerSession.close();
405 bridge.setConsumerQueue(replyToConsumerQueue);
406 bridge.setProducerQueue(replyToProducerQueue);
407 bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
408 bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
409 bridge.setDoHandleReplyTo(false);
410 if (bridge.getJmsMessageConvertor() == null) {
411 bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
412 }
413 bridge.setJmsConnector(this);
414 bridge.start();
415 LOG.info("Created replyTo bridge for " + replyToProducerQueue);
416 } catch (Exception e) {
417 LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
418 return null;
419 }
420 replyToBridges.put(replyToProducerQueue, bridge);
421 }
422 return bridge.getConsumerQueue();
423 }
424 }
425
426 protected Queue createActiveMQQueue(QueueSession session, String queueName) throws JMSException {
427 return session.createQueue(queueName);
428 }
429
430 protected Queue createForeignQueue(QueueSession session, String queueName) throws JMSException {
431 Queue result = null;
432 try {
433 result = session.createQueue(queueName);
434 } catch (JMSException e) {
435 // look-up the Queue
436 try {
437 result = (Queue)jndiOutboundTemplate.lookup(queueName, Queue.class);
438 } catch (NamingException e1) {
439 String errStr = "Failed to look-up Queue for name: " + queueName;
440 LOG.error(errStr, e);
441 JMSException jmsEx = new JMSException(errStr);
442 jmsEx.setLinkedException(e1);
443 throw jmsEx;
444 }
445 }
446 return result;
447 }
448
449 }