001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transaction;
018
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import javax.transaction.xa.XAException;
import org.apache.activemq.command.TransactionId;
import org.slf4j.Logger;
030
031 /**
032 * Keeps track of all the actions the need to be done when a transaction does a
033 * commit or rollback.
034 *
035 *
036 */
037 public abstract class Transaction {
038
039 public static final byte START_STATE = 0; // can go to: 1,2,3
040 public static final byte IN_USE_STATE = 1; // can go to: 2,3
041 public static final byte PREPARED_STATE = 2; // can go to: 3
042 public static final byte FINISHED_STATE = 3;
043
044 private final ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>();
045 private byte state = START_STATE;
046 protected FutureTask<?> preCommitTask = new FutureTask<Object>(new Callable<Object>() {
047 public Object call() throws Exception {
048 doPreCommit();
049 return null;
050 }
051 });
052 protected FutureTask<?> postCommitTask = new FutureTask<Object>(new Callable<Object>() {
053 public Object call() throws Exception {
054 doPostCommit();
055 return null;
056 }
057 });
058
059 public byte getState() {
060 return state;
061 }
062
063 public void setState(byte state) {
064 this.state = state;
065 }
066
067 public void addSynchronization(Synchronization r) {
068 synchronizations.add(r);
069 if (state == START_STATE) {
070 state = IN_USE_STATE;
071 }
072 }
073
074 public void removeSynchronization(Synchronization r) {
075 synchronizations.remove(r);
076 }
077
078 public void prePrepare() throws Exception {
079
080 // Is it ok to call prepare now given the state of the
081 // transaction?
082 switch (state) {
083 case START_STATE:
084 case IN_USE_STATE:
085 break;
086 default:
087 XAException xae = new XAException("Prepare cannot be called now.");
088 xae.errorCode = XAException.XAER_PROTO;
089 throw xae;
090 }
091
092 // // Run the prePrepareTasks
093 // for (Iterator iter = prePrepareTasks.iterator(); iter.hasNext();) {
094 // Callback r = (Callback) iter.next();
095 // r.execute();
096 // }
097 }
098
099 protected void fireBeforeCommit() throws Exception {
100 for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
101 Synchronization s = iter.next();
102 s.beforeCommit();
103 }
104 }
105
106 protected void fireAfterCommit() throws Exception {
107 for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
108 Synchronization s = iter.next();
109 s.afterCommit();
110 }
111 }
112
113 public void fireAfterRollback() throws Exception {
114 Collections.reverse(synchronizations);
115 for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
116 Synchronization s = iter.next();
117 s.afterRollback();
118 }
119 }
120
121 @Override
122 public String toString() {
123 return super.toString() + "[synchronizations=" + synchronizations + "]";
124 }
125
126 public abstract void commit(boolean onePhase) throws XAException, IOException;
127
128 public abstract void rollback() throws XAException, IOException;
129
130 public abstract int prepare() throws XAException, IOException;
131
132 public abstract TransactionId getTransactionId();
133
134 public abstract Logger getLog();
135
136 public boolean isPrepared() {
137 return getState() == PREPARED_STATE;
138 }
139
140 public int size() {
141 return synchronizations.size();
142 }
143
144 protected void waitPostCommitDone(FutureTask<?> postCommitTask) throws XAException, IOException {
145 try {
146 postCommitTask.get();
147 } catch (InterruptedException e) {
148 throw new InterruptedIOException(e.toString());
149 } catch (ExecutionException e) {
150 Throwable t = e.getCause();
151 if (t instanceof XAException) {
152 throw (XAException) t;
153 } else if (t instanceof IOException) {
154 throw (IOException) t;
155 } else {
156 throw new XAException(e.toString());
157 }
158 }
159 }
160
161 protected void doPreCommit() throws XAException {
162 try {
163 fireBeforeCommit();
164 } catch (Throwable e) {
165 // I guess this could happen. Post commit task failed
166 // to execute properly.
167 getLog().warn("PRE COMMIT FAILED: ", e);
168 XAException xae = new XAException("PRE COMMIT FAILED");
169 xae.errorCode = XAException.XAER_RMERR;
170 xae.initCause(e);
171 throw xae;
172 }
173 }
174
175 protected void doPostCommit() throws XAException {
176 try {
177 fireAfterCommit();
178 } catch (Throwable e) {
179 // I guess this could happen. Post commit task failed
180 // to execute properly.
181 getLog().warn("POST COMMIT FAILED: ", e);
182 XAException xae = new XAException("POST COMMIT FAILED");
183 xae.errorCode = XAException.XAER_RMERR;
184 xae.initCause(e);
185 throw xae;
186 }
187 }
188 }