001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transaction;
018
019 import java.io.IOException;
020 import javax.transaction.xa.XAException;
021 import org.apache.activemq.broker.ConnectionContext;
022 import org.apache.activemq.command.LocalTransactionId;
023 import org.apache.activemq.command.TransactionId;
024 import org.apache.activemq.store.TransactionStore;
025 import org.slf4j.Logger;
026 import org.slf4j.LoggerFactory;
027
028 /**
029 *
030 */
031 public class LocalTransaction extends Transaction {
032
033 private static final Logger LOG = LoggerFactory.getLogger(LocalTransaction.class);
034
035 private final TransactionStore transactionStore;
036 private final LocalTransactionId xid;
037 private final ConnectionContext context;
038
039 public LocalTransaction(TransactionStore transactionStore, LocalTransactionId xid, ConnectionContext context) {
040 this.transactionStore = transactionStore;
041 this.xid = xid;
042 this.context = context;
043 }
044
045 @Override
046 public void commit(boolean onePhase) throws XAException, IOException {
047 if (LOG.isDebugEnabled()) {
048 LOG.debug("commit: " + xid
049 + " syncCount: " + size());
050 }
051
052 // Get ready for commit.
053 try {
054 prePrepare();
055 } catch (XAException e) {
056 throw e;
057 } catch (Throwable e) {
058 LOG.warn("COMMIT FAILED: ", e);
059 rollback();
060 // Let them know we rolled back.
061 XAException xae = new XAException("COMMIT FAILED: Transaction rolled back.");
062 xae.errorCode = XAException.XA_RBOTHER;
063 xae.initCause(e);
064 throw xae;
065 }
066
067 setState(Transaction.FINISHED_STATE);
068 context.getTransactions().remove(xid);
069 // Sync on transaction store to avoid out of order messages in the cursor
070 // https://issues.apache.org/activemq/browse/AMQ-2594
071 try {
072 transactionStore.commit(getTransactionId(), false,preCommitTask, postCommitTask);
073 this.waitPostCommitDone(postCommitTask);
074 } catch (Throwable t) {
075 LOG.warn("Store COMMIT FAILED: ", t);
076 rollback();
077 XAException xae = new XAException("STORE COMMIT FAILED: Transaction rolled back.");
078 xae.errorCode = XAException.XA_RBOTHER;
079 xae.initCause(t);
080 throw xae;
081 }
082 }
083
084 @Override
085 public void rollback() throws XAException, IOException {
086
087 if (LOG.isDebugEnabled()) {
088 LOG.debug("rollback: " + xid
089 + " syncCount: " + size());
090 }
091 setState(Transaction.FINISHED_STATE);
092 context.getTransactions().remove(xid);
093 // Sync on transaction store to avoid out of order messages in the cursor
094 // https://issues.apache.org/activemq/browse/AMQ-2594
095 synchronized (transactionStore) {
096 transactionStore.rollback(getTransactionId());
097
098 try {
099 fireAfterRollback();
100 } catch (Throwable e) {
101 LOG.warn("POST ROLLBACK FAILED: ", e);
102 XAException xae = new XAException("POST ROLLBACK FAILED");
103 xae.errorCode = XAException.XAER_RMERR;
104 xae.initCause(e);
105 throw xae;
106 }
107 }
108 }
109
110 @Override
111 public int prepare() throws XAException {
112 XAException xae = new XAException("Prepare not implemented on Local Transactions.");
113 xae.errorCode = XAException.XAER_RMERR;
114 throw xae;
115 }
116
117 @Override
118 public TransactionId getTransactionId() {
119 return xid;
120 }
121
122 @Override
123 public Logger getLog() {
124 return LOG;
125 }
126 }