001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transaction;
018
019 import java.io.IOException;
020 import javax.transaction.xa.XAException;
021 import javax.transaction.xa.XAResource;
022 import org.apache.activemq.broker.TransactionBroker;
023 import org.apache.activemq.command.ConnectionId;
024 import org.apache.activemq.command.TransactionId;
025 import org.apache.activemq.command.XATransactionId;
026 import org.apache.activemq.store.TransactionStore;
027 import org.slf4j.Logger;
028 import org.slf4j.LoggerFactory;
029
030 /**
031 *
032 */
033 public class XATransaction extends Transaction {
034
035 private static final Logger LOG = LoggerFactory.getLogger(XATransaction.class);
036
037 private final TransactionStore transactionStore;
038 private final XATransactionId xid;
039 private final TransactionBroker broker;
040 private final ConnectionId connectionId;
041
042 public XATransaction(TransactionStore transactionStore, XATransactionId xid, TransactionBroker broker, ConnectionId connectionId) {
043 this.transactionStore = transactionStore;
044 this.xid = xid;
045 this.broker = broker;
046 this.connectionId = connectionId;
047 if (LOG.isDebugEnabled()) {
048 LOG.debug("XA Transaction new/begin : " + xid);
049 }
050 }
051
052 @Override
053 public void commit(boolean onePhase) throws XAException, IOException {
054 if (LOG.isDebugEnabled()) {
055 LOG.debug("XA Transaction commit onePhase:" + onePhase + ", xid: " + xid);
056 }
057
058 switch (getState()) {
059 case START_STATE:
060 // 1 phase commit, no work done.
061 checkForPreparedState(onePhase);
062 setStateFinished();
063 break;
064 case IN_USE_STATE:
065 // 1 phase commit, work done.
066 checkForPreparedState(onePhase);
067 doPrePrepare();
068 setStateFinished();
069 storeCommit(getTransactionId(), false, preCommitTask, postCommitTask);
070 break;
071 case PREPARED_STATE:
072 // 2 phase commit, work done.
073 // We would record commit here.
074 setStateFinished();
075 storeCommit(getTransactionId(), true, preCommitTask, postCommitTask);
076 break;
077 default:
078 illegalStateTransition("commit");
079 }
080 }
081
082 private void storeCommit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit)
083 throws XAException, IOException {
084 try {
085 transactionStore.commit(getTransactionId(), wasPrepared, preCommitTask, postCommitTask);
086 waitPostCommitDone(postCommitTask);
087 } catch (XAException xae) {
088 throw xae;
089 } catch (Throwable t) {
090 LOG.warn("Store COMMIT FAILED: ", t);
091 rollback();
092 XAException xae = new XAException("STORE COMMIT FAILED: Transaction rolled back.");
093 xae.errorCode = XAException.XA_RBOTHER;
094 xae.initCause(t);
095 throw xae;
096 }
097 }
098
099 private void illegalStateTransition(String callName) throws XAException {
100 XAException xae = new XAException("Cannot call " + callName + " now.");
101 xae.errorCode = XAException.XAER_PROTO;
102 throw xae;
103 }
104
105 private void checkForPreparedState(boolean onePhase) throws XAException {
106 if (!onePhase) {
107 XAException xae = new XAException("Cannot do 2 phase commit if the transaction has not been prepared.");
108 xae.errorCode = XAException.XAER_PROTO;
109 throw xae;
110 }
111 }
112
113 private void doPrePrepare() throws XAException, IOException {
114 try {
115 prePrepare();
116 } catch (XAException e) {
117 throw e;
118 } catch (Throwable e) {
119 LOG.warn("PRE-PREPARE FAILED: ", e);
120 rollback();
121 XAException xae = new XAException("PRE-PREPARE FAILED: Transaction rolled back.");
122 xae.errorCode = XAException.XA_RBOTHER;
123 xae.initCause(e);
124 throw xae;
125 }
126 }
127
128 @Override
129 public void rollback() throws XAException, IOException {
130
131 if (LOG.isDebugEnabled()) {
132 LOG.debug("XA Transaction rollback: " + xid);
133 }
134
135 switch (getState()) {
136 case START_STATE:
137 // 1 phase rollback no work done.
138 setStateFinished();
139 break;
140 case IN_USE_STATE:
141 // 1 phase rollback work done.
142 setStateFinished();
143 transactionStore.rollback(getTransactionId());
144 doPostRollback();
145 break;
146 case PREPARED_STATE:
147 // 2 phase rollback work done.
148 setStateFinished();
149 transactionStore.rollback(getTransactionId());
150 doPostRollback();
151 break;
152 case FINISHED_STATE:
153 // failure to commit
154 transactionStore.rollback(getTransactionId());
155 doPostRollback();
156 break;
157 default:
158 throw new XAException("Invalid state");
159 }
160
161 }
162
163 private void doPostRollback() throws XAException {
164 try {
165 fireAfterRollback();
166 } catch (Throwable e) {
167 // I guess this could happen. Post commit task failed
168 // to execute properly.
169 LOG.warn("POST ROLLBACK FAILED: ", e);
170 XAException xae = new XAException("POST ROLLBACK FAILED");
171 xae.errorCode = XAException.XAER_RMERR;
172 xae.initCause(e);
173 throw xae;
174 }
175 }
176
177 @Override
178 public int prepare() throws XAException, IOException {
179 if (LOG.isDebugEnabled()) {
180 LOG.debug("XA Transaction prepare: " + xid);
181 }
182
183 switch (getState()) {
184 case START_STATE:
185 // No work done.. no commit/rollback needed.
186 setStateFinished();
187 return XAResource.XA_RDONLY;
188 case IN_USE_STATE:
189 // We would record prepare here.
190 doPrePrepare();
191 setState(Transaction.PREPARED_STATE);
192 transactionStore.prepare(getTransactionId());
193 return XAResource.XA_OK;
194 default:
195 illegalStateTransition("prepare");
196 return XAResource.XA_RDONLY;
197 }
198 }
199
200 private void setStateFinished() {
201 setState(Transaction.FINISHED_STATE);
202 broker.removeTransaction(xid);
203 }
204
205 public ConnectionId getConnectionId() {
206 return connectionId;
207 }
208
209 @Override
210 public TransactionId getTransactionId() {
211 return xid;
212 }
213
214 @Override
215 public Logger getLog() {
216 return LOG;
217 }
218
219 public XATransactionId getXid() {
220 return xid;
221 }
222 }