001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.util.Date;
022 import java.util.HashSet;
023 import java.util.Set;
024 import java.util.TreeSet;
025 import java.util.concurrent.ConcurrentHashMap;
026 import java.util.concurrent.Future;
027
028 import org.apache.activemq.broker.Broker;
029 import org.apache.activemq.broker.ConnectionContext;
030 import org.apache.activemq.command.Message;
031 import org.apache.activemq.command.MessageAck;
032 import org.apache.activemq.command.MessageId;
033 import org.apache.activemq.command.TransactionId;
034 import org.apache.activemq.command.XATransactionId;
035 import org.apache.activemq.store.AbstractMessageStore;
036 import org.apache.activemq.store.MessageStore;
037 import org.apache.activemq.store.ProxyMessageStore;
038 import org.apache.activemq.store.ProxyTopicMessageStore;
039 import org.apache.activemq.store.TopicMessageStore;
040 import org.apache.activemq.store.TransactionRecoveryListener;
041 import org.apache.activemq.store.TransactionStore;
042 import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
043 import org.apache.activemq.store.kahadb.data.KahaEntryType;
044 import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
045 import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
046 import org.apache.activemq.util.IOHelper;
047 import org.apache.kahadb.journal.Journal;
048 import org.apache.kahadb.journal.Location;
049 import org.apache.kahadb.util.DataByteArrayInputStream;
050 import org.apache.kahadb.util.DataByteArrayOutputStream;
051 import org.slf4j.Logger;
052 import org.slf4j.LoggerFactory;
053
054 public class MultiKahaDBTransactionStore implements TransactionStore {
055 static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
056 final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
057 final ConcurrentHashMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
058 final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>();
059 private Journal journal;
060 private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
061 private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
062
063 public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) {
064 this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter;
065 }
066
067 public MessageStore proxy(final TransactionStore transactionStore, MessageStore messageStore) {
068 return new ProxyMessageStore(messageStore) {
069 @Override
070 public void addMessage(ConnectionContext context, final Message send) throws IOException {
071 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
072 }
073
074 @Override
075 public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
076 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
077 }
078
079 @Override
080 public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
081 return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
082 }
083
084 @Override
085 public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
086 return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
087 }
088
089 @Override
090 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
091 MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
092 }
093
094 @Override
095 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
096 MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
097 }
098 };
099 }
100
101 public TopicMessageStore proxy(final TransactionStore transactionStore, final TopicMessageStore messageStore) {
102 return new ProxyTopicMessageStore(messageStore) {
103 @Override
104 public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
105 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
106 }
107
108 @Override
109 public void addMessage(ConnectionContext context, final Message send) throws IOException {
110 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
111 }
112
113 @Override
114 public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
115 return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
116 }
117
118 @Override
119 public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
120 return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
121 }
122
123 @Override
124 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
125 MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
126 }
127
128 @Override
129 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
130 MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
131 }
132
133 @Override
134 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
135 MessageId messageId, MessageAck ack) throws IOException {
136 MultiKahaDBTransactionStore.this.acknowledge(transactionStore, context, (TopicMessageStore) getDelegate(), clientId,
137 subscriptionName, messageId, ack);
138 }
139 };
140 }
141
142 public void deleteAllMessages() {
143 IOHelper.deleteChildren(getDirectory());
144 }
145
146 public int getJournalMaxFileLength() {
147 return journalMaxFileLength;
148 }
149
150 public void setJournalMaxFileLength(int journalMaxFileLength) {
151 this.journalMaxFileLength = journalMaxFileLength;
152 }
153
154 public int getJournalMaxWriteBatchSize() {
155 return journalWriteBatchSize;
156 }
157
158 public void setJournalMaxWriteBatchSize(int journalWriteBatchSize) {
159 this.journalWriteBatchSize = journalWriteBatchSize;
160 }
161
162 public class Tx {
163 private final Set<TransactionStore> stores = new HashSet<TransactionStore>();
164 private int prepareLocationId = 0;
165
166 public void trackStore(TransactionStore store) {
167 stores.add(store);
168 }
169
170 public Set<TransactionStore> getStores() {
171 return stores;
172 }
173
174 public void trackPrepareLocation(Location location) {
175 this.prepareLocationId = location.getDataFileId();
176 }
177
178 public int getPreparedLocationId() {
179 return prepareLocationId;
180 }
181 }
182
183 public Tx getTx(TransactionId txid) {
184 Tx tx = inflightTransactions.get(txid);
185 if (tx == null) {
186 tx = new Tx();
187 inflightTransactions.put(txid, tx);
188 }
189 return tx;
190 }
191
192 public Tx removeTx(TransactionId txid) {
193 return inflightTransactions.remove(txid);
194 }
195
196 public void prepare(TransactionId txid) throws IOException {
197 Tx tx = getTx(txid);
198 for (TransactionStore store : tx.getStores()) {
199 store.prepare(txid);
200 }
201 }
202
203 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
204 throws IOException {
205
206 if (preCommit != null) {
207 preCommit.run();
208 }
209
210 Tx tx = getTx(txid);
211 if (wasPrepared) {
212 for (TransactionStore store : tx.getStores()) {
213 store.commit(txid, true, null, null);
214 }
215 } else {
216 // can only do 1pc on a single store
217 if (tx.getStores().size() == 1) {
218 for (TransactionStore store : tx.getStores()) {
219 store.commit(txid, false, null, null);
220 }
221 } else {
222 // need to do local 2pc
223 for (TransactionStore store : tx.getStores()) {
224 store.prepare(txid);
225 }
226 persistOutcome(tx, txid);
227 for (TransactionStore store : tx.getStores()) {
228 store.commit(txid, true, null, null);
229 }
230 persistCompletion(txid);
231 }
232 }
233 removeTx(txid);
234 if (postCommit != null) {
235 postCommit.run();
236 }
237 }
238
239 public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
240 tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
241 }
242
243 public void persistCompletion(TransactionId txid) throws IOException {
244 store(new KahaCommitCommand().setTransactionInfo(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)));
245 }
246
247 private Location store(JournalCommand<?> data) throws IOException {
248 int size = data.serializedSizeFramed();
249 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
250 os.writeByte(data.type().getNumber());
251 data.writeFramed(os);
252 Location location = journal.write(os.toByteSequence(), true);
253 journal.setLastAppendLocation(location);
254 return location;
255 }
256
257 public void rollback(TransactionId txid) throws IOException {
258 Tx tx = removeTx(txid);
259 if (tx != null) {
260 for (TransactionStore store : tx.getStores()) {
261 store.rollback(txid);
262 }
263 }
264 }
265
266 public void start() throws Exception {
267 journal = new Journal() {
268 @Override
269 protected void cleanup() {
270 super.cleanup();
271 txStoreCleanup();
272 }
273 };
274 journal.setDirectory(getDirectory());
275 journal.setMaxFileLength(journalMaxFileLength);
276 journal.setWriteBatchSize(journalWriteBatchSize);
277 IOHelper.mkdirs(journal.getDirectory());
278 journal.start();
279 recoverPendingLocalTransactions();
280 store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
281 }
282
283 private void txStoreCleanup() {
284 Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet());
285 for (Tx tx : inflightTransactions.values()) {
286 knownDataFileIds.remove(tx.getPreparedLocationId());
287 }
288 try {
289 journal.removeDataFiles(knownDataFileIds);
290 } catch (Exception e) {
291 LOG.error(this + ", Failed to remove tx journal datafiles " + knownDataFileIds);
292 }
293 }
294
295 private File getDirectory() {
296 return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore");
297 }
298
299 public void stop() throws Exception {
300 journal.close();
301 journal = null;
302 }
303
304 private void recoverPendingLocalTransactions() throws IOException {
305 Location location = journal.getNextLocation(null);
306 while (location != null) {
307 process(load(location));
308 location = journal.getNextLocation(location);
309 }
310 recoveredPendingCommit.addAll(inflightTransactions.keySet());
311 LOG.info("pending local transactions: " + recoveredPendingCommit);
312 }
313
314 public JournalCommand<?> load(Location location) throws IOException {
315 DataByteArrayInputStream is = new DataByteArrayInputStream(journal.read(location));
316 byte readByte = is.readByte();
317 KahaEntryType type = KahaEntryType.valueOf(readByte);
318 if (type == null) {
319 throw new IOException("Could not load journal record. Invalid location: " + location);
320 }
321 JournalCommand<?> message = (JournalCommand<?>) type.createMessage();
322 message.mergeFramed(is);
323 return message;
324 }
325
326 public void process(JournalCommand<?> command) throws IOException {
327 switch (command.type()) {
328 case KAHA_PREPARE_COMMAND:
329 KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command;
330 getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo()));
331 break;
332 case KAHA_COMMIT_COMMAND:
333 KahaCommitCommand commitCommand = (KahaCommitCommand) command;
334 removeTx(TransactionIdConversion.convert(commitCommand.getTransactionInfo()));
335 break;
336 case KAHA_TRACE_COMMAND:
337 break;
338 default:
339 throw new IOException("Unexpected command in transaction journal: " + command);
340 }
341 }
342
343
344 public synchronized void recover(final TransactionRecoveryListener listener) throws IOException {
345
346 for (final KahaDBPersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) {
347 adapter.createTransactionStore().recover(new TransactionRecoveryListener() {
348 @Override
349 public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) {
350 try {
351 getTx(xid).trackStore(adapter.createTransactionStore());
352 } catch (IOException e) {
353 LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e);
354 }
355 listener.recover(xid, addedMessages, acks);
356 }
357 });
358 }
359
360 try {
361 Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker();
362 // force completion of local xa
363 for (TransactionId txid : broker.getPreparedTransactions(null)) {
364 if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
365 try {
366 if (recoveredPendingCommit.contains(txid)) {
367 LOG.info("delivering pending commit outcome for tid: " + txid);
368 broker.commitTransaction(null, txid, false);
369
370 } else {
371 LOG.info("delivering rollback outcome to store for tid: " + txid);
372 broker.forgetTransaction(null, txid);
373 }
374 persistCompletion(txid);
375 } catch (Exception ex) {
376 LOG.error("failed to deliver pending outcome for tid: " + txid, ex);
377 }
378 }
379 }
380 } catch (Exception e) {
381 LOG.error("failed to resolve pending local transactions", e);
382 }
383 }
384
385 void addMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
386 throws IOException {
387 if (message.getTransactionId() != null) {
388 getTx(message.getTransactionId()).trackStore(transactionStore);
389 }
390 destination.addMessage(context, message);
391 }
392
393 Future<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
394 throws IOException {
395 if (message.getTransactionId() != null) {
396 getTx(message.getTransactionId()).trackStore(transactionStore);
397 destination.addMessage(context, message);
398 return AbstractMessageStore.FUTURE;
399 } else {
400 return destination.asyncAddQueueMessage(context, message);
401 }
402 }
403
404 Future<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
405 throws IOException {
406
407 if (message.getTransactionId() != null) {
408 getTx(message.getTransactionId()).trackStore(transactionStore);
409 destination.addMessage(context, message);
410 return AbstractMessageStore.FUTURE;
411 } else {
412 return destination.asyncAddTopicMessage(context, message);
413 }
414 }
415
416 final void removeMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
417 throws IOException {
418 if (ack.getTransactionId() != null) {
419 getTx(ack.getTransactionId()).trackStore(transactionStore);
420 }
421 destination.removeMessage(context, ack);
422 }
423
424 final void removeAsyncMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
425 throws IOException {
426 if (ack.getTransactionId() != null) {
427 getTx(ack.getTransactionId()).trackStore(transactionStore);
428 }
429 destination.removeAsyncMessage(context, ack);
430 }
431
432 final void acknowledge(final TransactionStore transactionStore, ConnectionContext context, final TopicMessageStore destination,
433 final String clientId, final String subscriptionName,
434 final MessageId messageId, final MessageAck ack) throws IOException {
435 if (ack.getTransactionId() != null) {
436 getTx(ack.getTransactionId()).trackStore(transactionStore);
437 }
438 destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
439 }
440 }