001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.pool;
018
019 import java.io.IOException;
020
021 import javax.jms.ConnectionFactory;
022 import javax.jms.Session;
023 import javax.jms.JMSException;
024 import javax.transaction.SystemException;
025 import javax.transaction.TransactionManager;
026
027 import javax.transaction.xa.XAResource;
028 import org.apache.geronimo.transaction.manager.NamedXAResourceFactory;
029 import org.slf4j.Logger;
030 import org.slf4j.LoggerFactory;
031 import org.apache.activemq.ActiveMQConnectionFactory;
032 import org.apache.activemq.ActiveMQConnection;
033 import org.apache.activemq.ActiveMQSession;
034 import org.apache.activemq.util.IOExceptionSupport;
035 import org.apache.geronimo.transaction.manager.RecoverableTransactionManager;
036 import org.apache.geronimo.transaction.manager.NamedXAResource;
037 import org.apache.geronimo.transaction.manager.WrapperNamedXAResource;
038
039
040 /**
041 * This class allows wiring the ActiveMQ broker and the Geronimo transaction manager
042 * in a way that will allow the transaction manager to correctly recover XA transactions.
043 *
044 * For example, it can be used the following way:
045 * <pre>
046 * <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
047 * <property name="brokerURL" value="tcp://localhost:61616" />
048 * </bean>
049 *
050 * <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryFactoryBean">
051 * <property name="maxConnections" value="8" />
052 * <property name="transactionManager" ref="transactionManager" />
053 * <property name="connectionFactory" ref="activemqConnectionFactory" />
054 * <property name="resourceName" value="activemq.broker" />
055 * </bean>
056 *
057 * <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
058 * <property name="transactionManager" ref="transactionManager" />
059 * <property name="connectionFactory" ref="activemqConnectionFactory" />
060 * <property name="resourceName" value="activemq.broker" />
061 * </bean>
062 * </pre>
063 */
064 public class ActiveMQResourceManager {
065
066 private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQResourceManager.class);
067
068 private String resourceName;
069
070 private TransactionManager transactionManager;
071
072 private ConnectionFactory connectionFactory;
073
074 public void recoverResource() {
075 try {
076 if (!Recovery.recover(this)) {
077 LOGGER.info("Resource manager is unrecoverable");
078 }
079 } catch (NoClassDefFoundError e) {
080 LOGGER.info("Resource manager is unrecoverable due to missing classes: " + e);
081 } catch (Throwable e) {
082 LOGGER.warn("Error while recovering resource manager", e);
083 }
084 }
085
086 public String getResourceName() {
087 return resourceName;
088 }
089
090 public void setResourceName(String resourceName) {
091 this.resourceName = resourceName;
092 }
093
094 public TransactionManager getTransactionManager() {
095 return transactionManager;
096 }
097
098 public void setTransactionManager(TransactionManager transactionManager) {
099 this.transactionManager = transactionManager;
100 }
101
102 public ConnectionFactory getConnectionFactory() {
103 return connectionFactory;
104 }
105
106 public void setConnectionFactory(ConnectionFactory connectionFactory) {
107 this.connectionFactory = connectionFactory;
108 }
109
110 /**
111 * This class will ensure the broker is properly recovered when wired with
112 * the Geronimo transaction manager.
113 */
114 public static class Recovery {
115
116 public static boolean isRecoverable(ActiveMQResourceManager rm) {
117 return rm.getConnectionFactory() instanceof ActiveMQConnectionFactory &&
118 rm.getTransactionManager() instanceof RecoverableTransactionManager &&
119 rm.getResourceName() != null && !"".equals(rm.getResourceName());
120 }
121
122 public static boolean recover(final ActiveMQResourceManager rm) throws IOException {
123 if (isRecoverable(rm)) {
124 try {
125 final ActiveMQConnectionFactory connFactory = (ActiveMQConnectionFactory) rm.getConnectionFactory();
126 ActiveMQConnection activeConn = (ActiveMQConnection)connFactory.createConnection();
127 final ActiveMQSession session = (ActiveMQSession)activeConn.createSession(true, Session.SESSION_TRANSACTED);
128 NamedXAResource namedXaResource = new WrapperNamedXAResource(session.getTransactionContext(), rm.getResourceName());
129
130 RecoverableTransactionManager rtxManager = (RecoverableTransactionManager) rm.getTransactionManager();
131 rtxManager.registerNamedXAResourceFactory(new NamedXAResourceFactory() {
132
133 @Override
134 public String getName() {
135 return rm.getResourceName();
136 }
137
138 @Override
139 public NamedXAResource getNamedXAResource() throws SystemException {
140 try {
141 final ActiveMQConnection activeConn = (ActiveMQConnection)connFactory.createConnection();
142 final ActiveMQSession session = (ActiveMQSession)activeConn.createSession(true, Session.SESSION_TRANSACTED);
143 activeConn.start();
144 LOGGER.debug("new namedXAResource's connection: " + activeConn);
145
146 return new ConnectionAndWrapperNamedXAResource(session.getTransactionContext(), getName(), activeConn);
147 } catch (Exception e) {
148 SystemException se = new SystemException("Failed to create ConnectionAndWrapperNamedXAResource, " + e.getLocalizedMessage());
149 se.initCause(e);
150 LOGGER.error(se.getLocalizedMessage(), se);
151 throw se;
152 }
153 }
154
155 @Override
156 public void returnNamedXAResource(NamedXAResource namedXaResource) {
157 if (namedXaResource instanceof ConnectionAndWrapperNamedXAResource) {
158 try {
159 LOGGER.debug("closing returned namedXAResource's connection: " + ((ConnectionAndWrapperNamedXAResource)namedXaResource).connection);
160 ((ConnectionAndWrapperNamedXAResource)namedXaResource).connection.close();
161 } catch (Exception ignored) {
162 LOGGER.debug("failed to close returned namedXAResource: " + namedXaResource, ignored);
163 }
164 }
165 }
166 });
167 return true;
168 } catch (JMSException e) {
169 throw IOExceptionSupport.create(e);
170 }
171 } else {
172 return false;
173 }
174 }
175 }
176
177 public static class ConnectionAndWrapperNamedXAResource extends WrapperNamedXAResource {
178 final ActiveMQConnection connection;
179 public ConnectionAndWrapperNamedXAResource(XAResource xaResource, String name, ActiveMQConnection connection) {
180 super(xaResource, name);
181 this.connection = connection;
182 }
183 }
184 }