001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker;
018
019 import java.io.EOFException;
020 import java.io.IOException;
021 import java.net.SocketException;
022 import java.net.URI;
023 import java.util.HashMap;
024 import java.util.Iterator;
025 import java.util.LinkedList;
026 import java.util.List;
027 import java.util.Map;
028 import java.util.Properties;
029 import java.util.concurrent.ConcurrentHashMap;
030 import java.util.concurrent.CopyOnWriteArrayList;
031 import java.util.concurrent.CountDownLatch;
032 import java.util.concurrent.TimeUnit;
033 import java.util.concurrent.atomic.AtomicBoolean;
034 import java.util.concurrent.atomic.AtomicInteger;
035 import java.util.concurrent.atomic.AtomicReference;
036 import java.util.concurrent.locks.ReentrantReadWriteLock;
037
038 import javax.transaction.xa.XAResource;
039 import org.apache.activemq.advisory.AdvisorySupport;
040 import org.apache.activemq.broker.ft.MasterBroker;
041 import org.apache.activemq.broker.region.ConnectionStatistics;
042 import org.apache.activemq.broker.region.RegionBroker;
043 import org.apache.activemq.command.*;
044 import org.apache.activemq.network.DemandForwardingBridge;
045 import org.apache.activemq.network.MBeanNetworkListener;
046 import org.apache.activemq.network.NetworkBridgeConfiguration;
047 import org.apache.activemq.network.NetworkBridgeFactory;
048 import org.apache.activemq.security.MessageAuthorizationPolicy;
049 import org.apache.activemq.state.CommandVisitor;
050 import org.apache.activemq.state.ConnectionState;
051 import org.apache.activemq.state.ConsumerState;
052 import org.apache.activemq.state.ProducerState;
053 import org.apache.activemq.state.SessionState;
054 import org.apache.activemq.state.TransactionState;
055 import org.apache.activemq.thread.DefaultThreadPools;
056 import org.apache.activemq.thread.Task;
057 import org.apache.activemq.thread.TaskRunner;
058 import org.apache.activemq.thread.TaskRunnerFactory;
059 import org.apache.activemq.transaction.Transaction;
060 import org.apache.activemq.transport.DefaultTransportListener;
061 import org.apache.activemq.transport.ResponseCorrelator;
062 import org.apache.activemq.transport.Transport;
063 import org.apache.activemq.transport.TransportDisposedIOException;
064 import org.apache.activemq.transport.TransportFactory;
065 import org.apache.activemq.util.IntrospectionSupport;
066 import org.apache.activemq.util.MarshallingSupport;
067 import org.apache.activemq.util.ServiceSupport;
068 import org.apache.activemq.util.URISupport;
069 import org.slf4j.Logger;
070 import org.slf4j.LoggerFactory;
071 import org.slf4j.MDC;
072
073 public class TransportConnection implements Connection, Task, CommandVisitor {
074 private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
075 private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
076 private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service");
077 // Keeps track of the broker and connector that created this connection.
078 protected final Broker broker;
079 protected final TransportConnector connector;
080 // Keeps track of the state of the connections.
081 // protected final ConcurrentHashMap localConnectionStates=new
082 // ConcurrentHashMap();
083 protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
084 // The broker and wireformat info that was exchanged.
085 protected BrokerInfo brokerInfo;
086 protected final List<Command> dispatchQueue = new LinkedList<Command>();
087 protected TaskRunner taskRunner;
088 protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
089 protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
090 private MasterBroker masterBroker;
091 private final Transport transport;
092 private MessageAuthorizationPolicy messageAuthorizationPolicy;
093 private WireFormatInfo wireFormatInfo;
094 // Used to do async dispatch.. this should perhaps be pushed down into the
095 // transport layer..
096 private boolean inServiceException;
097 private final ConnectionStatistics statistics = new ConnectionStatistics();
098 private boolean manageable;
099 private boolean slow;
100 private boolean markedCandidate;
101 private boolean blockedCandidate;
102 private boolean blocked;
103 private boolean connected;
104 private boolean active;
105 private boolean starting;
106 private boolean pendingStop;
107 private long timeStamp;
108 private final AtomicBoolean stopping = new AtomicBoolean(false);
109 private final CountDownLatch stopped = new CountDownLatch(1);
110 private final AtomicBoolean asyncException = new AtomicBoolean(false);
111 private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
112 private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
113 private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
114 private ConnectionContext context;
115 private boolean networkConnection;
116 private boolean faultTolerantConnection;
117 private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
118 private DemandForwardingBridge duplexBridge;
119 private final TaskRunnerFactory taskRunnerFactory;
120 private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
121 private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
122 private String duplexNetworkConnectorId;
123 private Throwable stopError = null;
124
125 /**
126 * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
127 * else commands are sent async.
128 */
129 public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
130 TaskRunnerFactory taskRunnerFactory) {
131 this.connector = connector;
132 this.broker = broker;
133 this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
134 RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
135 brokerConnectionStates = rb.getConnectionStates();
136 if (connector != null) {
137 this.statistics.setParent(connector.getStatistics());
138 }
139 this.taskRunnerFactory = taskRunnerFactory;
140 this.transport = transport;
141 this.transport.setTransportListener(new DefaultTransportListener() {
142 @Override
143 public void onCommand(Object o) {
144 serviceLock.readLock().lock();
145 try {
146 if (!(o instanceof Command)) {
147 throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
148 }
149 Command command = (Command) o;
150 Response response = service(command);
151 if (response != null) {
152 dispatchSync(response);
153 }
154 } finally {
155 serviceLock.readLock().unlock();
156 }
157 }
158
159 @Override
160 public void onException(IOException exception) {
161 serviceLock.readLock().lock();
162 try {
163 serviceTransportException(exception);
164 } finally {
165 serviceLock.readLock().unlock();
166 }
167 }
168 });
169 connected = true;
170 }
171
172 /**
173 * Returns the number of messages to be dispatched to this connection
174 *
175 * @return size of dispatch queue
176 */
177 public int getDispatchQueueSize() {
178 synchronized (dispatchQueue) {
179 return dispatchQueue.size();
180 }
181 }
182
183 public void serviceTransportException(IOException e) {
184 BrokerService bService = connector.getBrokerService();
185 if (bService.isShutdownOnSlaveFailure()) {
186 if (brokerInfo != null) {
187 if (brokerInfo.isSlaveBroker()) {
188 LOG.error("Slave has exception: " + e.getMessage() + " shutting down master now.", e);
189 try {
190 doStop();
191 bService.stop();
192 } catch (Exception ex) {
193 LOG.warn("Failed to stop the master", ex);
194 }
195 }
196 }
197 }
198 if (!stopping.get()) {
199 transportException.set(e);
200 if (TRANSPORTLOG.isDebugEnabled()) {
201 TRANSPORTLOG.debug(this + " failed: " + e, e);
202 } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
203 TRANSPORTLOG.warn(this + " failed: " + e);
204 }
205 stopAsync();
206 }
207 }
208
209 private boolean expected(IOException e) {
210 return isStomp() &&
211 ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
212 }
213
214 private boolean isStomp() {
215 URI uri = connector.getUri();
216 return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1;
217 }
218
219 /**
220 * Calls the serviceException method in an async thread. Since handling a
221 * service exception closes a socket, we should not tie up broker threads
222 * since client sockets may hang or cause deadlocks.
223 */
224 public void serviceExceptionAsync(final IOException e) {
225 if (asyncException.compareAndSet(false, true)) {
226 new Thread("Async Exception Handler") {
227 @Override
228 public void run() {
229 serviceException(e);
230 }
231 }.start();
232 }
233 }
234
235 /**
236 * Closes a clients connection due to a detected error. Errors are ignored
237 * if: the client is closing or broker is closing. Otherwise, the connection
238 * error transmitted to the client before stopping it's transport.
239 */
240 public void serviceException(Throwable e) {
241 // are we a transport exception such as not being able to dispatch
242 // synchronously to a transport
243 if (e instanceof IOException) {
244 serviceTransportException((IOException) e);
245 } else if (e.getClass() == BrokerStoppedException.class) {
246 // Handle the case where the broker is stopped
247 // But the client is still connected.
248 if (!stopping.get()) {
249 if (SERVICELOG.isDebugEnabled()) {
250 SERVICELOG.debug("Broker has been stopped. Notifying client and closing his connection.");
251 }
252 ConnectionError ce = new ConnectionError();
253 ce.setException(e);
254 dispatchSync(ce);
255 // Record the error that caused the transport to stop
256 this.stopError = e;
257 // Wait a little bit to try to get the output buffer to flush
258 // the exption notification to the client.
259 try {
260 Thread.sleep(500);
261 } catch (InterruptedException ie) {
262 Thread.currentThread().interrupt();
263 }
264 // Worst case is we just kill the connection before the
265 // notification gets to him.
266 stopAsync();
267 }
268 } else if (!stopping.get() && !inServiceException) {
269 inServiceException = true;
270 try {
271 SERVICELOG.warn("Async error occurred: " + e, e);
272 ConnectionError ce = new ConnectionError();
273 ce.setException(e);
274 if (pendingStop) {
275 dispatchSync(ce);
276 } else {
277 dispatchAsync(ce);
278 }
279 } finally {
280 inServiceException = false;
281 }
282 }
283 }
284
285 public Response service(Command command) {
286 MDC.put("activemq.connector", connector.getUri().toString());
287 Response response = null;
288 boolean responseRequired = command.isResponseRequired();
289 int commandId = command.getCommandId();
290 try {
291 if (!pendingStop) {
292 response = command.visit(this);
293 } else {
294 response = new ExceptionResponse(this.stopError);
295 }
296 } catch (Throwable e) {
297 if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
298 SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
299 + " command: " + command + ", exception: " + e, e);
300 }
301
302 if (e instanceof java.lang.SecurityException) {
303 // still need to close this down - in case the peer of this transport doesn't play nice
304 delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
305 }
306
307 if (responseRequired) {
308 response = new ExceptionResponse(e);
309 } else {
310 serviceException(e);
311 }
312 }
313 if (responseRequired) {
314 if (response == null) {
315 response = new Response();
316 }
317 response.setCorrelationId(commandId);
318 }
319 // The context may have been flagged so that the response is not
320 // sent.
321 if (context != null) {
322 if (context.isDontSendReponse()) {
323 context.setDontSendReponse(false);
324 response = null;
325 }
326 context = null;
327 }
328 MDC.remove("activemq.connector");
329 return response;
330 }
331
332 public Response processKeepAlive(KeepAliveInfo info) throws Exception {
333 return null;
334 }
335
336 public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
337 broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
338 return null;
339 }
340
341 public Response processWireFormat(WireFormatInfo info) throws Exception {
342 wireFormatInfo = info;
343 protocolVersion.set(info.getVersion());
344 return null;
345 }
346
347 public Response processShutdown(ShutdownInfo info) throws Exception {
348 stopAsync();
349 return null;
350 }
351
352 public Response processFlush(FlushCommand command) throws Exception {
353 return null;
354 }
355
356 public Response processBeginTransaction(TransactionInfo info) throws Exception {
357 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
358 context = null;
359 if (cs != null) {
360 context = cs.getContext();
361 }
362 if (cs == null) {
363 throw new NullPointerException("Context is null");
364 }
365 // Avoid replaying dup commands
366 if (cs.getTransactionState(info.getTransactionId()) == null) {
367 cs.addTransactionState(info.getTransactionId());
368 broker.beginTransaction(context, info.getTransactionId());
369 }
370 return null;
371 }
372
373 public Response processEndTransaction(TransactionInfo info) throws Exception {
374 // No need to do anything. This packet is just sent by the client
375 // make sure he is synced with the server as commit command could
376 // come from a different connection.
377 return null;
378 }
379
380 public Response processPrepareTransaction(TransactionInfo info) throws Exception {
381 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
382 context = null;
383 if (cs != null) {
384 context = cs.getContext();
385 }
386 if (cs == null) {
387 throw new NullPointerException("Context is null");
388 }
389 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
390 if (transactionState == null) {
391 throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: "
392 + info.getTransactionId());
393 }
394 // Avoid dups.
395 if (!transactionState.isPrepared()) {
396 transactionState.setPrepared(true);
397 int result = broker.prepareTransaction(context, info.getTransactionId());
398 transactionState.setPreparedResult(result);
399 if (result == XAResource.XA_RDONLY) {
400 // we are done, no further rollback or commit from TM
401 cs.removeTransactionState(info.getTransactionId());
402 }
403 IntegerResponse response = new IntegerResponse(result);
404 return response;
405 } else {
406 IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
407 return response;
408 }
409 }
410
411 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
412 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
413 context = cs.getContext();
414 cs.removeTransactionState(info.getTransactionId());
415 broker.commitTransaction(context, info.getTransactionId(), true);
416 return null;
417 }
418
419 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
420 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
421 context = cs.getContext();
422 cs.removeTransactionState(info.getTransactionId());
423 broker.commitTransaction(context, info.getTransactionId(), false);
424 return null;
425 }
426
427 public Response processRollbackTransaction(TransactionInfo info) throws Exception {
428 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
429 context = cs.getContext();
430 cs.removeTransactionState(info.getTransactionId());
431 broker.rollbackTransaction(context, info.getTransactionId());
432 return null;
433 }
434
435 public Response processForgetTransaction(TransactionInfo info) throws Exception {
436 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
437 context = cs.getContext();
438 broker.forgetTransaction(context, info.getTransactionId());
439 return null;
440 }
441
442 public Response processRecoverTransactions(TransactionInfo info) throws Exception {
443 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
444 context = cs.getContext();
445 TransactionId[] preparedTransactions = broker.getPreparedTransactions(context);
446 return new DataArrayResponse(preparedTransactions);
447 }
448
449 public Response processMessage(Message messageSend) throws Exception {
450 ProducerId producerId = messageSend.getProducerId();
451 ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
452 if (producerExchange.canDispatch(messageSend)) {
453 broker.send(producerExchange, messageSend);
454 }
455 return null;
456 }
457
458 public Response processMessageAck(MessageAck ack) throws Exception {
459 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
460 if (consumerExchange != null) {
461 broker.acknowledge(consumerExchange, ack);
462 }
463 return null;
464 }
465
466 public Response processMessagePull(MessagePull pull) throws Exception {
467 return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
468 }
469
470 public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
471 broker.processDispatchNotification(notification);
472 return null;
473 }
474
475 public Response processAddDestination(DestinationInfo info) throws Exception {
476 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
477 broker.addDestinationInfo(cs.getContext(), info);
478 if (info.getDestination().isTemporary()) {
479 cs.addTempDestination(info);
480 }
481 return null;
482 }
483
484 public Response processRemoveDestination(DestinationInfo info) throws Exception {
485 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
486 broker.removeDestinationInfo(cs.getContext(), info);
487 if (info.getDestination().isTemporary()) {
488 cs.removeTempDestination(info.getDestination());
489 }
490 return null;
491 }
492
493 public Response processAddProducer(ProducerInfo info) throws Exception {
494 SessionId sessionId = info.getProducerId().getParentId();
495 ConnectionId connectionId = sessionId.getParentId();
496 TransportConnectionState cs = lookupConnectionState(connectionId);
497 if (cs == null) {
498 throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: "
499 + connectionId);
500 }
501 SessionState ss = cs.getSessionState(sessionId);
502 if (ss == null) {
503 throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
504 + sessionId);
505 }
506 // Avoid replaying dup commands
507 if (!ss.getProducerIds().contains(info.getProducerId())) {
508 ActiveMQDestination destination = info.getDestination();
509 if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
510 if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){
511 throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection());
512 }
513 }
514 broker.addProducer(cs.getContext(), info);
515 try {
516 ss.addProducer(info);
517 } catch (IllegalStateException e) {
518 broker.removeProducer(cs.getContext(), info);
519 }
520
521 }
522 return null;
523 }
524
525 public Response processRemoveProducer(ProducerId id) throws Exception {
526 SessionId sessionId = id.getParentId();
527 ConnectionId connectionId = sessionId.getParentId();
528 TransportConnectionState cs = lookupConnectionState(connectionId);
529 SessionState ss = cs.getSessionState(sessionId);
530 if (ss == null) {
531 throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
532 + sessionId);
533 }
534 ProducerState ps = ss.removeProducer(id);
535 if (ps == null) {
536 throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
537 }
538 removeProducerBrokerExchange(id);
539 broker.removeProducer(cs.getContext(), ps.getInfo());
540 return null;
541 }
542
543 public Response processAddConsumer(ConsumerInfo info) throws Exception {
544 SessionId sessionId = info.getConsumerId().getParentId();
545 ConnectionId connectionId = sessionId.getParentId();
546 TransportConnectionState cs = lookupConnectionState(connectionId);
547 if (cs == null) {
548 throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: "
549 + connectionId);
550 }
551 SessionState ss = cs.getSessionState(sessionId);
552 if (ss == null) {
553 throw new IllegalStateException(broker.getBrokerName()
554 + " Cannot add a consumer to a session that had not been registered: " + sessionId);
555 }
556 // Avoid replaying dup commands
557 if (!ss.getConsumerIds().contains(info.getConsumerId())) {
558 ActiveMQDestination destination = info.getDestination();
559 if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
560 if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){
561 throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection());
562 }
563 }
564
565 broker.addConsumer(cs.getContext(), info);
566 try {
567 ss.addConsumer(info);
568 addConsumerBrokerExchange(info.getConsumerId());
569 } catch (IllegalStateException e) {
570 broker.removeConsumer(cs.getContext(), info);
571 }
572
573 }
574 return null;
575 }
576
577 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
578 SessionId sessionId = id.getParentId();
579 ConnectionId connectionId = sessionId.getParentId();
580 TransportConnectionState cs = lookupConnectionState(connectionId);
581 if (cs == null) {
582 throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
583 + connectionId);
584 }
585 SessionState ss = cs.getSessionState(sessionId);
586 if (ss == null) {
587 throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
588 + sessionId);
589 }
590 ConsumerState consumerState = ss.removeConsumer(id);
591 if (consumerState == null) {
592 throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
593 }
594 ConsumerInfo info = consumerState.getInfo();
595 info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
596 broker.removeConsumer(cs.getContext(), consumerState.getInfo());
597 removeConsumerBrokerExchange(id);
598 return null;
599 }
600
601 public Response processAddSession(SessionInfo info) throws Exception {
602 ConnectionId connectionId = info.getSessionId().getParentId();
603 TransportConnectionState cs = lookupConnectionState(connectionId);
604 // Avoid replaying dup commands
605 if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
606 broker.addSession(cs.getContext(), info);
607 try {
608 cs.addSession(info);
609 } catch (IllegalStateException e) {
610 e.printStackTrace();
611 broker.removeSession(cs.getContext(), info);
612 }
613 }
614 return null;
615 }
616
617 public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
618 ConnectionId connectionId = id.getParentId();
619 TransportConnectionState cs = lookupConnectionState(connectionId);
620 if (cs == null) {
621 throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
622 }
623 SessionState session = cs.getSessionState(id);
624 if (session == null) {
625 throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
626 }
627 // Don't let new consumers or producers get added while we are closing
628 // this down.
629 session.shutdown();
630 // Cascade the connection stop to the consumers and producers.
631 for (ConsumerId consumerId : session.getConsumerIds()) {
632 try {
633 processRemoveConsumer(consumerId, lastDeliveredSequenceId);
634 } catch (Throwable e) {
635 LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e);
636 }
637 }
638 for (ProducerId producerId : session.getProducerIds()) {
639 try {
640 processRemoveProducer(producerId);
641 } catch (Throwable e) {
642 LOG.warn("Failed to remove producer: " + producerId + ". Reason: " + e, e);
643 }
644 }
645 cs.removeSession(id);
646 broker.removeSession(cs.getContext(), session.getInfo());
647 return null;
648 }
649
650 public Response processAddConnection(ConnectionInfo info) throws Exception {
651 // if the broker service has slave attached, wait for the slave to be
652 // attached to allow client connection. slave connection is fine
653 if (!info.isBrokerMasterConnector() && connector.getBrokerService().isWaitForSlave()
654 && connector.getBrokerService().getSlaveStartSignal().getCount() == 1) {
655 ServiceSupport.dispose(transport);
656 return new ExceptionResponse(new Exception("Master's slave not attached yet."));
657 }
658 // Older clients should have been defaulting this field to true.. but
659 // they were not.
660 if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
661 info.setClientMaster(true);
662 }
663 TransportConnectionState state;
664 // Make sure 2 concurrent connections by the same ID only generate 1
665 // TransportConnectionState object.
666 synchronized (brokerConnectionStates) {
667 state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
668 if (state == null) {
669 state = new TransportConnectionState(info, this);
670 brokerConnectionStates.put(info.getConnectionId(), state);
671 }
672 state.incrementReference();
673 }
674 // If there are 2 concurrent connections for the same connection id,
675 // then last one in wins, we need to sync here
676 // to figure out the winner.
677 synchronized (state.getConnectionMutex()) {
678 if (state.getConnection() != this) {
679 LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress());
680 state.getConnection().stop();
681 LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: "
682 + state.getConnection().getRemoteAddress());
683 state.setConnection(this);
684 state.reset(info);
685 }
686 }
687 registerConnectionState(info.getConnectionId(), state);
688 LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress() + ", info: " + info);
689 this.faultTolerantConnection = info.isFaultTolerant();
690 // Setup the context.
691 String clientId = info.getClientId();
692 context = new ConnectionContext();
693 context.setBroker(broker);
694 context.setClientId(clientId);
695 context.setClientMaster(info.isClientMaster());
696 context.setConnection(this);
697 context.setConnectionId(info.getConnectionId());
698 context.setConnector(connector);
699 context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
700 context.setNetworkConnection(networkConnection);
701 context.setFaultTolerant(faultTolerantConnection);
702 context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
703 context.setUserName(info.getUserName());
704 context.setWireFormatInfo(wireFormatInfo);
705 context.setReconnect(info.isFailoverReconnect());
706 this.manageable = info.isManageable();
707 context.setConnectionState(state);
708 state.setContext(context);
709 state.setConnection(this);
710 if (info.getClientIp() == null) {
711 info.setClientIp(getRemoteAddress());
712 }
713
714 try {
715 broker.addConnection(context, info);
716 } catch (Exception e) {
717 synchronized (brokerConnectionStates) {
718 brokerConnectionStates.remove(info.getConnectionId());
719 }
720 unregisterConnectionState(info.getConnectionId());
721 LOG.warn("Failed to add Connection " + info.getConnectionId() + ", reason: " + e.toString());
722 if (LOG.isDebugEnabled()) {
723 LOG.debug("Exception detail:", e);
724 }
725 throw e;
726 }
727 if (info.isManageable()) {
728 // send ConnectionCommand
729 ConnectionControl command = this.connector.getConnectionControl();
730 command.setFaultTolerant(broker.isFaultTolerantConfiguration());
731 if (info.isFailoverReconnect()) {
732 command.setRebalanceConnection(false);
733 }
734 dispatchAsync(command);
735 }
736 return null;
737 }
738
739 public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
740 throws InterruptedException {
741 LOG.debug("remove connection id: " + id);
742 TransportConnectionState cs = lookupConnectionState(id);
743 if (cs != null) {
744 // Don't allow things to be added to the connection state while we
745 // are shutting down.
746 cs.shutdown();
747 // Cascade the connection stop to the sessions.
748 for (SessionId sessionId : cs.getSessionIds()) {
749 try {
750 processRemoveSession(sessionId, lastDeliveredSequenceId);
751 } catch (Throwable e) {
752 SERVICELOG.warn("Failed to remove session " + sessionId, e);
753 }
754 }
755 // Cascade the connection stop to temp destinations.
756 for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) {
757 DestinationInfo di = iter.next();
758 try {
759 broker.removeDestination(cs.getContext(), di.getDestination(), 0);
760 } catch (Throwable e) {
761 SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e);
762 }
763 iter.remove();
764 }
765 try {
766 broker.removeConnection(cs.getContext(), cs.getInfo(), null);
767 } catch (Throwable e) {
768 SERVICELOG.warn("Failed to remove connection " + cs.getInfo() + ", reason: " + e.toString());
769 if (LOG.isDebugEnabled()) {
770 SERVICELOG.debug("Exception detail:", e);
771 }
772 }
773 TransportConnectionState state = unregisterConnectionState(id);
774 if (state != null) {
775 synchronized (brokerConnectionStates) {
776 // If we are the last reference, we should remove the state
777 // from the broker.
778 if (state.decrementReference() == 0) {
779 brokerConnectionStates.remove(id);
780 }
781 }
782 }
783 }
784 return null;
785 }
786
787 public Response processProducerAck(ProducerAck ack) throws Exception {
788 // A broker should not get ProducerAck messages.
789 return null;
790 }
791
792 public Connector getConnector() {
793 return connector;
794 }
795
796 public void dispatchSync(Command message) {
797 try {
798 processDispatch(message);
799 } catch (IOException e) {
800 serviceExceptionAsync(e);
801 }
802 }
803
804 public void dispatchAsync(Command message) {
805 if (!stopping.get()) {
806 if (taskRunner == null) {
807 dispatchSync(message);
808 } else {
809 synchronized (dispatchQueue) {
810 dispatchQueue.add(message);
811 }
812 try {
813 taskRunner.wakeup();
814 } catch (InterruptedException e) {
815 Thread.currentThread().interrupt();
816 }
817 }
818 } else {
819 if (message.isMessageDispatch()) {
820 MessageDispatch md = (MessageDispatch) message;
821 Runnable sub = md.getTransmitCallback();
822 broker.postProcessDispatch(md);
823 if (sub != null) {
824 sub.run();
825 }
826 }
827 }
828 }
829
830 protected void processDispatch(Command command) throws IOException {
831 final MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
832 try {
833 if (!stopping.get()) {
834 if (messageDispatch != null) {
835 broker.preProcessDispatch(messageDispatch);
836 }
837 dispatch(command);
838 }
839 } finally {
840 if (messageDispatch != null) {
841 Runnable sub = messageDispatch.getTransmitCallback();
842 broker.postProcessDispatch(messageDispatch);
843 if (sub != null) {
844 sub.run();
845 }
846 }
847 }
848 }
849
850 public boolean iterate() {
851 try {
852 if (pendingStop || stopping.get()) {
853 if (dispatchStopped.compareAndSet(false, true)) {
854 if (transportException.get() == null) {
855 try {
856 dispatch(new ShutdownInfo());
857 } catch (Throwable ignore) {
858 }
859 }
860 dispatchStoppedLatch.countDown();
861 }
862 return false;
863 }
864 if (!dispatchStopped.get()) {
865 Command command = null;
866 synchronized (dispatchQueue) {
867 if (dispatchQueue.isEmpty()) {
868 return false;
869 }
870 command = dispatchQueue.remove(0);
871 }
872 processDispatch(command);
873 return true;
874 }
875 return false;
876 } catch (IOException e) {
877 if (dispatchStopped.compareAndSet(false, true)) {
878 dispatchStoppedLatch.countDown();
879 }
880 serviceExceptionAsync(e);
881 return false;
882 }
883 }
884
885 /**
886 * Returns the statistics for this connection
887 */
888 public ConnectionStatistics getStatistics() {
889 return statistics;
890 }
891
892 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
893 return messageAuthorizationPolicy;
894 }
895
896 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
897 this.messageAuthorizationPolicy = messageAuthorizationPolicy;
898 }
899
900 public boolean isManageable() {
901 return manageable;
902 }
903
904 public void start() throws Exception {
905 try {
906 synchronized (this) {
907 starting = true;
908 if (taskRunnerFactory != null) {
909 taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
910 + getRemoteAddress());
911 } else {
912 taskRunner = null;
913 }
914 transport.start();
915 active = true;
916 BrokerInfo info = connector.getBrokerInfo().copy();
917 if (connector.isUpdateClusterClients()) {
918 info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
919 } else {
920 info.setPeerBrokerInfos(null);
921 }
922 dispatchAsync(info);
923
924 connector.onStarted(this);
925 }
926 } catch (Exception e) {
927 // Force clean up on an error starting up.
928 pendingStop = true;
929 throw e;
930 } finally {
931 // stop() can be called from within the above block,
932 // but we want to be sure start() completes before
933 // stop() runs, so queue the stop until right now:
934 setStarting(false);
935 if (isPendingStop()) {
936 LOG.debug("Calling the delayed stop() after start() " + this);
937 stop();
938 }
939 }
940 }
941
942 public void stop() throws Exception {
943 stopAsync();
944 while (!stopped.await(5, TimeUnit.SECONDS)) {
945 LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown.");
946 }
947 }
948
949 public void delayedStop(final int waitTime, final String reason, Throwable cause) {
950 if (waitTime > 0) {
951 synchronized (this) {
952 pendingStop = true;
953 stopError = cause;
954 }
955 try {
956 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
957 public void run() {
958 try {
959 Thread.sleep(waitTime);
960 stopAsync();
961 LOG.info("Stopping " + transport.getRemoteAddress() + " because " + reason);
962 } catch (InterruptedException e) {
963 }
964 }
965 }, "delayedStop:" + transport.getRemoteAddress());
966 } catch (Throwable t) {
967 LOG.warn("cannot create stopAsync :", t);
968 }
969 }
970 }
971
972 public void stopAsync() {
973 // If we're in the middle of starting then go no further... for now.
974 synchronized (this) {
975 pendingStop = true;
976 if (starting) {
977 LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes..");
978 return;
979 }
980 }
981 if (stopping.compareAndSet(false, true)) {
982 // Let all the connection contexts know we are shutting down
983 // so that in progress operations can notice and unblock.
984 List<TransportConnectionState> connectionStates = listConnectionStates();
985 for (TransportConnectionState cs : connectionStates) {
986 ConnectionContext connectionContext = cs.getContext();
987 if (connectionContext != null) {
988 connectionContext.getStopping().set(true);
989 }
990 }
991 try {
992 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
993 public void run() {
994 serviceLock.writeLock().lock();
995 try {
996 doStop();
997 } catch (Throwable e) {
998 LOG.debug("Error occurred while shutting down a connection " + this, e);
999 } finally {
1000 stopped.countDown();
1001 serviceLock.writeLock().unlock();
1002 }
1003 }
1004 }, "StopAsync:" + transport.getRemoteAddress());
1005 } catch (Throwable t) {
1006 LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t);
1007 stopped.countDown();
1008 }
1009 }
1010 }
1011
1012 @Override
1013 public String toString() {
1014 return "Transport Connection to: " + transport.getRemoteAddress();
1015 }
1016
1017 protected void doStop() throws Exception, InterruptedException {
1018 LOG.debug("Stopping connection: " + transport.getRemoteAddress());
1019 connector.onStopped(this);
1020 try {
1021 synchronized (this) {
1022 if (masterBroker != null) {
1023 masterBroker.stop();
1024 }
1025 if (duplexBridge != null) {
1026 duplexBridge.stop();
1027 }
1028 }
1029 } catch (Exception ignore) {
1030 LOG.trace("Exception caught stopping", ignore);
1031 }
1032 try {
1033 transport.stop();
1034 LOG.debug("Stopped transport: " + transport.getRemoteAddress());
1035 } catch (Exception e) {
1036 LOG.debug("Could not stop transport: " + e, e);
1037 }
1038 if (taskRunner != null) {
1039 taskRunner.shutdown(1);
1040 }
1041 active = false;
1042 // Run the MessageDispatch callbacks so that message references get
1043 // cleaned up.
1044 synchronized (dispatchQueue) {
1045 for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) {
1046 Command command = iter.next();
1047 if (command.isMessageDispatch()) {
1048 MessageDispatch md = (MessageDispatch) command;
1049 Runnable sub = md.getTransmitCallback();
1050 broker.postProcessDispatch(md);
1051 if (sub != null) {
1052 sub.run();
1053 }
1054 }
1055 }
1056 dispatchQueue.clear();
1057 }
1058 //
1059 // Remove all logical connection associated with this connection
1060 // from the broker.
1061 if (!broker.isStopped()) {
1062 List<TransportConnectionState> connectionStates = listConnectionStates();
1063 connectionStates = listConnectionStates();
1064 for (TransportConnectionState cs : connectionStates) {
1065 cs.getContext().getStopping().set(true);
1066 try {
1067 LOG.debug("Cleaning up connection resources: " + getRemoteAddress());
1068 processRemoveConnection(cs.getInfo().getConnectionId(), 0l);
1069 } catch (Throwable ignore) {
1070 ignore.printStackTrace();
1071 }
1072 }
1073 }
1074 LOG.debug("Connection Stopped: " + getRemoteAddress());
1075 }
1076
1077 /**
1078 * @return Returns the blockedCandidate.
1079 */
1080 public boolean isBlockedCandidate() {
1081 return blockedCandidate;
1082 }
1083
1084 /**
1085 * @param blockedCandidate The blockedCandidate to set.
1086 */
1087 public void setBlockedCandidate(boolean blockedCandidate) {
1088 this.blockedCandidate = blockedCandidate;
1089 }
1090
1091 /**
1092 * @return Returns the markedCandidate.
1093 */
1094 public boolean isMarkedCandidate() {
1095 return markedCandidate;
1096 }
1097
1098 /**
1099 * @param markedCandidate The markedCandidate to set.
1100 */
1101 public void setMarkedCandidate(boolean markedCandidate) {
1102 this.markedCandidate = markedCandidate;
1103 if (!markedCandidate) {
1104 timeStamp = 0;
1105 blockedCandidate = false;
1106 }
1107 }
1108
1109 /**
1110 * @param slow The slow to set.
1111 */
1112 public void setSlow(boolean slow) {
1113 this.slow = slow;
1114 }
1115
1116 /**
1117 * @return true if the Connection is slow
1118 */
1119 public boolean isSlow() {
1120 return slow;
1121 }
1122
1123 /**
1124 * @return true if the Connection is potentially blocked
1125 */
1126 public boolean isMarkedBlockedCandidate() {
1127 return markedCandidate;
1128 }
1129
1130 /**
1131 * Mark the Connection, so we can deem if it's collectable on the next sweep
1132 */
1133 public void doMark() {
1134 if (timeStamp == 0) {
1135 timeStamp = System.currentTimeMillis();
1136 }
1137 }
1138
1139 /**
1140 * @return if after being marked, the Connection is still writing
1141 */
1142 public boolean isBlocked() {
1143 return blocked;
1144 }
1145
1146 /**
1147 * @return true if the Connection is connected
1148 */
1149 public boolean isConnected() {
1150 return connected;
1151 }
1152
1153 /**
1154 * @param blocked The blocked to set.
1155 */
1156 public void setBlocked(boolean blocked) {
1157 this.blocked = blocked;
1158 }
1159
1160 /**
1161 * @param connected The connected to set.
1162 */
1163 public void setConnected(boolean connected) {
1164 this.connected = connected;
1165 }
1166
1167 /**
1168 * @return true if the Connection is active
1169 */
1170 public boolean isActive() {
1171 return active;
1172 }
1173
1174 /**
1175 * @param active The active to set.
1176 */
1177 public void setActive(boolean active) {
1178 this.active = active;
1179 }
1180
1181 /**
1182 * @return true if the Connection is starting
1183 */
1184 public synchronized boolean isStarting() {
1185 return starting;
1186 }
1187
1188 public synchronized boolean isNetworkConnection() {
1189 return networkConnection;
1190 }
1191
1192 public boolean isFaultTolerantConnection() {
1193 return this.faultTolerantConnection;
1194 }
1195
1196 protected synchronized void setStarting(boolean starting) {
1197 this.starting = starting;
1198 }
1199
1200 /**
1201 * @return true if the Connection needs to stop
1202 */
1203 public synchronized boolean isPendingStop() {
1204 return pendingStop;
1205 }
1206
1207 protected synchronized void setPendingStop(boolean pendingStop) {
1208 this.pendingStop = pendingStop;
1209 }
1210
1211 public Response processBrokerInfo(BrokerInfo info) {
1212 if (info.isSlaveBroker()) {
1213 BrokerService bService = connector.getBrokerService();
1214 // Do we only support passive slaves - or does the slave want to be
1215 // passive ?
1216 boolean passive = bService.isPassiveSlave() || info.isPassiveSlave();
1217 if (passive == false) {
1218
1219 // stream messages from this broker (the master) to
1220 // the slave
1221 MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
1222 masterBroker = new MasterBroker(parent, transport);
1223 masterBroker.startProcessing();
1224 }
1225 LOG.info((passive ? "Passive" : "Active") + " Slave Broker " + info.getBrokerName() + " is attached");
1226 bService.slaveConnectionEstablished();
1227 } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
1228 // so this TransportConnection is the rear end of a network bridge
1229 // We have been requested to create a two way pipe ...
1230 try {
1231 Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
1232 Map<String, String> props = createMap(properties);
1233 NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
1234 IntrospectionSupport.setProperties(config, props, "");
1235 config.setBrokerName(broker.getBrokerName());
1236
1237 // check for existing duplex connection hanging about
1238
1239 // We first look if existing network connection already exists for the same broker Id and network connector name
1240 // It's possible in case of brief network fault to have this transport connector side of the connection always active
1241 // and the duplex network connector side wanting to open a new one
1242 // In this case, the old connection must be broken
1243 String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
1244 CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
1245 synchronized (connections) {
1246 for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) {
1247 TransportConnection c = iter.next();
1248 if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
1249 LOG.warn("Stopping an existing active duplex connection [" + c + "] for network connector (" + duplexNetworkConnectorId + ").");
1250 c.stopAsync();
1251 // better to wait for a bit rather than get connection id already in use and failure to start new bridge
1252 c.getStopped().await(1, TimeUnit.SECONDS);
1253 }
1254 }
1255 setDuplexNetworkConnectorId(duplexNetworkConnectorId);
1256 }
1257 URI uri = broker.getVmConnectorURI();
1258 HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
1259 map.put("network", "true");
1260 map.put("async", "false");
1261 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
1262 Transport localTransport = TransportFactory.connect(uri);
1263 Transport remoteBridgeTransport = new ResponseCorrelator(transport);
1264 String duplexName = localTransport.toString();
1265 if (duplexName.contains("#")) {
1266 duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
1267 }
1268 MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName));
1269 listener.setCreatedByDuplex(true);
1270 duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
1271 duplexBridge.setBrokerService(broker.getBrokerService());
1272 // now turn duplex off this side
1273 info.setDuplexConnection(false);
1274 duplexBridge.setCreatedByDuplex(true);
1275 duplexBridge.duplexStart(this, brokerInfo, info);
1276 LOG.info("Started responder end of duplex bridge " + duplexNetworkConnectorId);
1277 return null;
1278 } catch (TransportDisposedIOException e) {
1279 LOG.warn("Duplex bridge " + duplexNetworkConnectorId + " was stopped before it was correctly started.");
1280 return null;
1281 } catch (Exception e) {
1282 LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId, e);
1283 return null;
1284 }
1285 }
1286 // We only expect to get one broker info command per connection
1287 if (this.brokerInfo != null) {
1288 LOG.warn("Unexpected extra broker info command received: " + info);
1289 }
1290 this.brokerInfo = info;
1291 networkConnection = true;
1292 List<TransportConnectionState> connectionStates = listConnectionStates();
1293 for (TransportConnectionState cs : connectionStates) {
1294 cs.getContext().setNetworkConnection(true);
1295 }
1296 return null;
1297 }
1298
1299 @SuppressWarnings({"unchecked", "rawtypes"})
1300 private HashMap<String, String> createMap(Properties properties) {
1301 return new HashMap(properties);
1302 }
1303
1304 protected void dispatch(Command command) throws IOException {
1305 try {
1306 setMarkedCandidate(true);
1307 transport.oneway(command);
1308 } finally {
1309 setMarkedCandidate(false);
1310 }
1311 }
1312
1313 public String getRemoteAddress() {
1314 return transport.getRemoteAddress();
1315 }
1316
1317 public String getConnectionId() {
1318 List<TransportConnectionState> connectionStates = listConnectionStates();
1319 for (TransportConnectionState cs : connectionStates) {
1320 if (cs.getInfo().getClientId() != null) {
1321 return cs.getInfo().getClientId();
1322 }
1323 return cs.getInfo().getConnectionId().toString();
1324 }
1325 return null;
1326 }
1327
1328 public void updateClient(ConnectionControl control) {
1329 if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
1330 && this.wireFormatInfo.getVersion() >= 6) {
1331 dispatchAsync(control);
1332 }
1333 }
1334
1335 private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
1336 ProducerBrokerExchange result = producerExchanges.get(id);
1337 if (result == null) {
1338 synchronized (producerExchanges) {
1339 result = new ProducerBrokerExchange();
1340 TransportConnectionState state = lookupConnectionState(id);
1341 context = state.getContext();
1342 result.setConnectionContext(context);
1343 if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) {
1344 result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
1345 }
1346 SessionState ss = state.getSessionState(id.getParentId());
1347 if (ss != null) {
1348 result.setProducerState(ss.getProducerState(id));
1349 ProducerState producerState = ss.getProducerState(id);
1350 if (producerState != null && producerState.getInfo() != null) {
1351 ProducerInfo info = producerState.getInfo();
1352 result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
1353 }
1354 }
1355 producerExchanges.put(id, result);
1356 }
1357 } else {
1358 context = result.getConnectionContext();
1359 }
1360 return result;
1361 }
1362
1363 private void removeProducerBrokerExchange(ProducerId id) {
1364 synchronized (producerExchanges) {
1365 producerExchanges.remove(id);
1366 }
1367 }
1368
1369 private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
1370 ConsumerBrokerExchange result = consumerExchanges.get(id);
1371 return result;
1372 }
1373
1374 private ConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id) {
1375 ConsumerBrokerExchange result = consumerExchanges.get(id);
1376 if (result == null) {
1377 synchronized (consumerExchanges) {
1378 result = new ConsumerBrokerExchange();
1379 TransportConnectionState state = lookupConnectionState(id);
1380 context = state.getContext();
1381 result.setConnectionContext(context);
1382 SessionState ss = state.getSessionState(id.getParentId());
1383 if (ss != null) {
1384 ConsumerState cs = ss.getConsumerState(id);
1385 if (cs != null) {
1386 ConsumerInfo info = cs.getInfo();
1387 if (info != null) {
1388 if (info.getDestination() != null && info.getDestination().isPattern()) {
1389 result.setWildcard(true);
1390 }
1391 }
1392 }
1393 }
1394 consumerExchanges.put(id, result);
1395 }
1396 }
1397 return result;
1398 }
1399
1400 private void removeConsumerBrokerExchange(ConsumerId id) {
1401 synchronized (consumerExchanges) {
1402 consumerExchanges.remove(id);
1403 }
1404 }
1405
1406 public int getProtocolVersion() {
1407 return protocolVersion.get();
1408 }
1409
1410 public Response processControlCommand(ControlCommand command) throws Exception {
1411 return null;
1412 }
1413
1414 public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
1415 return null;
1416 }
1417
1418 public Response processConnectionControl(ConnectionControl control) throws Exception {
1419 if (control != null) {
1420 faultTolerantConnection = control.isFaultTolerant();
1421 }
1422 return null;
1423 }
1424
1425 public Response processConnectionError(ConnectionError error) throws Exception {
1426 return null;
1427 }
1428
1429 public Response processConsumerControl(ConsumerControl control) throws Exception {
1430 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId());
1431 broker.processConsumerControl(consumerExchange, control);
1432 return null;
1433 }
1434
1435 protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
1436 TransportConnectionState state) {
1437 TransportConnectionState cs = null;
1438 if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
1439 // swap implementations
1440 TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister();
1441 newRegister.intialize(connectionStateRegister);
1442 connectionStateRegister = newRegister;
1443 }
1444 cs = connectionStateRegister.registerConnectionState(connectionId, state);
1445 return cs;
1446 }
1447
1448 protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
1449 return connectionStateRegister.unregisterConnectionState(connectionId);
1450 }
1451
1452 protected synchronized List<TransportConnectionState> listConnectionStates() {
1453 return connectionStateRegister.listConnectionStates();
1454 }
1455
1456 protected synchronized TransportConnectionState lookupConnectionState(String connectionId) {
1457 return connectionStateRegister.lookupConnectionState(connectionId);
1458 }
1459
1460 protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) {
1461 return connectionStateRegister.lookupConnectionState(id);
1462 }
1463
1464 protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) {
1465 return connectionStateRegister.lookupConnectionState(id);
1466 }
1467
1468 protected synchronized TransportConnectionState lookupConnectionState(SessionId id) {
1469 return connectionStateRegister.lookupConnectionState(id);
1470 }
1471
1472 protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
1473 return connectionStateRegister.lookupConnectionState(connectionId);
1474 }
1475
1476 protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) {
1477 this.duplexNetworkConnectorId = duplexNetworkConnectorId;
1478 }
1479
1480 protected synchronized String getDuplexNetworkConnectorId() {
1481 return this.duplexNetworkConnectorId;
1482 }
1483
1484 public boolean isStopping() {
1485 return stopping.get();
1486 }
1487
1488 protected CountDownLatch getStopped() {
1489 return stopped;
1490 }
1491
1492 private int getProducerCount(ConnectionId connectionId) {
1493 int result = 0;
1494 TransportConnectionState cs = lookupConnectionState(connectionId);
1495 if (cs != null) {
1496 for (SessionId sessionId : cs.getSessionIds()) {
1497 SessionState sessionState = cs.getSessionState(sessionId);
1498 if (sessionState != null) {
1499 result += sessionState.getProducerIds().size();
1500 }
1501 }
1502 }
1503 return result;
1504 }
1505
1506 private int getConsumerCount(ConnectionId connectionId) {
1507 int result = 0;
1508 TransportConnectionState cs = lookupConnectionState(connectionId);
1509 if (cs != null) {
1510 for (SessionId sessionId : cs.getSessionIds()) {
1511 SessionState sessionState = cs.getSessionState(sessionId);
1512 if (sessionState != null) {
1513 result += sessionState.getConsumerIds().size();
1514 }
1515 }
1516 }
1517 return result;
1518 }
1519 }