001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.io.InterruptedIOException;
020 import java.util.ArrayList;
021 import java.util.Arrays;
022 import java.util.HashMap;
023 import java.util.List;
024
025 import javax.jms.JMSException;
026 import javax.jms.TransactionInProgressException;
027 import javax.jms.TransactionRolledBackException;
028 import javax.transaction.xa.XAException;
029 import javax.transaction.xa.XAResource;
030 import javax.transaction.xa.Xid;
031
032 import org.apache.activemq.command.Command;
033 import org.apache.activemq.command.ConnectionId;
034 import org.apache.activemq.command.DataArrayResponse;
035 import org.apache.activemq.command.DataStructure;
036 import org.apache.activemq.command.IntegerResponse;
037 import org.apache.activemq.command.LocalTransactionId;
038 import org.apache.activemq.command.Response;
039 import org.apache.activemq.command.TransactionId;
040 import org.apache.activemq.command.TransactionInfo;
041 import org.apache.activemq.command.XATransactionId;
042 import org.apache.activemq.transaction.Synchronization;
043 import org.apache.activemq.util.JMSExceptionSupport;
044 import org.apache.activemq.util.LongSequenceGenerator;
045 import org.slf4j.Logger;
046 import org.slf4j.LoggerFactory;
047
/**
 * A TransactionContext provides the means to control a JMS transaction. It
 * provides a local transaction interface and also an XAResource interface.
 * <p>
 * An application server controls the transactional assignment of an XASession
 * by obtaining its XAResource. It uses the XAResource to assign the session to
 * a transaction, prepare and commit work on the transaction, and so on.
 * <p>
 * An XAResource provides some fairly sophisticated facilities for interleaving
 * work on multiple transactions, recovering a list of transactions in progress,
 * and so on. A JTA aware JMS provider must fully implement this functionality.
 * This could be done by using the services of a database that supports XA, or a
 * JMS provider may choose to implement this functionality from scratch.
 *
 * @see javax.jms.Session
 * @see javax.jms.QueueSession
 * @see javax.jms.TopicSession
 * @see javax.jms.XASession
 */
066 public class TransactionContext implements XAResource {
067
068 private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
069
070 // XATransactionId -> ArrayList of TransactionContext objects
071 private final static HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS =
072 new HashMap<TransactionId, List<TransactionContext>>();
073
074 private final ActiveMQConnection connection;
075 private final LongSequenceGenerator localTransactionIdGenerator;
076 private final ConnectionId connectionId;
077 private List<Synchronization> synchronizations;
078
079 // To track XA transactions.
080 private Xid associatedXid;
081 private TransactionId transactionId;
082 private LocalTransactionEventListener localTransactionEventListener;
083 private int beforeEndIndex;
084
/**
 * Creates a transaction context bound to the given connection.
 * The connection's local transaction id generator and its connection id
 * are read once here and kept for the lifetime of this context.
 *
 * @param connection the connection used to send transaction commands
 */
public TransactionContext(ActiveMQConnection connection) {
    this.connection = connection;
    this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator();
    this.connectionId = connection.getConnectionInfo().getConnectionId();
}
090
091 public boolean isInXATransaction() {
092 if (transactionId != null && transactionId.isXATransaction()) {
093 return true;
094 } else {
095 if (!ENDED_XA_TRANSACTION_CONTEXTS.isEmpty()) {
096 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
097 for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) {
098 if (transactions.contains(this)) {
099 return true;
100 }
101 }
102 }
103 }
104 }
105
106 return false;
107 }
108
109 public boolean isInLocalTransaction() {
110 return transactionId != null && transactionId.isLocalTransaction();
111 }
112
113 public boolean isInTransaction() {
114 return transactionId != null;
115 }
116
117 /**
118 * @return Returns the localTransactionEventListener.
119 */
120 public LocalTransactionEventListener getLocalTransactionEventListener() {
121 return localTransactionEventListener;
122 }
123
124 /**
125 * Used by the resource adapter to listen to transaction events.
126 *
127 * @param localTransactionEventListener The localTransactionEventListener to
128 * set.
129 */
130 public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
131 this.localTransactionEventListener = localTransactionEventListener;
132 }
133
134 // ///////////////////////////////////////////////////////////
135 //
136 // Methods that work with the Synchronization objects registered with
137 // the transaction.
138 //
139 // ///////////////////////////////////////////////////////////
140
141 public void addSynchronization(Synchronization s) {
142 if (synchronizations == null) {
143 synchronizations = new ArrayList<Synchronization>(10);
144 }
145 synchronizations.add(s);
146 }
147
148 private void afterRollback() throws JMSException {
149 if (synchronizations == null) {
150 return;
151 }
152
153 Throwable firstException = null;
154 int size = synchronizations.size();
155 for (int i = 0; i < size; i++) {
156 try {
157 synchronizations.get(i).afterRollback();
158 } catch (Throwable t) {
159 LOG.debug("Exception from afterRollback on " + synchronizations.get(i), t);
160 if (firstException == null) {
161 firstException = t;
162 }
163 }
164 }
165 synchronizations = null;
166 if (firstException != null) {
167 throw JMSExceptionSupport.create(firstException);
168 }
169 }
170
171 private void afterCommit() throws JMSException {
172 if (synchronizations == null) {
173 return;
174 }
175
176 Throwable firstException = null;
177 int size = synchronizations.size();
178 for (int i = 0; i < size; i++) {
179 try {
180 synchronizations.get(i).afterCommit();
181 } catch (Throwable t) {
182 LOG.debug("Exception from afterCommit on " + synchronizations.get(i), t);
183 if (firstException == null) {
184 firstException = t;
185 }
186 }
187 }
188 synchronizations = null;
189 if (firstException != null) {
190 throw JMSExceptionSupport.create(firstException);
191 }
192 }
193
194 private void beforeEnd() throws JMSException {
195 if (synchronizations == null) {
196 return;
197 }
198
199 int size = synchronizations.size();
200 try {
201 for (;beforeEndIndex < size;) {
202 synchronizations.get(beforeEndIndex++).beforeEnd();
203 }
204 } catch (JMSException e) {
205 throw e;
206 } catch (Throwable e) {
207 throw JMSExceptionSupport.create(e);
208 }
209 }
210
211 public TransactionId getTransactionId() {
212 return transactionId;
213 }
214
215 // ///////////////////////////////////////////////////////////
216 //
217 // Local transaction interface.
218 //
219 // ///////////////////////////////////////////////////////////
220
221 /**
222 * Start a local transaction.
223 * @throws javax.jms.JMSException on internal error
224 */
225 public void begin() throws JMSException {
226
227 if (isInXATransaction()) {
228 throw new TransactionInProgressException("Cannot start local transaction. XA transaction is already in progress.");
229 }
230
231 if (transactionId == null) {
232 synchronizations = null;
233 beforeEndIndex = 0;
234 this.transactionId = new LocalTransactionId(connectionId, localTransactionIdGenerator.getNextSequenceId());
235 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
236 this.connection.ensureConnectionInfoSent();
237 this.connection.asyncSendPacket(info);
238
239 // Notify the listener that the tx was started.
240 if (localTransactionEventListener != null) {
241 localTransactionEventListener.beginEvent();
242 }
243 if (LOG.isDebugEnabled()) {
244 LOG.debug("Begin:" + transactionId);
245 }
246 }
247
248 }
249
250 /**
251 * Rolls back any work done in this transaction and releases any locks
252 * currently held.
253 *
254 * @throws JMSException if the JMS provider fails to roll back the
255 * transaction due to some internal error.
256 * @throws javax.jms.IllegalStateException if the method is not called by a
257 * transacted session.
258 */
259 public void rollback() throws JMSException {
260 if (isInXATransaction()) {
261 throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress ");
262 }
263
264 try {
265 beforeEnd();
266 } catch (TransactionRolledBackException canOcurrOnFailover) {
267 LOG.warn("rollback processing error", canOcurrOnFailover);
268 }
269 if (transactionId != null) {
270 if (LOG.isDebugEnabled()) {
271 LOG.debug("Rollback: " + transactionId
272 + " syncCount: "
273 + (synchronizations != null ? synchronizations.size() : 0));
274 }
275
276 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
277 this.transactionId = null;
278 //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364
279 this.connection.syncSendPacket(info);
280 // Notify the listener that the tx was rolled back
281 if (localTransactionEventListener != null) {
282 localTransactionEventListener.rollbackEvent();
283 }
284 }
285
286 afterRollback();
287 }
288
289 /**
290 * Commits all work done in this transaction and releases any locks
291 * currently held.
292 *
293 * @throws JMSException if the JMS provider fails to commit the transaction
294 * due to some internal error.
295 * @throws javax.jms.IllegalStateException if the method is not called by a
296 * transacted session.
297 */
298 public void commit() throws JMSException {
299 if (isInXATransaction()) {
300 throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress ");
301 }
302
303 try {
304 beforeEnd();
305 } catch (JMSException e) {
306 rollback();
307 throw e;
308 }
309
310 // Only send commit if the transaction was started.
311 if (transactionId != null) {
312 if (LOG.isDebugEnabled()) {
313 LOG.debug("Commit: " + transactionId
314 + " syncCount: "
315 + (synchronizations != null ? synchronizations.size() : 0));
316 }
317
318 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE);
319 this.transactionId = null;
320 // Notify the listener that the tx was committed back
321 try {
322 syncSendPacketWithInterruptionHandling(info);
323 if (localTransactionEventListener != null) {
324 localTransactionEventListener.commitEvent();
325 }
326 afterCommit();
327 } catch (JMSException cause) {
328 LOG.info("commit failed for transaction " + info.getTransactionId(), cause);
329 if (localTransactionEventListener != null) {
330 localTransactionEventListener.rollbackEvent();
331 }
332 afterRollback();
333 throw cause;
334 }
335
336 }
337 }
338
339 // ///////////////////////////////////////////////////////////
340 //
341 // XAResource Implementation
342 //
343 // ///////////////////////////////////////////////////////////
344 /**
345 * Associates a transaction with the resource.
346 */
347 public void start(Xid xid, int flags) throws XAException {
348
349 if (LOG.isDebugEnabled()) {
350 LOG.debug("Start: " + xid);
351 }
352 if (isInLocalTransaction()) {
353 throw new XAException(XAException.XAER_PROTO);
354 }
355 // Are we already associated?
356 if (associatedXid != null) {
357 throw new XAException(XAException.XAER_PROTO);
358 }
359
360 // if ((flags & TMJOIN) == TMJOIN) {
361 // TODO: verify that the server has seen the xid
362 // // }
363 // if ((flags & TMJOIN) == TMRESUME) {
364 // // TODO: verify that the xid was suspended.
365 // }
366
367 // associate
368 synchronizations = null;
369 beforeEndIndex = 0;
370 setXid(xid);
371 }
372
373 /**
374 * @return connectionId for connection
375 */
376 private ConnectionId getConnectionId() {
377 return connection.getConnectionInfo().getConnectionId();
378 }
379
380 public void end(Xid xid, int flags) throws XAException {
381
382 if (LOG.isDebugEnabled()) {
383 LOG.debug("End: " + xid);
384 }
385
386 if (isInLocalTransaction()) {
387 throw new XAException(XAException.XAER_PROTO);
388 }
389
390 if ((flags & (TMSUSPEND | TMFAIL)) != 0) {
391 // You can only suspend the associated xid.
392 if (!equals(associatedXid, xid)) {
393 throw new XAException(XAException.XAER_PROTO);
394 }
395
396 // TODO: we may want to put the xid in a suspended list.
397 try {
398 beforeEnd();
399 } catch (JMSException e) {
400 throw toXAException(e);
401 }
402 setXid(null);
403 } else if ((flags & TMSUCCESS) == TMSUCCESS) {
404 // set to null if this is the current xid.
405 // otherwise this could be an asynchronous success call
406 if (equals(associatedXid, xid)) {
407 try {
408 beforeEnd();
409 } catch (JMSException e) {
410 throw toXAException(e);
411 }
412 setXid(null);
413 }
414 } else {
415 throw new XAException(XAException.XAER_INVAL);
416 }
417 }
418
419 private boolean equals(Xid xid1, Xid xid2) {
420 if (xid1 == xid2) {
421 return true;
422 }
423 if (xid1 == null ^ xid2 == null) {
424 return false;
425 }
426 return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier())
427 && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId());
428 }
429
430 public int prepare(Xid xid) throws XAException {
431 if (LOG.isDebugEnabled()) {
432 LOG.debug("Prepare: " + xid);
433 }
434
435 // We allow interleaving multiple transactions, so
436 // we don't limit prepare to the associated xid.
437 XATransactionId x;
438 // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been
439 // called first
440 if (xid == null || (equals(associatedXid, xid))) {
441 throw new XAException(XAException.XAER_PROTO);
442 } else {
443 // TODO: cache the known xids so we don't keep recreating this one??
444 x = new XATransactionId(xid);
445 }
446
447 try {
448 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE);
449
450 // Find out if the server wants to commit or rollback.
451 IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info);
452 if (XAResource.XA_RDONLY == response.getResult()) {
453 // transaction stops now, may be syncs that need a callback
454 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
455 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
456 if (l != null && !l.isEmpty()) {
457 if (LOG.isDebugEnabled()) {
458 LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: " + xid);
459 }
460 for (TransactionContext ctx : l) {
461 ctx.afterCommit();
462 }
463 }
464 }
465 }
466 return response.getResult();
467
468 } catch (JMSException e) {
469 LOG.warn("prepare of: " + x + " failed with: " + e, e);
470 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
471 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
472 if (l != null && !l.isEmpty()) {
473 for (TransactionContext ctx : l) {
474 try {
475 ctx.afterRollback();
476 } catch (Throwable ignored) {
477 if (LOG.isDebugEnabled()) {
478 LOG.debug("failed to firing afterRollback callbacks on prepare failure, txid: " +
479 x + ", context: " + ctx, ignored);
480 }
481 }
482 }
483 }
484 }
485 throw toXAException(e);
486 }
487 }
488
489 public void rollback(Xid xid) throws XAException {
490
491 if (LOG.isDebugEnabled()) {
492 LOG.debug("Rollback: " + xid);
493 }
494
495 // We allow interleaving multiple transactions, so
496 // we don't limit rollback to the associated xid.
497 XATransactionId x;
498 if (xid == null) {
499 throw new XAException(XAException.XAER_PROTO);
500 }
501 if (equals(associatedXid, xid)) {
502 // I think this can happen even without an end(xid) call. Need to
503 // check spec.
504 x = (XATransactionId)transactionId;
505 } else {
506 x = new XATransactionId(xid);
507 }
508
509 try {
510 this.connection.checkClosedOrFailed();
511 this.connection.ensureConnectionInfoSent();
512
513 // Let the server know that the tx is rollback.
514 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
515 syncSendPacketWithInterruptionHandling(info);
516
517 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
518 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
519 if (l != null && !l.isEmpty()) {
520 for (TransactionContext ctx : l) {
521 ctx.afterRollback();
522 }
523 }
524 }
525 } catch (JMSException e) {
526 throw toXAException(e);
527 }
528 }
529
530 // XAResource interface
531 public void commit(Xid xid, boolean onePhase) throws XAException {
532
533 if (LOG.isDebugEnabled()) {
534 LOG.debug("Commit: " + xid + ", onePhase=" + onePhase);
535 }
536
537 // We allow interleaving multiple transactions, so
538 // we don't limit commit to the associated xid.
539 XATransactionId x;
540 if (xid == null || (equals(associatedXid, xid))) {
541 // should never happen, end(xid,TMSUCCESS) must have been previously
542 // called
543 throw new XAException(XAException.XAER_PROTO);
544 } else {
545 x = new XATransactionId(xid);
546 }
547
548 try {
549 this.connection.checkClosedOrFailed();
550 this.connection.ensureConnectionInfoSent();
551
552 // Notify the server that the tx was committed back
553 TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);
554
555 syncSendPacketWithInterruptionHandling(info);
556
557 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
558 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
559 if (l != null && !l.isEmpty()) {
560 for (TransactionContext ctx : l) {
561 try {
562 ctx.afterCommit();
563 } catch (Exception ignored) {
564 LOG.debug("ignoring exception from after completion on ended transaction: " + ignored, ignored);
565 }
566 }
567 }
568 }
569
570 } catch (JMSException e) {
571 LOG.warn("commit of: " + x + " failed with: " + e, e);
572 if (onePhase) {
573 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
574 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
575 if (l != null && !l.isEmpty()) {
576 for (TransactionContext ctx : l) {
577 try {
578 ctx.afterRollback();
579 } catch (Throwable ignored) {
580 if (LOG.isDebugEnabled()) {
581 LOG.debug("failed to firing afterRollback callbacks commit failure, txid: " + x + ", context: " + ctx, ignored);
582 }
583 }
584 }
585 }
586 }
587 }
588 throw toXAException(e);
589 }
590
591 }
592
593 public void forget(Xid xid) throws XAException {
594 if (LOG.isDebugEnabled()) {
595 LOG.debug("Forget: " + xid);
596 }
597
598 // We allow interleaving multiple transactions, so
599 // we don't limit forget to the associated xid.
600 XATransactionId x;
601 if (xid == null) {
602 throw new XAException(XAException.XAER_PROTO);
603 }
604 if (equals(associatedXid, xid)) {
605 // TODO determine if this can happen... I think not.
606 x = (XATransactionId)transactionId;
607 } else {
608 x = new XATransactionId(xid);
609 }
610
611 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET);
612
613 try {
614 // Tell the server to forget the transaction.
615 syncSendPacketWithInterruptionHandling(info);
616 } catch (JMSException e) {
617 throw toXAException(e);
618 }
619 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
620 ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
621 }
622 }
623
624 public boolean isSameRM(XAResource xaResource) throws XAException {
625 if (xaResource == null) {
626 return false;
627 }
628 if (!(xaResource instanceof TransactionContext)) {
629 return false;
630 }
631 TransactionContext xar = (TransactionContext)xaResource;
632 try {
633 return getResourceManagerId().equals(xar.getResourceManagerId());
634 } catch (Throwable e) {
635 throw (XAException)new XAException("Could not get resource manager id.").initCause(e);
636 }
637 }
638
639 public Xid[] recover(int flag) throws XAException {
640 if (LOG.isDebugEnabled()) {
641 LOG.debug("Recover: " + flag);
642 }
643
644 TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
645 try {
646 this.connection.checkClosedOrFailed();
647 this.connection.ensureConnectionInfoSent();
648
649 DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info);
650 DataStructure[] data = receipt.getData();
651 XATransactionId[] answer;
652 if (data instanceof XATransactionId[]) {
653 answer = (XATransactionId[])data;
654 } else {
655 answer = new XATransactionId[data.length];
656 System.arraycopy(data, 0, answer, 0, data.length);
657 }
658 return answer;
659 } catch (JMSException e) {
660 throw toXAException(e);
661 }
662 }
663
664 public int getTransactionTimeout() throws XAException {
665 return 0;
666 }
667
668 public boolean setTransactionTimeout(int seconds) throws XAException {
669 return false;
670 }
671
672 // ///////////////////////////////////////////////////////////
673 //
674 // Helper methods.
675 //
676 // ///////////////////////////////////////////////////////////
677 private String getResourceManagerId() throws JMSException {
678 return this.connection.getResourceManagerId();
679 }
680
681 private void setXid(Xid xid) throws XAException {
682
683 try {
684 this.connection.checkClosedOrFailed();
685 this.connection.ensureConnectionInfoSent();
686 } catch (JMSException e) {
687 throw toXAException(e);
688 }
689
690 if (xid != null) {
691 // associate
692 associatedXid = xid;
693 transactionId = new XATransactionId(xid);
694
695 TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.BEGIN);
696 try {
697 this.connection.asyncSendPacket(info);
698 if (LOG.isDebugEnabled()) {
699 LOG.debug("Started XA transaction: " + transactionId);
700 }
701 } catch (JMSException e) {
702 throw toXAException(e);
703 }
704
705 } else {
706
707 if (transactionId != null) {
708 TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.END);
709 try {
710 syncSendPacketWithInterruptionHandling(info);
711 if (LOG.isDebugEnabled()) {
712 LOG.debug("Ended XA transaction: " + transactionId);
713 }
714 } catch (JMSException e) {
715 throw toXAException(e);
716 }
717
718 // Add our self to the list of contexts that are interested in
719 // post commit/rollback events.
720 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
721 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
722 if (l == null) {
723 l = new ArrayList<TransactionContext>(3);
724 ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
725 l.add(this);
726 } else if (!l.contains(this)) {
727 l.add(this);
728 }
729 }
730 }
731
732 // dis-associate
733 associatedXid = null;
734 transactionId = null;
735 }
736 }
737
738 /**
739 * Sends the given command. Also sends the command in case of interruption,
740 * so that important commands like rollback and commit are never interrupted.
741 * If interruption occurred, set the interruption state of the current
742 * after performing the action again.
743 *
744 * @return the response
745 */
746 private Response syncSendPacketWithInterruptionHandling(Command command) throws JMSException {
747 try {
748 return this.connection.syncSendPacket(command);
749 } catch (JMSException e) {
750 if (e.getLinkedException() instanceof InterruptedIOException) {
751 try {
752 Thread.interrupted();
753 return this.connection.syncSendPacket(command);
754 } finally {
755 Thread.currentThread().interrupt();
756 }
757 }
758
759 throw e;
760 }
761 }
762
763 /**
764 * Converts a JMSException from the server to an XAException. if the
765 * JMSException contained a linked XAException that is returned instead.
766 *
767 * @param e JMSException to convert
768 * @return XAException wrapping original exception or its message
769 */
770 private XAException toXAException(JMSException e) {
771 if (e.getCause() != null && e.getCause() instanceof XAException) {
772 XAException original = (XAException)e.getCause();
773 XAException xae = new XAException(original.getMessage());
774 xae.errorCode = original.errorCode;
775 xae.initCause(original);
776 return xae;
777 }
778
779 XAException xae = new XAException(e.getMessage());
780 xae.errorCode = XAException.XAER_RMFAIL;
781 xae.initCause(e);
782 return xae;
783 }
784
785 public ActiveMQConnection getConnection() {
786 return connection;
787 }
788
789 public void cleanup() {
790 associatedXid = null;
791 transactionId = null;
792 }
793
794 @Override
795 public String toString() {
796 return "TransactionContext{" +
797 "transactionId=" + transactionId +
798 '}';
799 }
800 }