001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker;
018
019
020 import java.util.ArrayList;
021 import java.util.Iterator;
022 import java.util.LinkedHashMap;
023 import java.util.List;
024 import java.util.Map;
025 import java.util.concurrent.ConcurrentHashMap;
026
027 import javax.jms.JMSException;
028 import javax.transaction.xa.XAException;
029
030 import org.apache.activemq.ActiveMQMessageAudit;
031 import org.apache.activemq.broker.jmx.ManagedRegionBroker;
032 import org.apache.activemq.broker.region.Destination;
033 import org.apache.activemq.broker.region.Queue;
034 import org.apache.activemq.command.ActiveMQDestination;
035 import org.apache.activemq.command.BaseCommand;
036 import org.apache.activemq.command.ConnectionInfo;
037 import org.apache.activemq.command.LocalTransactionId;
038 import org.apache.activemq.command.Message;
039 import org.apache.activemq.command.MessageAck;
040 import org.apache.activemq.command.ProducerInfo;
041 import org.apache.activemq.command.TransactionId;
042 import org.apache.activemq.command.XATransactionId;
043 import org.apache.activemq.state.ProducerState;
044 import org.apache.activemq.store.TransactionRecoveryListener;
045 import org.apache.activemq.store.TransactionStore;
046 import org.apache.activemq.transaction.LocalTransaction;
047 import org.apache.activemq.transaction.Synchronization;
048 import org.apache.activemq.transaction.Transaction;
049 import org.apache.activemq.transaction.XATransaction;
050 import org.apache.activemq.util.IOExceptionSupport;
051 import org.apache.activemq.util.WrappedException;
052 import org.slf4j.Logger;
053 import org.slf4j.LoggerFactory;
054
055 /**
056 * This broker filter handles the transaction related operations in the Broker
057 * interface.
058 *
059 *
060 */
061 public class TransactionBroker extends BrokerFilter {
062
063 private static final Logger LOG = LoggerFactory.getLogger(TransactionBroker.class);
064
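// transactionStore is handed to every transaction created here and drives recovery in start();
// xaTransactions holds every XA transaction that has been begun or recovered (not only prepared
// ones) and is guarded by synchronizing on the map itself.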
066 private TransactionStore transactionStore;
067 private Map<TransactionId, XATransaction> xaTransactions = new LinkedHashMap<TransactionId, XATransaction>();
068 private ActiveMQMessageAudit audit;
069
/**
 * Creates a transaction-handling filter that sits in front of {@code next}.
 *
 * @param next the broker this filter delegates to
 * @param transactionStore the store passed to every transaction created by this filter and
 *        consulted by {@link #start()} to recover transactions
 */
public TransactionBroker(Broker next, TransactionStore transactionStore) {
    super(next);
    this.transactionStore = transactionStore;
}
074
075 // ////////////////////////////////////////////////////////////////////////////
076 //
077 // Life cycle Methods
078 //
079 // ////////////////////////////////////////////////////////////////////////////
080
081 /**
082 * Recovers any prepared transactions.
083 */
084 public void start() throws Exception {
085 transactionStore.start();
086 try {
087 final ConnectionContext context = new ConnectionContext();
088 context.setBroker(this);
089 context.setInRecoveryMode(true);
090 context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
091 context.setProducerFlowControl(false);
092 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
093 producerExchange.setMutable(true);
094 producerExchange.setConnectionContext(context);
095 producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
096 final ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange();
097 consumerExchange.setConnectionContext(context);
098 transactionStore.recover(new TransactionRecoveryListener() {
099 public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
100 try {
101 beginTransaction(context, xid);
102 XATransaction transaction = (XATransaction) getTransaction(context, xid, false);
103 for (int i = 0; i < addedMessages.length; i++) {
104 forceDestinationWakeupOnCompletion(context, transaction, addedMessages[i].getDestination(), addedMessages[i]);
105 }
106 for (int i = 0; i < aks.length; i++) {
107 forceDestinationWakeupOnCompletion(context, transaction, aks[i].getDestination(), aks[i]);
108 }
109 transaction.setState(Transaction.PREPARED_STATE);
110 registerMBean(transaction);
111 if (LOG.isDebugEnabled()) {
112 LOG.debug("recovered prepared transaction: " + transaction.getTransactionId());
113 }
114 } catch (Throwable e) {
115 throw new WrappedException(e);
116 }
117 }
118 });
119 } catch (WrappedException e) {
120 Throwable cause = e.getCause();
121 throw IOExceptionSupport.create("Recovery Failed: " + cause.getMessage(), cause);
122 }
123 next.start();
124 }
125
126 private void registerMBean(XATransaction transaction) {
127 if (getBrokerService().getRegionBroker() instanceof ManagedRegionBroker ) {
128 ManagedRegionBroker managedRegionBroker = (ManagedRegionBroker) getBrokerService().getRegionBroker();
129 managedRegionBroker.registerRecoveredTransactionMBean(transaction);
130 }
131 }
132
133 private void forceDestinationWakeupOnCompletion(ConnectionContext context, Transaction transaction,
134 ActiveMQDestination amqDestination, BaseCommand ack) throws Exception {
135 Destination destination = addDestination(context, amqDestination, false);
136 registerSync(destination, transaction, ack);
137 }
138
139 private void registerSync(Destination destination, Transaction transaction, BaseCommand command) {
140 if (destination instanceof Queue) {
141 Synchronization sync = new PreparedDestinationCompletion((Queue) destination, command.isMessage());
142 // ensure one per destination in the list
143 transaction.removeSynchronization(sync);
144 transaction.addSynchronization(sync);
145 }
146 }
147
148 static class PreparedDestinationCompletion extends Synchronization {
149 final Queue queue;
150 final boolean messageSend;
151 public PreparedDestinationCompletion(final Queue queue, boolean messageSend) {
152 this.queue = queue;
153 // rollback relevant to acks, commit to sends
154 this.messageSend = messageSend;
155 }
156
157 @Override
158 public int hashCode() {
159 return System.identityHashCode(queue) +
160 System.identityHashCode(Boolean.valueOf(messageSend));
161 }
162
163 @Override
164 public boolean equals(Object other) {
165 return other instanceof PreparedDestinationCompletion &&
166 queue.equals(((PreparedDestinationCompletion) other).queue) &&
167 messageSend == ((PreparedDestinationCompletion) other).messageSend;
168 }
169
170 @Override
171 public void afterRollback() throws Exception {
172 if (!messageSend) {
173 queue.clearPendingMessages();
174 if (LOG.isDebugEnabled()) {
175 LOG.debug("cleared pending from afterRollback : " + queue);
176 }
177 }
178 }
179
180 @Override
181 public void afterCommit() throws Exception {
182 if (messageSend) {
183 queue.clearPendingMessages();
184 if (LOG.isDebugEnabled()) {
185 LOG.debug("cleared pending from afterCommit : " + queue);
186 }
187 }
188 }
189 }
190
191 public void stop() throws Exception {
192 transactionStore.stop();
193 next.stop();
194 }
195
196 // ////////////////////////////////////////////////////////////////////////////
197 //
198 // BrokerFilter overrides
199 //
200 // ////////////////////////////////////////////////////////////////////////////
201 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
202 List<TransactionId> txs = new ArrayList<TransactionId>();
203 synchronized (xaTransactions) {
204 for (Iterator<XATransaction> iter = xaTransactions.values().iterator(); iter.hasNext();) {
205 Transaction tx = iter.next();
206 if (tx.isPrepared()) {
207 if (LOG.isDebugEnabled()) {
208 LOG.debug("prepared transaction: " + tx.getTransactionId());
209 }
210 txs.add(tx.getTransactionId());
211 }
212 }
213 }
214 XATransactionId rc[] = new XATransactionId[txs.size()];
215 txs.toArray(rc);
216 if (LOG.isDebugEnabled()) {
217 LOG.debug("prepared transaction list size: " + rc.length);
218 }
219 return rc;
220 }
221
222 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
223 // the transaction may have already been started.
224 if (xid.isXATransaction()) {
225 XATransaction transaction = null;
226 synchronized (xaTransactions) {
227 transaction = xaTransactions.get(xid);
228 if (transaction != null) {
229 return;
230 }
231 transaction = new XATransaction(transactionStore, (XATransactionId)xid, this, context.getConnectionId());
232 xaTransactions.put(xid, transaction);
233 }
234 } else {
235 Map<TransactionId, Transaction> transactionMap = context.getTransactions();
236 Transaction transaction = transactionMap.get(xid);
237 if (transaction != null) {
238 throw new JMSException("Transaction '" + xid + "' has already been started.");
239 }
240 transaction = new LocalTransaction(transactionStore, (LocalTransactionId)xid, context);
241 transactionMap.put(xid, transaction);
242 }
243 }
244
245 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
246 Transaction transaction = getTransaction(context, xid, false);
247 return transaction.prepare();
248 }
249
250 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
251 Transaction transaction = getTransaction(context, xid, true);
252 transaction.commit(onePhase);
253 }
254
255 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
256 Transaction transaction = getTransaction(context, xid, true);
257 transaction.rollback();
258 }
259
260 public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
261 Transaction transaction = getTransaction(context, xid, true);
262 transaction.rollback();
263 }
264
265 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
266 // This method may be invoked recursively.
267 // Track original tx so that it can be restored.
268 final ConnectionContext context = consumerExchange.getConnectionContext();
269 Transaction originalTx = context.getTransaction();
270 Transaction transaction = null;
271 if (ack.isInTransaction()) {
272 transaction = getTransaction(context, ack.getTransactionId(), false);
273 }
274 context.setTransaction(transaction);
275 try {
276 next.acknowledge(consumerExchange, ack);
277 } finally {
278 context.setTransaction(originalTx);
279 }
280 }
281
282 public void send(ProducerBrokerExchange producerExchange, final Message message) throws Exception {
283 // This method may be invoked recursively.
284 // Track original tx so that it can be restored.
285 final ConnectionContext context = producerExchange.getConnectionContext();
286 Transaction originalTx = context.getTransaction();
287 Transaction transaction = null;
288 Synchronization sync = null;
289 if (message.getTransactionId() != null) {
290 transaction = getTransaction(context, message.getTransactionId(), false);
291 if (transaction != null) {
292 sync = new Synchronization() {
293
294 public void afterRollback() {
295 if (audit != null) {
296 audit.rollback(message);
297 }
298 }
299 };
300 transaction.addSynchronization(sync);
301 }
302 }
303 if (audit == null || !audit.isDuplicate(message)) {
304 context.setTransaction(transaction);
305 try {
306 next.send(producerExchange, message);
307 } finally {
308 context.setTransaction(originalTx);
309 }
310 } else {
311 if (sync != null && transaction != null) {
312 transaction.removeSynchronization(sync);
313 }
314 if (LOG.isDebugEnabled()) {
315 LOG.debug("IGNORING duplicate message " + message);
316 }
317 }
318 }
319
320 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
321 for (Iterator<Transaction> iter = context.getTransactions().values().iterator(); iter.hasNext();) {
322 try {
323 Transaction transaction = iter.next();
324 transaction.rollback();
325 } catch (Exception e) {
326 LOG.warn("ERROR Rolling back disconnected client's transactions: ", e);
327 }
328 iter.remove();
329 }
330
331 synchronized (xaTransactions) {
332 // first find all txs that belongs to the connection
333 ArrayList<XATransaction> txs = new ArrayList<XATransaction>();
334 for (XATransaction tx : xaTransactions.values()) {
335 if (tx.getConnectionId() != null && tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) {
336 txs.add(tx);
337 }
338 }
339
340 // then remove them
341 // two steps needed to avoid ConcurrentModificationException, from removeTransaction()
342 for (XATransaction tx : txs) {
343 try {
344 tx.rollback();
345 } catch (Exception e) {
346 LOG.warn("ERROR Rolling back disconnected client's xa transactions: ", e);
347 }
348 }
349
350 }
351 next.removeConnection(context, info, error);
352 }
353
354 // ////////////////////////////////////////////////////////////////////////////
355 //
356 // Implementation help methods.
357 //
358 // ////////////////////////////////////////////////////////////////////////////
359 public Transaction getTransaction(ConnectionContext context, TransactionId xid, boolean mightBePrepared) throws JMSException, XAException {
360 Map transactionMap = null;
361 synchronized (xaTransactions) {
362 transactionMap = xid.isXATransaction() ? xaTransactions : context.getTransactions();
363 }
364 Transaction transaction = (Transaction)transactionMap.get(xid);
365 if (transaction != null) {
366 return transaction;
367 }
368 if (xid.isXATransaction()) {
369 XAException e = new XAException("Transaction '" + xid + "' has not been started.");
370 e.errorCode = XAException.XAER_NOTA;
371 throw e;
372 } else {
373 throw new JMSException("Transaction '" + xid + "' has not been started.");
374 }
375 }
376
377 public void removeTransaction(XATransactionId xid) {
378 synchronized (xaTransactions) {
379 xaTransactions.remove(xid);
380 }
381 }
382
383 public synchronized void brokerServiceStarted() {
384 super.brokerServiceStarted();
385 if (getBrokerService().isSupportFailOver() && audit == null) {
386 audit = new ActiveMQMessageAudit();
387 }
388 }
389
390 }