001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb;
018
019 import java.io.DataInputStream;
020 import java.io.IOException;
021 import java.util.ArrayList;
022 import java.util.Iterator;
023 import java.util.List;
024 import java.util.Map;
025 import java.util.concurrent.CancellationException;
026 import java.util.concurrent.ConcurrentHashMap;
027 import java.util.concurrent.ExecutionException;
028 import java.util.concurrent.Future;
029
030 import org.apache.activemq.broker.ConnectionContext;
031 import org.apache.activemq.command.Message;
032 import org.apache.activemq.command.MessageAck;
033 import org.apache.activemq.command.MessageId;
034 import org.apache.activemq.command.TransactionId;
035 import org.apache.activemq.command.XATransactionId;
036 import org.apache.activemq.openwire.OpenWireFormat;
037 import org.apache.activemq.protobuf.Buffer;
038 import org.apache.activemq.store.AbstractMessageStore;
039 import org.apache.activemq.store.MessageStore;
040 import org.apache.activemq.store.ProxyMessageStore;
041 import org.apache.activemq.store.ProxyTopicMessageStore;
042 import org.apache.activemq.store.TopicMessageStore;
043 import org.apache.activemq.store.TransactionRecoveryListener;
044 import org.apache.activemq.store.TransactionStore;
045 import org.apache.activemq.store.kahadb.MessageDatabase.AddOpperation;
046 import org.apache.activemq.store.kahadb.MessageDatabase.Operation;
047 import org.apache.activemq.store.kahadb.MessageDatabase.RemoveOpperation;
048 import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
049 import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
050 import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
051 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
052 import org.apache.activemq.wireformat.WireFormat;
053 import org.slf4j.Logger;
054 import org.slf4j.LoggerFactory;
055
/**
 * Provides a TransactionStore implementation that can create transaction-aware
 * MessageStore objects from non-transaction-aware MessageStore objects.
 */
062 public class KahaDBTransactionStore implements TransactionStore {
/** Logger used by this transaction store. */
static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);

/**
 * Transactions whose work is buffered in memory, keyed by transaction id.
 * Entries are created by {@link #getTx(Object)} and removed on prepare,
 * commit or rollback.
 */
ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();

/** Wire format used by {@link #recover(TransactionRecoveryListener)} to unmarshal stored messages and acks. */
private final WireFormat wireFormat = new OpenWireFormat();

/** The KahaDB store that transaction commands are written to. */
private final KahaDBStore theStore;

/**
 * Creates a transaction store that works on top of the given KahaDB store.
 *
 * @param theStore the KahaDB store this transaction store delegates to
 */
public KahaDBTransactionStore(KahaDBStore theStore) {
    this.theStore = theStore;
}
071
072 public class Tx {
073 private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
074
075 private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
076
077 public void add(AddMessageCommand msg) {
078 messages.add(msg);
079 }
080
081 public void add(RemoveMessageCommand ack) {
082 acks.add(ack);
083 }
084
085 public Message[] getMessages() {
086 Message rc[] = new Message[messages.size()];
087 int count = 0;
088 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
089 AddMessageCommand cmd = iter.next();
090 rc[count++] = cmd.getMessage();
091 }
092 return rc;
093 }
094
095 public MessageAck[] getAcks() {
096 MessageAck rc[] = new MessageAck[acks.size()];
097 int count = 0;
098 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
099 RemoveMessageCommand cmd = iter.next();
100 rc[count++] = cmd.getMessageAck();
101 }
102 return rc;
103 }
104
105 /**
106 * @return true if something to commit
107 * @throws IOException
108 */
109 public List<Future<Object>> commit() throws IOException {
110 List<Future<Object>> results = new ArrayList<Future<Object>>();
111 // Do all the message adds.
112 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
113 AddMessageCommand cmd = iter.next();
114 results.add(cmd.run());
115
116 }
117 // And removes..
118 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
119 RemoveMessageCommand cmd = iter.next();
120 cmd.run();
121 results.add(cmd.run());
122 }
123
124 return results;
125 }
126 }
127
128 public abstract class AddMessageCommand {
129 private final ConnectionContext ctx;
130 AddMessageCommand(ConnectionContext ctx) {
131 this.ctx = ctx;
132 }
133 abstract Message getMessage();
134 Future<Object> run() throws IOException {
135 return run(this.ctx);
136 }
137 abstract Future<Object> run(ConnectionContext ctx) throws IOException;
138 }
139
140 public abstract class RemoveMessageCommand {
141
142 private final ConnectionContext ctx;
143 RemoveMessageCommand(ConnectionContext ctx) {
144 this.ctx = ctx;
145 }
146 abstract MessageAck getMessageAck();
147 Future<Object> run() throws IOException {
148 return run(this.ctx);
149 }
150 abstract Future<Object> run(ConnectionContext context) throws IOException;
151 }
152
153 public MessageStore proxy(MessageStore messageStore) {
154 return new ProxyMessageStore(messageStore) {
155 @Override
156 public void addMessage(ConnectionContext context, final Message send) throws IOException {
157 KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
158 }
159
160 @Override
161 public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
162 KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
163 }
164
165 @Override
166 public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
167 return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
168 }
169
170 @Override
171 public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
172 return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
173 }
174
175 @Override
176 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
177 KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
178 }
179
180 @Override
181 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
182 KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
183 }
184 };
185 }
186
187 public TopicMessageStore proxy(TopicMessageStore messageStore) {
188 return new ProxyTopicMessageStore(messageStore) {
189 @Override
190 public void addMessage(ConnectionContext context, final Message send) throws IOException {
191 KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
192 }
193
194 @Override
195 public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
196 KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
197 }
198
199 @Override
200 public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
201 return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
202 }
203
204 @Override
205 public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
206 return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
207 }
208
209 @Override
210 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
211 KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
212 }
213
214 @Override
215 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
216 KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
217 }
218
219 @Override
220 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
221 MessageId messageId, MessageAck ack) throws IOException {
222 KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(), clientId,
223 subscriptionName, messageId, ack);
224 }
225
226 };
227 }
228
229 /**
230 * @throws IOException
231 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
232 */
233 public void prepare(TransactionId txid) throws IOException {
234 KahaTransactionInfo info = getTransactionInfo(txid);
235 if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
236 theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
237 } else {
238 Tx tx = inflightTransactions.remove(txid);
239 if (tx != null) {
240 theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
241 }
242 }
243 }
244
245 public Tx getTx(Object txid) {
246 Tx tx = inflightTransactions.get(txid);
247 if (tx == null) {
248 tx = new Tx();
249 inflightTransactions.put(txid, tx);
250 }
251 return tx;
252 }
253
254 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
255 throws IOException {
256 if (txid != null) {
257 if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) {
258 if (preCommit != null) {
259 preCommit.run();
260 }
261 Tx tx = inflightTransactions.remove(txid);
262 if (tx != null) {
263 List<Future<Object>> results = tx.commit();
264 boolean doneSomething = false;
265 for (Future<Object> result : results) {
266 try {
267 result.get();
268 } catch (InterruptedException e) {
269 theStore.brokerService.handleIOException(new IOException(e.getMessage()));
270 } catch (ExecutionException e) {
271 theStore.brokerService.handleIOException(new IOException(e.getMessage()));
272 }catch(CancellationException e) {
273 }
274 if (!result.isCancelled()) {
275 doneSomething = true;
276 }
277 }
278 if (postCommit != null) {
279 postCommit.run();
280 }
281 if (doneSomething) {
282 KahaTransactionInfo info = getTransactionInfo(txid);
283 theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, null, null);
284 }
285 }else {
286 //The Tx will be null for failed over clients - lets run their post commits
287 if (postCommit != null) {
288 postCommit.run();
289 }
290 }
291
292 } else {
293 KahaTransactionInfo info = getTransactionInfo(txid);
294 theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
295 forgetRecoveredAcks(txid);
296 }
297 }else {
298 LOG.error("Null transaction passed on commit");
299 }
300 }
301
302 /**
303 * @throws IOException
304 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
305 */
306 public void rollback(TransactionId txid) throws IOException {
307 if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
308 KahaTransactionInfo info = getTransactionInfo(txid);
309 theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
310 forgetRecoveredAcks(txid);
311 } else {
312 inflightTransactions.remove(txid);
313 }
314 }
315
316 protected void forgetRecoveredAcks(TransactionId txid) throws IOException {
317 if (txid.isXATransaction()) {
318 XATransactionId xaTid = ((XATransactionId) txid);
319 theStore.forgetRecoveredAcks(xaTid.getPreparedAcks());
320 }
321 }
322
323 public void start() throws Exception {
324 }
325
326 public void stop() throws Exception {
327 }
328
329 public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
330 for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) {
331 XATransactionId xid = (XATransactionId) entry.getKey();
332 ArrayList<Message> messageList = new ArrayList<Message>();
333 ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
334
335 for (Operation op : entry.getValue()) {
336 if (op.getClass() == AddOpperation.class) {
337 AddOpperation addOp = (AddOpperation) op;
338 Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addOp.getCommand().getMessage()
339 .newInput()));
340 messageList.add(msg);
341 } else {
342 RemoveOpperation rmOp = (RemoveOpperation) op;
343 Buffer ackb = rmOp.getCommand().getAck();
344 MessageAck ack = (MessageAck) wireFormat.unmarshal(new DataInputStream(ackb.newInput()));
345 ackList.add(ack);
346 }
347 }
348
349 Message[] addedMessages = new Message[messageList.size()];
350 MessageAck[] acks = new MessageAck[ackList.size()];
351 messageList.toArray(addedMessages);
352 ackList.toArray(acks);
353 xid.setPreparedAcks(ackList);
354 theStore.trackRecoveredAcks(ackList);
355 listener.recover(xid, addedMessages, acks);
356 }
357 }
358
359 /**
360 * @param message
361 * @throws IOException
362 */
363 void addMessage(ConnectionContext context, final MessageStore destination, final Message message)
364 throws IOException {
365
366 if (message.getTransactionId() != null) {
367 if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
368 destination.addMessage(context, message);
369 } else {
370 Tx tx = getTx(message.getTransactionId());
371 tx.add(new AddMessageCommand(context) {
372 @Override
373 public Message getMessage() {
374 return message;
375 }
376 @Override
377 public Future<Object> run(ConnectionContext ctx) throws IOException {
378 destination.addMessage(ctx, message);
379 return AbstractMessageStore.FUTURE;
380 }
381
382 });
383 }
384 } else {
385 destination.addMessage(context, message);
386 }
387 }
388
389 Future<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
390 throws IOException {
391
392 if (message.getTransactionId() != null) {
393 if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
394 destination.addMessage(context, message);
395 return AbstractMessageStore.FUTURE;
396 } else {
397 Tx tx = getTx(message.getTransactionId());
398 tx.add(new AddMessageCommand(context) {
399 @Override
400 public Message getMessage() {
401 return message;
402 }
403 @Override
404 public Future<Object> run(ConnectionContext ctx) throws IOException {
405 return destination.asyncAddQueueMessage(ctx, message);
406 }
407
408 });
409 return AbstractMessageStore.FUTURE;
410 }
411 } else {
412 return destination.asyncAddQueueMessage(context, message);
413 }
414 }
415
416 Future<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message)
417 throws IOException {
418
419 if (message.getTransactionId() != null) {
420 if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
421 destination.addMessage(context, message);
422 return AbstractMessageStore.FUTURE;
423 } else {
424 Tx tx = getTx(message.getTransactionId());
425 tx.add(new AddMessageCommand(context) {
426 @Override
427 public Message getMessage() {
428 return message;
429 }
430 @Override
431 public Future<Object> run(ConnectionContext ctx) throws IOException {
432 return destination.asyncAddTopicMessage(ctx, message);
433 }
434
435 });
436 return AbstractMessageStore.FUTURE;
437 }
438 } else {
439 return destination.asyncAddTopicMessage(context, message);
440 }
441 }
442
443 /**
444 * @param ack
445 * @throws IOException
446 */
447 final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
448 throws IOException {
449
450 if (ack.isInTransaction()) {
451 if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
452 destination.removeMessage(context, ack);
453 } else {
454 Tx tx = getTx(ack.getTransactionId());
455 tx.add(new RemoveMessageCommand(context) {
456 @Override
457 public MessageAck getMessageAck() {
458 return ack;
459 }
460
461 @Override
462 public Future<Object> run(ConnectionContext ctx) throws IOException {
463 destination.removeMessage(ctx, ack);
464 return AbstractMessageStore.FUTURE;
465 }
466 });
467 }
468 } else {
469 destination.removeMessage(context, ack);
470 }
471 }
472
473 final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
474 throws IOException {
475
476 if (ack.isInTransaction()) {
477 if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
478 destination.removeAsyncMessage(context, ack);
479 } else {
480 Tx tx = getTx(ack.getTransactionId());
481 tx.add(new RemoveMessageCommand(context) {
482 @Override
483 public MessageAck getMessageAck() {
484 return ack;
485 }
486
487 @Override
488 public Future<Object> run(ConnectionContext ctx) throws IOException {
489 destination.removeMessage(ctx, ack);
490 return AbstractMessageStore.FUTURE;
491 }
492 });
493 }
494 } else {
495 destination.removeAsyncMessage(context, ack);
496 }
497 }
498
499 final void acknowledge(ConnectionContext context, final TopicMessageStore destination, final String clientId, final String subscriptionName,
500 final MessageId messageId, final MessageAck ack) throws IOException {
501
502 if (ack.isInTransaction()) {
503 if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
504 destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
505 } else {
506 Tx tx = getTx(ack.getTransactionId());
507 tx.add(new RemoveMessageCommand(context) {
508 public MessageAck getMessageAck() {
509 return ack;
510 }
511
512 public Future<Object> run(ConnectionContext ctx) throws IOException {
513 destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
514 return AbstractMessageStore.FUTURE;
515 }
516 });
517 }
518 } else {
519 destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
520 }
521 }
522
523
524 private KahaTransactionInfo getTransactionInfo(TransactionId txid) {
525 return theStore.getTransactionIdTransformer().transform(txid);
526 }
527 }