001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.advisory;
018
019 import java.util.concurrent.atomic.AtomicBoolean;
020 import java.util.concurrent.atomic.AtomicInteger;
021
022 import javax.jms.Connection;
023 import javax.jms.Destination;
024 import javax.jms.JMSException;
025 import javax.jms.Message;
026 import javax.jms.MessageListener;
027 import javax.jms.Session;
028
029 import org.apache.activemq.ActiveMQMessageConsumer;
030 import org.apache.activemq.Service;
031 import org.apache.activemq.command.ActiveMQDestination;
032 import org.apache.activemq.command.ActiveMQMessage;
033 import org.apache.activemq.command.ActiveMQTopic;
034 import org.apache.activemq.command.ConsumerId;
035 import org.apache.activemq.command.ConsumerInfo;
036 import org.apache.activemq.command.RemoveInfo;
037 import org.slf4j.Logger;
038 import org.slf4j.LoggerFactory;
039
040 /**
041 * An object which can be used to listen to the number of active consumers
042 * available on a given destination.
043 *
044 *
045 */
046 public class ConsumerEventSource implements Service, MessageListener {
047 private static final Logger LOG = LoggerFactory.getLogger(ConsumerEventSource.class);
048
049 private final Connection connection;
050 private final ActiveMQDestination destination;
051 private ConsumerListener listener;
052 private AtomicBoolean started = new AtomicBoolean(false);
053 private AtomicInteger consumerCount = new AtomicInteger();
054 private Session session;
055 private ActiveMQMessageConsumer consumer;
056
057 public ConsumerEventSource(Connection connection, Destination destination) throws JMSException {
058 this.connection = connection;
059 this.destination = ActiveMQDestination.transform(destination);
060 }
061
062 public void setConsumerListener(ConsumerListener listener) {
063 this.listener = listener;
064 }
065
066 public String getConsumerId() {
067 return consumer != null ? consumer.getConsumerId().toString() : "NOT_SET";
068 }
069
070 public void start() throws Exception {
071 if (started.compareAndSet(false, true)) {
072 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
073 ActiveMQTopic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(destination);
074 consumer = (ActiveMQMessageConsumer) session.createConsumer(advisoryTopic);
075 consumer.setMessageListener(this);
076 }
077 }
078
079 public void stop() throws Exception {
080 if (started.compareAndSet(true, false)) {
081 if (session != null) {
082 session.close();
083 }
084 }
085 }
086
087 public void onMessage(Message message) {
088 if (message instanceof ActiveMQMessage) {
089 ActiveMQMessage activeMessage = (ActiveMQMessage)message;
090 Object command = activeMessage.getDataStructure();
091 int count = 0;
092 if (command instanceof ConsumerInfo) {
093 count = consumerCount.incrementAndGet();
094 count = extractConsumerCountFromMessage(message, count);
095 fireConsumerEvent(new ConsumerStartedEvent(this, destination, (ConsumerInfo)command, count));
096 } else if (command instanceof RemoveInfo) {
097 RemoveInfo removeInfo = (RemoveInfo)command;
098 if (removeInfo.isConsumerRemove()) {
099 count = consumerCount.decrementAndGet();
100 count = extractConsumerCountFromMessage(message, count);
101 fireConsumerEvent(new ConsumerStoppedEvent(this, destination, (ConsumerId)removeInfo.getObjectId(), count));
102 }
103 } else {
104 LOG.warn("Unknown command: " + command);
105 }
106 } else {
107 LOG.warn("Unknown message type: " + message + ". Message ignored");
108 }
109 }
110
111 /**
112 * Lets rely by default on the broker telling us what the consumer count is
113 * as it can ensure that we are up to date at all times and have not
114 * received messages out of order etc.
115 */
116 protected int extractConsumerCountFromMessage(Message message, int count) {
117 try {
118 Object value = message.getObjectProperty("consumerCount");
119 if (value instanceof Number) {
120 Number n = (Number)value;
121 return n.intValue();
122 }
123 LOG.warn("No consumerCount header available on the message: " + message);
124 } catch (Exception e) {
125 LOG.warn("Failed to extract consumerCount from message: " + message + ".Reason: " + e, e);
126 }
127 return count;
128 }
129
130 protected void fireConsumerEvent(ConsumerEvent event) {
131 if (listener != null) {
132 listener.onConsumerEvent(event);
133 }
134 }
135
136 }