001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network.jms;
018
019 import java.util.concurrent.atomic.AtomicBoolean;
020 import javax.jms.Connection;
021 import javax.jms.Destination;
022 import javax.jms.JMSException;
023 import javax.jms.Message;
024 import javax.jms.MessageConsumer;
025 import javax.jms.MessageListener;
026 import javax.jms.MessageProducer;
027 import org.apache.activemq.Service;
028 import org.slf4j.Logger;
029 import org.slf4j.LoggerFactory;
030
031 /**
032 * A Destination bridge is used to bridge between to different JMS systems
033 */
034 public abstract class DestinationBridge implements Service, MessageListener {
035
036 private static final Logger LOG = LoggerFactory.getLogger(DestinationBridge.class);
037
038 protected MessageConsumer consumer;
039 protected AtomicBoolean started = new AtomicBoolean(false);
040 protected JmsMesageConvertor jmsMessageConvertor;
041 protected boolean doHandleReplyTo = true;
042 protected JmsConnector jmsConnector;
043
044 /**
045 * @return Returns the consumer.
046 */
047 public MessageConsumer getConsumer() {
048 return consumer;
049 }
050
051 /**
052 * @param consumer The consumer to set.
053 */
054 public void setConsumer(MessageConsumer consumer) {
055 this.consumer = consumer;
056 }
057
058 /**
059 * @param connector
060 */
061 public void setJmsConnector(JmsConnector connector) {
062 this.jmsConnector = connector;
063 }
064
065 /**
066 * @return Returns the inboundMessageConvertor.
067 */
068 public JmsMesageConvertor getJmsMessageConvertor() {
069 return jmsMessageConvertor;
070 }
071
072 /**
073 * @param jmsMessageConvertor
074 */
075 public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
076 this.jmsMessageConvertor = jmsMessageConvertor;
077 }
078
079 protected Destination processReplyToDestination(Destination destination) {
080 return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer());
081 }
082
083 public void start() throws Exception {
084 if (started.compareAndSet(false, true)) {
085 createConsumer();
086 createProducer();
087 }
088 }
089
090 public void stop() throws Exception {
091 started.set(false);
092 }
093
094 public void onMessage(Message message) {
095
096 int attempt = 0;
097 final int maxRetries = jmsConnector.getReconnectionPolicy().getMaxSendRetries();
098
099 while (started.get() && message != null && ++attempt <= maxRetries) {
100
101 try {
102
103 if (attempt > 0) {
104 try {
105 Thread.sleep(jmsConnector.getReconnectionPolicy().getNextDelay(attempt));
106 } catch(InterruptedException e) {
107 break;
108 }
109 }
110
111 Message converted;
112 if (jmsMessageConvertor != null) {
113 if (doHandleReplyTo) {
114 Destination replyTo = message.getJMSReplyTo();
115 if (replyTo != null) {
116 converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
117 } else {
118 converted = jmsMessageConvertor.convert(message);
119 }
120 } else {
121 message.setJMSReplyTo(null);
122 converted = jmsMessageConvertor.convert(message);
123 }
124 } else {
125 // The Producer side is not up or not yet configured, retry.
126 continue;
127 }
128
129 try {
130 sendMessage(converted);
131 } catch(Exception e) {
132 jmsConnector.handleConnectionFailure(getConnectionForProducer());
133 continue;
134 }
135
136 try {
137 message.acknowledge();
138 } catch(Exception e) {
139 jmsConnector.handleConnectionFailure(getConnnectionForConsumer());
140 continue;
141 }
142
143 // if we got here then it made it out and was ack'd
144 return;
145
146 } catch (Exception e) {
147 LOG.info("failed to forward message on attempt: " + attempt +
148 " reason: " + e + " message: " + message, e);
149 }
150 }
151 }
152
153 /**
154 * @return Returns the doHandleReplyTo.
155 */
156 protected boolean isDoHandleReplyTo() {
157 return doHandleReplyTo;
158 }
159
160 /**
161 * @param doHandleReplyTo The doHandleReplyTo to set.
162 */
163 protected void setDoHandleReplyTo(boolean doHandleReplyTo) {
164 this.doHandleReplyTo = doHandleReplyTo;
165 }
166
167 protected abstract MessageConsumer createConsumer() throws JMSException;
168
169 protected abstract MessageProducer createProducer() throws JMSException;
170
171 protected abstract void sendMessage(Message message) throws JMSException;
172
173 protected abstract Connection getConnnectionForConsumer();
174
175 protected abstract Connection getConnectionForProducer();
176
177 }