001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.activemq.store.amq;
019
020 import java.io.IOException;
021 import java.util.Iterator;
022 import java.util.LinkedHashMap;
023 import java.util.Map;
024 import javax.transaction.xa.XAException;
025 import org.apache.activemq.command.JournalTopicAck;
026 import org.apache.activemq.command.JournalTransaction;
027 import org.apache.activemq.command.Message;
028 import org.apache.activemq.command.MessageAck;
029 import org.apache.activemq.command.TransactionId;
030 import org.apache.activemq.command.XATransactionId;
031 import org.apache.activemq.kaha.impl.async.Location;
032 import org.apache.activemq.store.TransactionRecoveryListener;
033 import org.apache.activemq.store.TransactionStore;
034
035 /**
036 */
037 public class AMQTransactionStore implements TransactionStore {
038
039 protected Map<TransactionId, AMQTx> inflightTransactions = new LinkedHashMap<TransactionId, AMQTx>();
040 Map<TransactionId, AMQTx> preparedTransactions = new LinkedHashMap<TransactionId, AMQTx>();
041
042 private final AMQPersistenceAdapter peristenceAdapter;
043 private boolean doingRecover;
044
045 public AMQTransactionStore(AMQPersistenceAdapter adapter) {
046 this.peristenceAdapter = adapter;
047 }
048
049 /**
050 * @throws IOException
051 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
052 */
053 public void prepare(TransactionId txid) throws IOException {
054 AMQTx tx = null;
055 synchronized (inflightTransactions) {
056 tx = inflightTransactions.remove(txid);
057 }
058 if (tx == null) {
059 return;
060 }
061 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false), true);
062 synchronized (preparedTransactions) {
063 preparedTransactions.put(txid, tx);
064 }
065 }
066
067 /**
068 * @throws IOException
069 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
070 */
071 public void replayPrepare(TransactionId txid) throws IOException {
072 AMQTx tx = null;
073 synchronized (inflightTransactions) {
074 tx = inflightTransactions.remove(txid);
075 }
076 if (tx == null) {
077 return;
078 }
079 synchronized (preparedTransactions) {
080 preparedTransactions.put(txid, tx);
081 }
082 }
083
084 public AMQTx getTx(TransactionId txid, Location location) {
085 AMQTx tx = null;
086 synchronized (inflightTransactions) {
087 tx = inflightTransactions.get(txid);
088 if (tx == null) {
089 tx = new AMQTx(location);
090 inflightTransactions.put(txid, tx);
091 }
092 }
093 return tx;
094 }
095
096 /**
097 * @throws XAException
098 * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
099 */
100 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
101 if (preCommit != null) {
102 preCommit.run();
103 }
104 AMQTx tx;
105 if (wasPrepared) {
106 synchronized (preparedTransactions) {
107 tx = preparedTransactions.remove(txid);
108 }
109 } else {
110 synchronized (inflightTransactions) {
111 tx = inflightTransactions.remove(txid);
112 }
113 }
114 if (tx == null) {
115 if (postCommit != null) {
116 postCommit.run();
117 }
118 return;
119 }
120 if (txid.isXATransaction()) {
121 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid, wasPrepared), true,true);
122 } else {
123 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared), true,true);
124 }
125 if (postCommit != null) {
126 postCommit.run();
127 }
128 }
129
130 /**
131 * @throws XAException
132 * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
133 */
134 public AMQTx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException {
135 if (wasPrepared) {
136 synchronized (preparedTransactions) {
137 return preparedTransactions.remove(txid);
138 }
139 } else {
140 synchronized (inflightTransactions) {
141 return inflightTransactions.remove(txid);
142 }
143 }
144 }
145
146 /**
147 * @throws IOException
148 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
149 */
150 public void rollback(TransactionId txid) throws IOException {
151 AMQTx tx = null;
152 synchronized (inflightTransactions) {
153 tx = inflightTransactions.remove(txid);
154 }
155 if (tx != null) {
156 synchronized (preparedTransactions) {
157 tx = preparedTransactions.remove(txid);
158 }
159 }
160 if (tx != null) {
161 if (txid.isXATransaction()) {
162 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid, false), true,true);
163 } else {
164 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK, txid, false), true,true);
165 }
166 }
167 }
168
169 /**
170 * @throws IOException
171 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
172 */
173 public void replayRollback(TransactionId txid) throws IOException {
174 boolean inflight = false;
175 synchronized (inflightTransactions) {
176 inflight = inflightTransactions.remove(txid) != null;
177 }
178 if (inflight) {
179 synchronized (preparedTransactions) {
180 preparedTransactions.remove(txid);
181 }
182 }
183 }
184
185 public void start() throws Exception {
186 }
187
188 public void stop() throws Exception {
189 }
190
191 public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
192 // All the in-flight transactions get rolled back..
193 synchronized (inflightTransactions) {
194 inflightTransactions.clear();
195 }
196 this.doingRecover = true;
197 try {
198 Map<TransactionId, AMQTx> txs = null;
199 synchronized (preparedTransactions) {
200 txs = new LinkedHashMap<TransactionId, AMQTx>(preparedTransactions);
201 }
202 for (Iterator<TransactionId> iter = txs.keySet().iterator(); iter.hasNext();) {
203 Object txid = iter.next();
204 AMQTx tx = txs.get(txid);
205 listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
206 }
207 } finally {
208 this.doingRecover = false;
209 }
210 }
211
212 /**
213 * @param message
214 * @throws IOException
215 */
216 void addMessage(AMQMessageStore store, Message message, Location location) throws IOException {
217 AMQTx tx = getTx(message.getTransactionId(), location);
218 tx.add(store, message, location);
219 }
220
221 /**
222 * @param ack
223 * @throws IOException
224 */
225 public void removeMessage(AMQMessageStore store, MessageAck ack, Location location) throws IOException {
226 AMQTx tx = getTx(ack.getTransactionId(), location);
227 tx.add(store, ack);
228 }
229
230 public void acknowledge(AMQTopicMessageStore store, JournalTopicAck ack, Location location) {
231 AMQTx tx = getTx(ack.getTransactionId(), location);
232 tx.add(store, ack);
233 }
234
235 public Location checkpoint() throws IOException {
236 // Nothing really to checkpoint.. since, we don't
237 // checkpoint tx operations in to long term store until they are
238 // committed.
239 // But we keep track of the first location of an operation
240 // that was associated with an active tx. The journal can not
241 // roll over active tx records.
242 Location minimumLocationInUse = null;
243 synchronized (inflightTransactions) {
244 for (Iterator<AMQTx> iter = inflightTransactions.values().iterator(); iter.hasNext();) {
245 AMQTx tx = iter.next();
246 Location location = tx.getLocation();
247 if (minimumLocationInUse == null || location.compareTo(minimumLocationInUse) < 0) {
248 minimumLocationInUse = location;
249 }
250 }
251 }
252 synchronized (preparedTransactions) {
253 for (Iterator<AMQTx> iter = preparedTransactions.values().iterator(); iter.hasNext();) {
254 AMQTx tx = iter.next();
255 Location location = tx.getLocation();
256 if (minimumLocationInUse == null || location.compareTo(minimumLocationInUse) < 0) {
257 minimumLocationInUse = location;
258 }
259 }
260 return minimumLocationInUse;
261 }
262 }
263
264 public boolean isDoingRecover() {
265 return doingRecover;
266 }
267
268 /**
269 * @return the preparedTransactions
270 */
271 public Map<TransactionId, AMQTx> getPreparedTransactions() {
272 return this.preparedTransactions;
273 }
274
275 /**
276 * @param preparedTransactions the preparedTransactions to set
277 */
278 public void setPreparedTransactions(Map<TransactionId, AMQTx> preparedTransactions) {
279 if (preparedTransactions != null) {
280 this.preparedTransactions.clear();
281 this.preparedTransactions.putAll(preparedTransactions);
282 }
283 }
284 }