001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.advisory;
018
019 import java.util.Set;
020 import java.util.concurrent.CopyOnWriteArraySet;
021 import java.util.concurrent.atomic.AtomicBoolean;
022
023 import javax.jms.Connection;
024 import javax.jms.JMSException;
025 import javax.jms.Message;
026 import javax.jms.MessageConsumer;
027 import javax.jms.MessageListener;
028 import javax.jms.Session;
029
030 import org.apache.activemq.Service;
031 import org.apache.activemq.command.ActiveMQDestination;
032 import org.apache.activemq.command.ActiveMQMessage;
033 import org.apache.activemq.command.ActiveMQQueue;
034 import org.apache.activemq.command.ActiveMQTempQueue;
035 import org.apache.activemq.command.ActiveMQTempTopic;
036 import org.apache.activemq.command.ActiveMQTopic;
037 import org.apache.activemq.command.DestinationInfo;
038 import org.slf4j.Logger;
039 import org.slf4j.LoggerFactory;
040
041 /**
042 * A helper class which keeps track of the Destinations available in a broker and allows you to listen to them
043 * being created or deleted.
044 *
045 *
046 */
047 public class DestinationSource implements MessageListener {
048 private static final Logger LOG = LoggerFactory.getLogger(ConsumerEventSource.class);
049 private AtomicBoolean started = new AtomicBoolean(false);
050 private final Connection connection;
051 private Session session;
052 private MessageConsumer queueConsumer;
053 private MessageConsumer topicConsumer;
054 private MessageConsumer tempTopicConsumer;
055 private MessageConsumer tempQueueConsumer;
056 private Set<ActiveMQQueue> queues = new CopyOnWriteArraySet<ActiveMQQueue>();
057 private Set<ActiveMQTopic> topics = new CopyOnWriteArraySet<ActiveMQTopic>();
058 private Set<ActiveMQTempQueue> temporaryQueues = new CopyOnWriteArraySet<ActiveMQTempQueue>();
059 private Set<ActiveMQTempTopic> temporaryTopics = new CopyOnWriteArraySet<ActiveMQTempTopic>();
060 private DestinationListener listener;
061
062 public DestinationSource(Connection connection) throws JMSException {
063 this.connection = connection;
064 }
065
066 public DestinationListener getListener() {
067 return listener;
068 }
069
070 public void setDestinationListener(DestinationListener listener) {
071 this.listener = listener;
072 }
073
074 /**
075 * Returns the current queues available on the broker
076 */
077 public Set<ActiveMQQueue> getQueues() {
078 return queues;
079 }
080
081 /**
082 * Returns the current topics on the broker
083 */
084 public Set<ActiveMQTopic> getTopics() {
085 return topics;
086 }
087
088 /**
089 * Returns the current temporary topics available on the broker
090 */
091 public Set<ActiveMQTempQueue> getTemporaryQueues() {
092 return temporaryQueues;
093 }
094
095 /**
096 * Returns the current temporary queues available on the broker
097 */
098 public Set<ActiveMQTempTopic> getTemporaryTopics() {
099 return temporaryTopics;
100 }
101
102 public void start() throws JMSException {
103 if (started.compareAndSet(false, true)) {
104 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
105 queueConsumer = session.createConsumer(AdvisorySupport.QUEUE_ADVISORY_TOPIC);
106 queueConsumer.setMessageListener(this);
107
108 topicConsumer = session.createConsumer(AdvisorySupport.TOPIC_ADVISORY_TOPIC);
109 topicConsumer.setMessageListener(this);
110
111 tempQueueConsumer = session.createConsumer(AdvisorySupport.TEMP_QUEUE_ADVISORY_TOPIC);
112 tempQueueConsumer.setMessageListener(this);
113
114 tempTopicConsumer = session.createConsumer(AdvisorySupport.TEMP_TOPIC_ADVISORY_TOPIC);
115 tempTopicConsumer.setMessageListener(this);
116 }
117 }
118
119 public void stop() throws JMSException {
120 if (started.compareAndSet(true, false)) {
121 if (session != null) {
122 session.close();
123 }
124 }
125 }
126
127 public void onMessage(Message message) {
128 if (message instanceof ActiveMQMessage) {
129 ActiveMQMessage activeMessage = (ActiveMQMessage) message;
130 Object command = activeMessage.getDataStructure();
131 if (command instanceof DestinationInfo) {
132 DestinationInfo destinationInfo = (DestinationInfo) command;
133 DestinationEvent event = new DestinationEvent(this, destinationInfo);
134 fireDestinationEvent(event);
135 }
136 else {
137 LOG.warn("Unknown dataStructure: " + command);
138 }
139 }
140 else {
141 LOG.warn("Unknown message type: " + message + ". Message ignored");
142 }
143 }
144
145 protected void fireDestinationEvent(DestinationEvent event) {
146 // now lets update the data structures
147 ActiveMQDestination destination = event.getDestination();
148 boolean add = event.isAddOperation();
149 if (destination instanceof ActiveMQQueue) {
150 ActiveMQQueue queue = (ActiveMQQueue) destination;
151 if (add) {
152 queues.add(queue);
153 }
154 else {
155 queues.remove(queue);
156 }
157 }
158 else if (destination instanceof ActiveMQTopic) {
159 ActiveMQTopic topic = (ActiveMQTopic) destination;
160 if (add) {
161 topics.add(topic);
162 }
163 else {
164 topics.remove(topic);
165 }
166 }
167 else if (destination instanceof ActiveMQTempQueue) {
168 ActiveMQTempQueue queue = (ActiveMQTempQueue) destination;
169 if (add) {
170 temporaryQueues.add(queue);
171 }
172 else {
173 temporaryQueues.remove(queue);
174 }
175 }
176 else if (destination instanceof ActiveMQTempTopic) {
177 ActiveMQTempTopic topic = (ActiveMQTempTopic) destination;
178 if (add) {
179 temporaryTopics.add(topic);
180 }
181 else {
182 temporaryTopics.remove(topic);
183 }
184 }
185 else {
186 LOG.warn("Unknown destination type: " + destination);
187 }
188 if (listener != null) {
189 listener.onDestinationEvent(event);
190 }
191 }
192 }