001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network.jms;
018
019 import javax.jms.Connection;
020 import javax.jms.Destination;
021 import javax.jms.ExceptionListener;
022 import javax.jms.JMSException;
023 import javax.jms.Topic;
024 import javax.jms.TopicConnection;
025 import javax.jms.TopicConnectionFactory;
026 import javax.jms.TopicSession;
027 import javax.jms.Session;
028 import javax.naming.NamingException;
029
030 import org.slf4j.Logger;
031 import org.slf4j.LoggerFactory;
032
033 /**
034 * A Bridge to other JMS Topic providers
035 *
036 * @org.apache.xbean.XBean
037 */
038 public class JmsTopicConnector extends JmsConnector {
039 private static final Logger LOG = LoggerFactory.getLogger(JmsTopicConnector.class);
040 private String outboundTopicConnectionFactoryName;
041 private String localConnectionFactoryName;
042 private TopicConnectionFactory outboundTopicConnectionFactory;
043 private TopicConnectionFactory localTopicConnectionFactory;
044 private InboundTopicBridge[] inboundTopicBridges;
045 private OutboundTopicBridge[] outboundTopicBridges;
046
047 /**
048 * @return Returns the inboundTopicBridges.
049 */
050 public InboundTopicBridge[] getInboundTopicBridges() {
051 return inboundTopicBridges;
052 }
053
054 /**
055 * @param inboundTopicBridges The inboundTopicBridges to set.
056 */
057 public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges) {
058 this.inboundTopicBridges = inboundTopicBridges;
059 }
060
061 /**
062 * @return Returns the outboundTopicBridges.
063 */
064 public OutboundTopicBridge[] getOutboundTopicBridges() {
065 return outboundTopicBridges;
066 }
067
068 /**
069 * @param outboundTopicBridges The outboundTopicBridges to set.
070 */
071 public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges) {
072 this.outboundTopicBridges = outboundTopicBridges;
073 }
074
075 /**
076 * @return Returns the localTopicConnectionFactory.
077 */
078 public TopicConnectionFactory getLocalTopicConnectionFactory() {
079 return localTopicConnectionFactory;
080 }
081
082 /**
083 * @param localTopicConnectionFactory The localTopicConnectionFactory to set.
084 */
085 public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory) {
086 this.localTopicConnectionFactory = localConnectionFactory;
087 }
088
089 /**
090 * @return Returns the outboundTopicConnectionFactory.
091 */
092 public TopicConnectionFactory getOutboundTopicConnectionFactory() {
093 return outboundTopicConnectionFactory;
094 }
095
096 /**
097 * @return Returns the outboundTopicConnectionFactoryName.
098 */
099 public String getOutboundTopicConnectionFactoryName() {
100 return outboundTopicConnectionFactoryName;
101 }
102
103 /**
104 * @param outboundTopicConnectionFactoryName The outboundTopicConnectionFactoryName to set.
105 */
106 public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) {
107 this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName;
108 }
109
110 /**
111 * @return Returns the localConnectionFactoryName.
112 */
113 public String getLocalConnectionFactoryName() {
114 return localConnectionFactoryName;
115 }
116
117 /**
118 * @param localConnectionFactoryName The localConnectionFactoryName to set.
119 */
120 public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
121 this.localConnectionFactoryName = localConnectionFactoryName;
122 }
123
124 /**
125 * @return Returns the localTopicConnection.
126 */
127 public TopicConnection getLocalTopicConnection() {
128 return (TopicConnection) localConnection.get();
129 }
130
131 /**
132 * @param localTopicConnection The localTopicConnection to set.
133 */
134 public void setLocalTopicConnection(TopicConnection localTopicConnection) {
135 this.localConnection.set(localTopicConnection);
136 }
137
138 /**
139 * @return Returns the outboundTopicConnection.
140 */
141 public TopicConnection getOutboundTopicConnection() {
142 return (TopicConnection) foreignConnection.get();
143 }
144
145 /**
146 * @param outboundTopicConnection The outboundTopicConnection to set.
147 */
148 public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) {
149 this.foreignConnection.set(foreignTopicConnection);
150 }
151
152 /**
153 * @param outboundTopicConnectionFactory The outboundTopicConnectionFactory to set.
154 */
155 public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) {
156 this.outboundTopicConnectionFactory = foreignTopicConnectionFactory;
157 }
158
159 @Override
160 protected void initializeForeignConnection() throws NamingException, JMSException {
161
162 final TopicConnection newConnection;
163
164 if (foreignConnection.get() == null) {
165 // get the connection factories
166 if (outboundTopicConnectionFactory == null) {
167 // look it up from JNDI
168 if (outboundTopicConnectionFactoryName != null) {
169 outboundTopicConnectionFactory = (TopicConnectionFactory)jndiOutboundTemplate
170 .lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class);
171 if (outboundUsername != null) {
172 newConnection = outboundTopicConnectionFactory
173 .createTopicConnection(outboundUsername, outboundPassword);
174 } else {
175 newConnection = outboundTopicConnectionFactory.createTopicConnection();
176 }
177 } else {
178 throw new JMSException("Cannot create foreignConnection - no information");
179 }
180 } else {
181 if (outboundUsername != null) {
182 newConnection = outboundTopicConnectionFactory
183 .createTopicConnection(outboundUsername, outboundPassword);
184 } else {
185 newConnection = outboundTopicConnectionFactory.createTopicConnection();
186 }
187 }
188 } else {
189 // Clear if for now in case something goes wrong during the init.
190 newConnection = (TopicConnection) foreignConnection.getAndSet(null);
191 }
192
193 if (outboundClientId != null && outboundClientId.length() > 0) {
194 newConnection.setClientID(getOutboundClientId());
195 }
196 newConnection.start();
197
198 outboundMessageConvertor.setConnection(newConnection);
199
200 // Configure the bridges with the new Outbound connection.
201 initializeInboundDestinationBridgesOutboundSide(newConnection);
202 initializeOutboundDestinationBridgesOutboundSide(newConnection);
203
204 // Register for any async error notifications now so we can reset in the
205 // case where there's not a lot of activity and a connection drops.
206 newConnection.setExceptionListener(new ExceptionListener() {
207 @Override
208 public void onException(JMSException exception) {
209 handleConnectionFailure(newConnection);
210 }
211 });
212
213 // At this point all looks good, so this our current connection now.
214 foreignConnection.set(newConnection);
215 }
216
217 @Override
218 protected void initializeLocalConnection() throws NamingException, JMSException {
219
220 final TopicConnection newConnection;
221
222 if (localConnection.get() == null) {
223 // get the connection factories
224 if (localTopicConnectionFactory == null) {
225 if (embeddedConnectionFactory == null) {
226 // look it up from JNDI
227 if (localConnectionFactoryName != null) {
228 localTopicConnectionFactory = (TopicConnectionFactory)jndiLocalTemplate
229 .lookup(localConnectionFactoryName, TopicConnectionFactory.class);
230 if (localUsername != null) {
231 newConnection = localTopicConnectionFactory
232 .createTopicConnection(localUsername, localPassword);
233 } else {
234 newConnection = localTopicConnectionFactory.createTopicConnection();
235 }
236 } else {
237 throw new JMSException("Cannot create localConnection - no information");
238 }
239 } else {
240 newConnection = embeddedConnectionFactory.createTopicConnection();
241 }
242 } else {
243 if (localUsername != null) {
244 newConnection = localTopicConnectionFactory.
245 createTopicConnection(localUsername, localPassword);
246 } else {
247 newConnection = localTopicConnectionFactory.createTopicConnection();
248 }
249 }
250
251 } else {
252 // Clear if for now in case something goes wrong during the init.
253 newConnection = (TopicConnection) localConnection.getAndSet(null);
254 }
255
256 if (localClientId != null && localClientId.length() > 0) {
257 newConnection.setClientID(getLocalClientId());
258 }
259 newConnection.start();
260
261 inboundMessageConvertor.setConnection(newConnection);
262
263 // Configure the bridges with the new Local connection.
264 initializeInboundDestinationBridgesLocalSide(newConnection);
265 initializeOutboundDestinationBridgesLocalSide(newConnection);
266
267 // Register for any async error notifications now so we can reset in the
268 // case where there's not a lot of activity and a connection drops.
269 newConnection.setExceptionListener(new ExceptionListener() {
270 @Override
271 public void onException(JMSException exception) {
272 handleConnectionFailure(newConnection);
273 }
274 });
275
276 // At this point all looks good, so this our current connection now.
277 localConnection.set(newConnection);
278 }
279
280 protected void initializeInboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException {
281 if (inboundTopicBridges != null) {
282 TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
283
284 for (InboundTopicBridge bridge : inboundTopicBridges) {
285 String TopicName = bridge.getInboundTopicName();
286 Topic foreignTopic = createForeignTopic(outboundSession, TopicName);
287 bridge.setConsumer(null);
288 bridge.setConsumerTopic(foreignTopic);
289 bridge.setConsumerConnection(connection);
290 bridge.setJmsConnector(this);
291 addInboundBridge(bridge);
292 }
293 outboundSession.close();
294 }
295 }
296
297 protected void initializeInboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException {
298 if (inboundTopicBridges != null) {
299 TopicSession localSession = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
300
301 for (InboundTopicBridge bridge : inboundTopicBridges) {
302 String localTopicName = bridge.getLocalTopicName();
303 Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
304 bridge.setProducerTopic(activemqTopic);
305 bridge.setProducerConnection(connection);
306 if (bridge.getJmsMessageConvertor() == null) {
307 bridge.setJmsMessageConvertor(getInboundMessageConvertor());
308 }
309 bridge.setJmsConnector(this);
310 addInboundBridge(bridge);
311 }
312 localSession.close();
313 }
314 }
315
316 protected void initializeOutboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException {
317 if (outboundTopicBridges != null) {
318 TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
319
320 for (OutboundTopicBridge bridge : outboundTopicBridges) {
321 String topicName = bridge.getOutboundTopicName();
322 Topic foreignTopic = createForeignTopic(outboundSession, topicName);
323 bridge.setProducerTopic(foreignTopic);
324 bridge.setProducerConnection(connection);
325 if (bridge.getJmsMessageConvertor() == null) {
326 bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
327 }
328 bridge.setJmsConnector(this);
329 addOutboundBridge(bridge);
330 }
331 outboundSession.close();
332 }
333 }
334
335 protected void initializeOutboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException {
336 if (outboundTopicBridges != null) {
337 TopicSession localSession =
338 connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
339
340 for (OutboundTopicBridge bridge : outboundTopicBridges) {
341 String localTopicName = bridge.getLocalTopicName();
342 Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
343 bridge.setConsumer(null);
344 bridge.setConsumerTopic(activemqTopic);
345 bridge.setConsumerConnection(connection);
346 bridge.setJmsConnector(this);
347 addOutboundBridge(bridge);
348 }
349 localSession.close();
350 }
351 }
352
353 protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
354 Connection replyToConsumerConnection) {
355 Topic replyToProducerTopic = (Topic)destination;
356 boolean isInbound = replyToProducerConnection.equals(localConnection.get());
357
358 if (isInbound) {
359 InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic);
360 if (bridge == null) {
361 bridge = new InboundTopicBridge() {
362 protected Destination processReplyToDestination(Destination destination) {
363 return null;
364 }
365 };
366 try {
367 TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
368 .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
369 Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
370 replyToConsumerSession.close();
371 bridge.setConsumerTopic(replyToConsumerTopic);
372 bridge.setProducerTopic(replyToProducerTopic);
373 bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
374 bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
375 bridge.setDoHandleReplyTo(false);
376 if (bridge.getJmsMessageConvertor() == null) {
377 bridge.setJmsMessageConvertor(getInboundMessageConvertor());
378 }
379 bridge.setJmsConnector(this);
380 bridge.start();
381 LOG.info("Created replyTo bridge for " + replyToProducerTopic);
382 } catch (Exception e) {
383 LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
384 return null;
385 }
386 replyToBridges.put(replyToProducerTopic, bridge);
387 }
388 return bridge.getConsumerTopic();
389 } else {
390 OutboundTopicBridge bridge = (OutboundTopicBridge)replyToBridges.get(replyToProducerTopic);
391 if (bridge == null) {
392 bridge = new OutboundTopicBridge() {
393 protected Destination processReplyToDestination(Destination destination) {
394 return null;
395 }
396 };
397 try {
398 TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
399 .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
400 Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
401 replyToConsumerSession.close();
402 bridge.setConsumerTopic(replyToConsumerTopic);
403 bridge.setProducerTopic(replyToProducerTopic);
404 bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
405 bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
406 bridge.setDoHandleReplyTo(false);
407 if (bridge.getJmsMessageConvertor() == null) {
408 bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
409 }
410 bridge.setJmsConnector(this);
411 bridge.start();
412 LOG.info("Created replyTo bridge for " + replyToProducerTopic);
413 } catch (Exception e) {
414 LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
415 return null;
416 }
417 replyToBridges.put(replyToProducerTopic, bridge);
418 }
419 return bridge.getConsumerTopic();
420 }
421 }
422
423 protected Topic createActiveMQTopic(TopicSession session, String topicName) throws JMSException {
424 return session.createTopic(topicName);
425 }
426
427 protected Topic createForeignTopic(TopicSession session, String topicName) throws JMSException {
428 Topic result = null;
429 try {
430 result = session.createTopic(topicName);
431 } catch (JMSException e) {
432 // look-up the Topic
433 try {
434 result = (Topic)jndiOutboundTemplate.lookup(topicName, Topic.class);
435 } catch (NamingException e1) {
436 String errStr = "Failed to look-up Topic for name: " + topicName;
437 LOG.error(errStr, e);
438 JMSException jmsEx = new JMSException(errStr);
439 jmsEx.setLinkedException(e1);
440 throw jmsEx;
441 }
442 }
443 return result;
444 }
445
446 }