001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.state;
018
019 import java.io.IOException;
020 import java.util.Iterator;
021 import java.util.LinkedHashMap;
022 import java.util.Map;
023 import java.util.Vector;
024 import java.util.Map.Entry;
025 import java.util.concurrent.ConcurrentHashMap;
026
027 import javax.jms.TransactionRolledBackException;
028 import javax.transaction.xa.XAResource;
029
030 import org.apache.activemq.command.Command;
031 import org.apache.activemq.command.ConnectionId;
032 import org.apache.activemq.command.ConnectionInfo;
033 import org.apache.activemq.command.ConsumerControl;
034 import org.apache.activemq.command.ConsumerId;
035 import org.apache.activemq.command.ConsumerInfo;
036 import org.apache.activemq.command.DestinationInfo;
037 import org.apache.activemq.command.ExceptionResponse;
038 import org.apache.activemq.command.IntegerResponse;
039 import org.apache.activemq.command.Message;
040 import org.apache.activemq.command.MessagePull;
041 import org.apache.activemq.command.ProducerId;
042 import org.apache.activemq.command.ProducerInfo;
043 import org.apache.activemq.command.Response;
044 import org.apache.activemq.command.SessionId;
045 import org.apache.activemq.command.SessionInfo;
046 import org.apache.activemq.command.TransactionInfo;
047 import org.apache.activemq.transport.Transport;
048 import org.apache.activemq.util.IOExceptionSupport;
049 import org.slf4j.Logger;
050 import org.slf4j.LoggerFactory;
051
052 /**
053 * Tracks the state of a connection so a newly established transport can be
054 * re-initialized to the state that was tracked.
055 *
056 *
057 */
058 public class ConnectionStateTracker extends CommandVisitorAdapter {
059 private static final Logger LOG = LoggerFactory.getLogger(ConnectionStateTracker.class);
060
061 private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
062 private static final int MESSAGE_PULL_SIZE = 400;
063 protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>();
064
065 private boolean trackTransactions;
066 private boolean restoreSessions = true;
067 private boolean restoreConsumers = true;
068 private boolean restoreProducers = true;
069 private boolean restoreTransaction = true;
070 private boolean trackMessages = true;
071 private boolean trackTransactionProducers = true;
072 private int maxCacheSize = 128 * 1024;
073 private int currentCacheSize;
074 private Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){
075 protected boolean removeEldestEntry(Map.Entry<Object,Command> eldest) {
076 boolean result = currentCacheSize > maxCacheSize;
077 if (result) {
078 if (eldest.getValue() instanceof Message) {
079 currentCacheSize -= ((Message)eldest.getValue()).getSize();
080 } else if (eldest.getValue() instanceof MessagePull) {
081 currentCacheSize -= MESSAGE_PULL_SIZE;
082 }
083 if (LOG.isTraceEnabled()) {
084 LOG.trace("removing tracked message: " + eldest.getKey());
085 }
086 }
087 return result;
088 }
089 };
090
091 private class RemoveTransactionAction implements ResponseHandler {
092 private final TransactionInfo info;
093
094 public RemoveTransactionAction(TransactionInfo info) {
095 this.info = info;
096 }
097
098 public void onResponse(Command response) {
099 ConnectionId connectionId = info.getConnectionId();
100 ConnectionState cs = connectionStates.get(connectionId);
101 cs.removeTransactionState(info.getTransactionId());
102 }
103 }
104
105 private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
106
107 public PrepareReadonlyTransactionAction(TransactionInfo info) {
108 super(info);
109 }
110
111 public void onResponse(Command command) {
112 IntegerResponse response = (IntegerResponse) command;
113 if (XAResource.XA_RDONLY == response.getResult()) {
114 // all done, no commit or rollback from TM
115 super.onResponse(command);
116 }
117 }
118 }
119
120 /**
121 *
122 *
123 * @param command
124 * @return null if the command is not state tracked.
125 * @throws IOException
126 */
127 public Tracked track(Command command) throws IOException {
128 try {
129 return (Tracked)command.visit(this);
130 } catch (IOException e) {
131 throw e;
132 } catch (Throwable e) {
133 throw IOExceptionSupport.create(e);
134 }
135 }
136
137 public void trackBack(Command command) {
138 if (command != null) {
139 if (trackMessages && command.isMessage()) {
140 Message message = (Message) command;
141 if (message.getTransactionId()==null) {
142 currentCacheSize = currentCacheSize + message.getSize();
143 }
144 } else if (command instanceof MessagePull) {
145 // just needs to be a rough estimate of size, ~4 identifiers
146 currentCacheSize += MESSAGE_PULL_SIZE;
147 }
148 }
149 }
150
151 public void restore(Transport transport) throws IOException {
152 // Restore the connections.
153 for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
154 ConnectionState connectionState = iter.next();
155 connectionState.getInfo().setFailoverReconnect(true);
156 if (LOG.isDebugEnabled()) {
157 LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
158 }
159 transport.oneway(connectionState.getInfo());
160 restoreTempDestinations(transport, connectionState);
161
162 if (restoreSessions) {
163 restoreSessions(transport, connectionState);
164 }
165
166 if (restoreTransaction) {
167 restoreTransactions(transport, connectionState);
168 }
169 }
170 //now flush messages
171 for (Command msg:messageCache.values()) {
172 if (LOG.isDebugEnabled()) {
173 LOG.debug("command: " + (msg.isMessage() ? ((Message) msg).getMessageId() : msg));
174 }
175 transport.oneway(msg);
176 }
177 }
178
179 private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
180 Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>();
181 for (TransactionState transactionState : connectionState.getTransactionStates()) {
182 if (LOG.isDebugEnabled()) {
183 LOG.debug("tx: " + transactionState.getId());
184 }
185
186 // rollback any completed transactions - no way to know if commit got there
187 // or if reply went missing
188 //
189 if (!transactionState.getCommands().isEmpty()) {
190 Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1);
191 if (lastCommand instanceof TransactionInfo) {
192 TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
193 if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) {
194 if (LOG.isDebugEnabled()) {
195 LOG.debug("rolling back potentially completed tx: " + transactionState.getId());
196 }
197 toRollback.add(transactionInfo);
198 continue;
199 }
200 }
201 }
202
203 // replay short lived producers that may have been involved in the transaction
204 for (ProducerState producerState : transactionState.getProducerStates().values()) {
205 if (LOG.isDebugEnabled()) {
206 LOG.debug("tx replay producer :" + producerState.getInfo());
207 }
208 transport.oneway(producerState.getInfo());
209 }
210
211 for (Command command : transactionState.getCommands()) {
212 if (LOG.isDebugEnabled()) {
213 LOG.debug("tx replay: " + command);
214 }
215 transport.oneway(command);
216 }
217
218 for (ProducerState producerState : transactionState.getProducerStates().values()) {
219 if (LOG.isDebugEnabled()) {
220 LOG.debug("tx remove replayed producer :" + producerState.getInfo());
221 }
222 transport.oneway(producerState.getInfo().createRemoveCommand());
223 }
224 }
225
226 for (TransactionInfo command: toRollback) {
227 // respond to the outstanding commit
228 ExceptionResponse response = new ExceptionResponse();
229 response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId()));
230 response.setCorrelationId(command.getCommandId());
231 transport.getTransportListener().onCommand(response);
232 }
233 }
234
235 /**
236 * @param transport
237 * @param connectionState
238 * @throws IOException
239 */
240 protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException {
241 // Restore the connection's sessions
242 for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
243 SessionState sessionState = (SessionState)iter2.next();
244 if (LOG.isDebugEnabled()) {
245 LOG.debug("session: " + sessionState.getInfo().getSessionId());
246 }
247 transport.oneway(sessionState.getInfo());
248
249 if (restoreProducers) {
250 restoreProducers(transport, sessionState);
251 }
252
253 if (restoreConsumers) {
254 restoreConsumers(transport, sessionState);
255 }
256 }
257 }
258
259 /**
260 * @param transport
261 * @param sessionState
262 * @throws IOException
263 */
264 protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException {
265 // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete
266 final ConnectionState connectionState = connectionStates.get(sessionState.getInfo().getSessionId().getParentId());
267 final boolean connectionInterruptionProcessingComplete = connectionState.isConnectionInterruptProcessingComplete();
268 for (ConsumerState consumerState : sessionState.getConsumerStates()) {
269 ConsumerInfo infoToSend = consumerState.getInfo();
270 if (!connectionInterruptionProcessingComplete && infoToSend.getPrefetchSize() > 0) {
271 infoToSend = consumerState.getInfo().copy();
272 connectionState.getRecoveringPullConsumers().put(infoToSend.getConsumerId(), consumerState.getInfo());
273 infoToSend.setPrefetchSize(0);
274 if (LOG.isDebugEnabled()) {
275 LOG.debug("restore consumer: " + infoToSend.getConsumerId() + " in pull mode pending recovery, overriding prefetch: " + consumerState.getInfo().getPrefetchSize());
276 }
277 }
278 if (LOG.isDebugEnabled()) {
279 LOG.debug("restore consumer: " + infoToSend.getConsumerId());
280 }
281 transport.oneway(infoToSend);
282 }
283 }
284
285 /**
286 * @param transport
287 * @param sessionState
288 * @throws IOException
289 */
290 protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException {
291 // Restore the session's producers
292 for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
293 ProducerState producerState = (ProducerState)iter3.next();
294 if (LOG.isDebugEnabled()) {
295 LOG.debug("producer: " + producerState.getInfo().getProducerId());
296 }
297 transport.oneway(producerState.getInfo());
298 }
299 }
300
301 /**
302 * @param transport
303 * @param connectionState
304 * @throws IOException
305 */
306 protected void restoreTempDestinations(Transport transport, ConnectionState connectionState)
307 throws IOException {
308 // Restore the connection's temp destinations.
309 for (Iterator iter2 = connectionState.getTempDestinations().iterator(); iter2.hasNext();) {
310 transport.oneway((DestinationInfo)iter2.next());
311 }
312 }
313
314 public Response processAddDestination(DestinationInfo info) {
315 if (info != null) {
316 ConnectionState cs = connectionStates.get(info.getConnectionId());
317 if (cs != null && info.getDestination().isTemporary()) {
318 cs.addTempDestination(info);
319 }
320 }
321 return TRACKED_RESPONSE_MARKER;
322 }
323
324 public Response processRemoveDestination(DestinationInfo info) {
325 if (info != null) {
326 ConnectionState cs = connectionStates.get(info.getConnectionId());
327 if (cs != null && info.getDestination().isTemporary()) {
328 cs.removeTempDestination(info.getDestination());
329 }
330 }
331 return TRACKED_RESPONSE_MARKER;
332 }
333
334 public Response processAddProducer(ProducerInfo info) {
335 if (info != null && info.getProducerId() != null) {
336 SessionId sessionId = info.getProducerId().getParentId();
337 if (sessionId != null) {
338 ConnectionId connectionId = sessionId.getParentId();
339 if (connectionId != null) {
340 ConnectionState cs = connectionStates.get(connectionId);
341 if (cs != null) {
342 SessionState ss = cs.getSessionState(sessionId);
343 if (ss != null) {
344 ss.addProducer(info);
345 }
346 }
347 }
348 }
349 }
350 return TRACKED_RESPONSE_MARKER;
351 }
352
353 public Response processRemoveProducer(ProducerId id) {
354 if (id != null) {
355 SessionId sessionId = id.getParentId();
356 if (sessionId != null) {
357 ConnectionId connectionId = sessionId.getParentId();
358 if (connectionId != null) {
359 ConnectionState cs = connectionStates.get(connectionId);
360 if (cs != null) {
361 SessionState ss = cs.getSessionState(sessionId);
362 if (ss != null) {
363 ss.removeProducer(id);
364 }
365 }
366 }
367 }
368 }
369 return TRACKED_RESPONSE_MARKER;
370 }
371
372 public Response processAddConsumer(ConsumerInfo info) {
373 if (info != null) {
374 SessionId sessionId = info.getConsumerId().getParentId();
375 if (sessionId != null) {
376 ConnectionId connectionId = sessionId.getParentId();
377 if (connectionId != null) {
378 ConnectionState cs = connectionStates.get(connectionId);
379 if (cs != null) {
380 SessionState ss = cs.getSessionState(sessionId);
381 if (ss != null) {
382 ss.addConsumer(info);
383 }
384 }
385 }
386 }
387 }
388 return TRACKED_RESPONSE_MARKER;
389 }
390
391 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
392 if (id != null) {
393 SessionId sessionId = id.getParentId();
394 if (sessionId != null) {
395 ConnectionId connectionId = sessionId.getParentId();
396 if (connectionId != null) {
397 ConnectionState cs = connectionStates.get(connectionId);
398 if (cs != null) {
399 SessionState ss = cs.getSessionState(sessionId);
400 if (ss != null) {
401 ss.removeConsumer(id);
402 }
403 }
404 }
405 }
406 }
407 return TRACKED_RESPONSE_MARKER;
408 }
409
410 public Response processAddSession(SessionInfo info) {
411 if (info != null) {
412 ConnectionId connectionId = info.getSessionId().getParentId();
413 if (connectionId != null) {
414 ConnectionState cs = connectionStates.get(connectionId);
415 if (cs != null) {
416 cs.addSession(info);
417 }
418 }
419 }
420 return TRACKED_RESPONSE_MARKER;
421 }
422
423 public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
424 if (id != null) {
425 ConnectionId connectionId = id.getParentId();
426 if (connectionId != null) {
427 ConnectionState cs = connectionStates.get(connectionId);
428 if (cs != null) {
429 cs.removeSession(id);
430 }
431 }
432 }
433 return TRACKED_RESPONSE_MARKER;
434 }
435
436 public Response processAddConnection(ConnectionInfo info) {
437 if (info != null) {
438 connectionStates.put(info.getConnectionId(), new ConnectionState(info));
439 }
440 return TRACKED_RESPONSE_MARKER;
441 }
442
443 public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
444 if (id != null) {
445 connectionStates.remove(id);
446 }
447 return TRACKED_RESPONSE_MARKER;
448 }
449
450 public Response processMessage(Message send) throws Exception {
451 if (send != null) {
452 if (trackTransactions && send.getTransactionId() != null) {
453 ProducerId producerId = send.getProducerId();
454 ConnectionId connectionId = producerId.getParentId().getParentId();
455 if (connectionId != null) {
456 ConnectionState cs = connectionStates.get(connectionId);
457 if (cs != null) {
458 TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
459 if (transactionState != null) {
460 transactionState.addCommand(send);
461
462 if (trackTransactionProducers) {
463 // for jmstemplate, track the producer in case it is closed before commit
464 // and needs to be replayed
465 SessionState ss = cs.getSessionState(producerId.getParentId());
466 ProducerState producerState = ss.getProducerState(producerId);
467 producerState.setTransactionState(transactionState);
468 }
469 }
470 }
471 }
472 return TRACKED_RESPONSE_MARKER;
473 }else if (trackMessages) {
474 messageCache.put(send.getMessageId(), send);
475 }
476 }
477 return null;
478 }
479
480 public Response processBeginTransaction(TransactionInfo info) {
481 if (trackTransactions && info != null && info.getTransactionId() != null) {
482 ConnectionId connectionId = info.getConnectionId();
483 if (connectionId != null) {
484 ConnectionState cs = connectionStates.get(connectionId);
485 if (cs != null) {
486 cs.addTransactionState(info.getTransactionId());
487 TransactionState state = cs.getTransactionState(info.getTransactionId());
488 state.addCommand(info);
489 }
490 }
491 return TRACKED_RESPONSE_MARKER;
492 }
493 return null;
494 }
495
496 public Response processPrepareTransaction(TransactionInfo info) throws Exception {
497 if (trackTransactions && info != null) {
498 ConnectionId connectionId = info.getConnectionId();
499 if (connectionId != null) {
500 ConnectionState cs = connectionStates.get(connectionId);
501 if (cs != null) {
502 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
503 if (transactionState != null) {
504 transactionState.addCommand(info);
505 return new Tracked(new PrepareReadonlyTransactionAction(info));
506 }
507 }
508 }
509 }
510 return null;
511 }
512
513 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
514 if (trackTransactions && info != null) {
515 ConnectionId connectionId = info.getConnectionId();
516 if (connectionId != null) {
517 ConnectionState cs = connectionStates.get(connectionId);
518 if (cs != null) {
519 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
520 if (transactionState != null) {
521 transactionState.addCommand(info);
522 return new Tracked(new RemoveTransactionAction(info));
523 }
524 }
525 }
526 }
527 return null;
528 }
529
530 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
531 if (trackTransactions && info != null) {
532 ConnectionId connectionId = info.getConnectionId();
533 if (connectionId != null) {
534 ConnectionState cs = connectionStates.get(connectionId);
535 if (cs != null) {
536 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
537 if (transactionState != null) {
538 transactionState.addCommand(info);
539 return new Tracked(new RemoveTransactionAction(info));
540 }
541 }
542 }
543 }
544 return null;
545 }
546
547 public Response processRollbackTransaction(TransactionInfo info) throws Exception {
548 if (trackTransactions && info != null) {
549 ConnectionId connectionId = info.getConnectionId();
550 if (connectionId != null) {
551 ConnectionState cs = connectionStates.get(connectionId);
552 if (cs != null) {
553 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
554 if (transactionState != null) {
555 transactionState.addCommand(info);
556 return new Tracked(new RemoveTransactionAction(info));
557 }
558 }
559 }
560 }
561 return null;
562 }
563
564 public Response processEndTransaction(TransactionInfo info) throws Exception {
565 if (trackTransactions && info != null) {
566 ConnectionId connectionId = info.getConnectionId();
567 if (connectionId != null) {
568 ConnectionState cs = connectionStates.get(connectionId);
569 if (cs != null) {
570 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
571 if (transactionState != null) {
572 transactionState.addCommand(info);
573 }
574 }
575 }
576 return TRACKED_RESPONSE_MARKER;
577 }
578 return null;
579 }
580
581 @Override
582 public Response processMessagePull(MessagePull pull) throws Exception {
583 if (pull != null) {
584 // leave a single instance in the cache
585 final String id = pull.getDestination() + "::" + pull.getConsumerId();
586 messageCache.put(id.intern(), pull);
587 }
588 return null;
589 }
590
591 public boolean isRestoreConsumers() {
592 return restoreConsumers;
593 }
594
595 public void setRestoreConsumers(boolean restoreConsumers) {
596 this.restoreConsumers = restoreConsumers;
597 }
598
599 public boolean isRestoreProducers() {
600 return restoreProducers;
601 }
602
603 public void setRestoreProducers(boolean restoreProducers) {
604 this.restoreProducers = restoreProducers;
605 }
606
607 public boolean isRestoreSessions() {
608 return restoreSessions;
609 }
610
611 public void setRestoreSessions(boolean restoreSessions) {
612 this.restoreSessions = restoreSessions;
613 }
614
615 public boolean isTrackTransactions() {
616 return trackTransactions;
617 }
618
619 public void setTrackTransactions(boolean trackTransactions) {
620 this.trackTransactions = trackTransactions;
621 }
622
623 public boolean isTrackTransactionProducers() {
624 return this.trackTransactionProducers;
625 }
626
627 public void setTrackTransactionProducers(boolean trackTransactionProducers) {
628 this.trackTransactionProducers = trackTransactionProducers;
629 }
630
631 public boolean isRestoreTransaction() {
632 return restoreTransaction;
633 }
634
635 public void setRestoreTransaction(boolean restoreTransaction) {
636 this.restoreTransaction = restoreTransaction;
637 }
638
639 public boolean isTrackMessages() {
640 return trackMessages;
641 }
642
643 public void setTrackMessages(boolean trackMessages) {
644 this.trackMessages = trackMessages;
645 }
646
647 public int getMaxCacheSize() {
648 return maxCacheSize;
649 }
650
651 public void setMaxCacheSize(int maxCacheSize) {
652 this.maxCacheSize = maxCacheSize;
653 }
654
655 public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) {
656 ConnectionState connectionState = connectionStates.get(connectionId);
657 if (connectionState != null) {
658 connectionState.setConnectionInterruptProcessingComplete(true);
659 Map<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.getRecoveringPullConsumers();
660 for (Entry<ConsumerId, ConsumerInfo> entry: stalledConsumers.entrySet()) {
661 ConsumerControl control = new ConsumerControl();
662 control.setConsumerId(entry.getKey());
663 control.setPrefetch(entry.getValue().getPrefetchSize());
664 control.setDestination(entry.getValue().getDestination());
665 try {
666 if (LOG.isDebugEnabled()) {
667 LOG.debug("restored recovering consumer: " + control.getConsumerId() + " with: " + control.getPrefetch());
668 }
669 transport.oneway(control);
670 } catch (Exception ex) {
671 if (LOG.isDebugEnabled()) {
672 LOG.debug("Failed to submit control for consumer: " + control.getConsumerId()
673 + " with: " + control.getPrefetch(), ex);
674 }
675 }
676 }
677 stalledConsumers.clear();
678 }
679 }
680
681 public void transportInterrupted(ConnectionId connectionId) {
682 ConnectionState connectionState = connectionStates.get(connectionId);
683 if (connectionState != null) {
684 connectionState.setConnectionInterruptProcessingComplete(false);
685 }
686 }
687 }