001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.activemq.store.journal;
019
020 import java.io.IOException;
021 import java.util.ArrayList;
022 import java.util.Iterator;
023 import java.util.LinkedHashMap;
024 import java.util.Map;
025 import javax.transaction.xa.XAException;
026 import org.apache.activeio.journal.RecordLocation;
027 import org.apache.activemq.command.JournalTopicAck;
028 import org.apache.activemq.command.JournalTransaction;
029 import org.apache.activemq.command.Message;
030 import org.apache.activemq.command.MessageAck;
031 import org.apache.activemq.command.TransactionId;
032 import org.apache.activemq.command.XATransactionId;
033 import org.apache.activemq.store.TransactionRecoveryListener;
034 import org.apache.activemq.store.TransactionStore;
035
036 /**
037 */
038 public class JournalTransactionStore implements TransactionStore {
039
040 private final JournalPersistenceAdapter peristenceAdapter;
041 private final Map<Object, Tx> inflightTransactions = new LinkedHashMap<Object, Tx>();
042 private final Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>();
043 private boolean doingRecover;
044
045 public static class TxOperation {
046
047 static final byte ADD_OPERATION_TYPE = 0;
048 static final byte REMOVE_OPERATION_TYPE = 1;
049 static final byte ACK_OPERATION_TYPE = 3;
050
051 public byte operationType;
052 public JournalMessageStore store;
053 public Object data;
054
055 public TxOperation(byte operationType, JournalMessageStore store, Object data) {
056 this.operationType = operationType;
057 this.store = store;
058 this.data = data;
059 }
060
061 }
062
063 /**
064 * Operations
065 *
066 *
067 */
068 public static class Tx {
069
070 private final RecordLocation location;
071 private final ArrayList<TxOperation> operations = new ArrayList<TxOperation>();
072
073 public Tx(RecordLocation location) {
074 this.location = location;
075 }
076
077 public void add(JournalMessageStore store, Message msg) {
078 operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg));
079 }
080
081 public void add(JournalMessageStore store, MessageAck ack) {
082 operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack));
083 }
084
085 public void add(JournalTopicMessageStore store, JournalTopicAck ack) {
086 operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack));
087 }
088
089 public Message[] getMessages() {
090 ArrayList<Object> list = new ArrayList<Object>();
091 for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
092 TxOperation op = iter.next();
093 if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
094 list.add(op.data);
095 }
096 }
097 Message rc[] = new Message[list.size()];
098 list.toArray(rc);
099 return rc;
100 }
101
102 public MessageAck[] getAcks() {
103 ArrayList<Object> list = new ArrayList<Object>();
104 for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
105 TxOperation op = iter.next();
106 if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
107 list.add(op.data);
108 }
109 }
110 MessageAck rc[] = new MessageAck[list.size()];
111 list.toArray(rc);
112 return rc;
113 }
114
115 public ArrayList<TxOperation> getOperations() {
116 return operations;
117 }
118
119 }
120
121 public JournalTransactionStore(JournalPersistenceAdapter adapter) {
122 this.peristenceAdapter = adapter;
123 }
124
125 /**
126 * @throws IOException
127 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
128 */
129 public void prepare(TransactionId txid) throws IOException {
130 Tx tx = null;
131 synchronized (inflightTransactions) {
132 tx = inflightTransactions.remove(txid);
133 }
134 if (tx == null) {
135 return;
136 }
137 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false),
138 true);
139 synchronized (preparedTransactions) {
140 preparedTransactions.put(txid, tx);
141 }
142 }
143
144 /**
145 * @throws IOException
146 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
147 */
148 public void replayPrepare(TransactionId txid) throws IOException {
149 Tx tx = null;
150 synchronized (inflightTransactions) {
151 tx = inflightTransactions.remove(txid);
152 }
153 if (tx == null) {
154 return;
155 }
156 synchronized (preparedTransactions) {
157 preparedTransactions.put(txid, tx);
158 }
159 }
160
161 public Tx getTx(Object txid, RecordLocation location) {
162 Tx tx = null;
163 synchronized (inflightTransactions) {
164 tx = inflightTransactions.get(txid);
165 }
166 if (tx == null) {
167 tx = new Tx(location);
168 inflightTransactions.put(txid, tx);
169 }
170 return tx;
171 }
172
173 /**
174 * @throws XAException
175 * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
176 */
177 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
178 Tx tx;
179 if (preCommit != null) {
180 preCommit.run();
181 }
182 if (wasPrepared) {
183 synchronized (preparedTransactions) {
184 tx = preparedTransactions.remove(txid);
185 }
186 } else {
187 synchronized (inflightTransactions) {
188 tx = inflightTransactions.remove(txid);
189 }
190 }
191 if (tx == null) {
192 if (postCommit != null) {
193 postCommit.run();
194 }
195 return;
196 }
197 if (txid.isXATransaction()) {
198 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid,
199 wasPrepared), true);
200 } else {
201 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid,
202 wasPrepared), true);
203 }
204 if (postCommit != null) {
205 postCommit.run();
206 }
207 }
208
209 /**
210 * @throws XAException
211 * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
212 */
213 public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException {
214 if (wasPrepared) {
215 synchronized (preparedTransactions) {
216 return preparedTransactions.remove(txid);
217 }
218 } else {
219 synchronized (inflightTransactions) {
220 return inflightTransactions.remove(txid);
221 }
222 }
223 }
224
225 /**
226 * @throws IOException
227 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
228 */
229 public void rollback(TransactionId txid) throws IOException {
230 Tx tx = null;
231 synchronized (inflightTransactions) {
232 tx = inflightTransactions.remove(txid);
233 }
234 if (tx != null) {
235 synchronized (preparedTransactions) {
236 tx = preparedTransactions.remove(txid);
237 }
238 }
239 if (tx != null) {
240 if (txid.isXATransaction()) {
241 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid,
242 false), true);
243 } else {
244 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,
245 txid, false), true);
246 }
247 }
248 }
249
250 /**
251 * @throws IOException
252 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
253 */
254 public void replayRollback(TransactionId txid) throws IOException {
255 boolean inflight = false;
256 synchronized (inflightTransactions) {
257 inflight = inflightTransactions.remove(txid) != null;
258 }
259 if (inflight) {
260 synchronized (preparedTransactions) {
261 preparedTransactions.remove(txid);
262 }
263 }
264 }
265
266 public void start() throws Exception {
267 }
268
269 public void stop() throws Exception {
270 }
271
272 public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
273 // All the in-flight transactions get rolled back..
274 synchronized (inflightTransactions) {
275 inflightTransactions.clear();
276 }
277 this.doingRecover = true;
278 try {
279 Map<TransactionId, Tx> txs = null;
280 synchronized (preparedTransactions) {
281 txs = new LinkedHashMap<TransactionId, Tx>(preparedTransactions);
282 }
283 for (Iterator<TransactionId> iter = txs.keySet().iterator(); iter.hasNext();) {
284 Object txid = iter.next();
285 Tx tx = txs.get(txid);
286 listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
287 }
288 } finally {
289 this.doingRecover = false;
290 }
291 }
292
293 /**
294 * @param message
295 * @throws IOException
296 */
297 void addMessage(JournalMessageStore store, Message message, RecordLocation location) throws IOException {
298 Tx tx = getTx(message.getTransactionId(), location);
299 tx.add(store, message);
300 }
301
302 /**
303 * @param ack
304 * @throws IOException
305 */
306 public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location)
307 throws IOException {
308 Tx tx = getTx(ack.getTransactionId(), location);
309 tx.add(store, ack);
310 }
311
312 public void acknowledge(JournalTopicMessageStore store, JournalTopicAck ack, RecordLocation location) {
313 Tx tx = getTx(ack.getTransactionId(), location);
314 tx.add(store, ack);
315 }
316
317 public RecordLocation checkpoint() throws IOException {
318 // Nothing really to checkpoint.. since, we don't
319 // checkpoint tx operations in to long term store until they are
320 // committed.
321 // But we keep track of the first location of an operation
322 // that was associated with an active tx. The journal can not
323 // roll over active tx records.
324 RecordLocation rc = null;
325 synchronized (inflightTransactions) {
326 for (Iterator<Tx> iter = inflightTransactions.values().iterator(); iter.hasNext();) {
327 Tx tx = iter.next();
328 RecordLocation location = tx.location;
329 if (rc == null || rc.compareTo(location) < 0) {
330 rc = location;
331 }
332 }
333 }
334 synchronized (preparedTransactions) {
335 for (Iterator<Tx> iter = preparedTransactions.values().iterator(); iter.hasNext();) {
336 Tx tx = iter.next();
337 RecordLocation location = tx.location;
338 if (rc == null || rc.compareTo(location) < 0) {
339 rc = location;
340 }
341 }
342 return rc;
343 }
344 }
345
346 public boolean isDoingRecover() {
347 return doingRecover;
348 }
349
350 }