001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.console.filter;
018
019 import java.net.URI;
020 import java.util.Collections;
021 import java.util.Iterator;
022 import java.util.List;
023
024 import javax.jms.Connection;
025 import javax.jms.ConnectionFactory;
026 import javax.jms.Destination;
027 import javax.jms.JMSException;
028 import javax.jms.QueueBrowser;
029 import javax.jms.Session;
030
031 import org.apache.activemq.ActiveMQConnectionFactory;
032 import org.apache.activemq.command.ActiveMQQueue;
033 import org.apache.activemq.command.ActiveMQTopic;
034
035 public class AmqMessagesQueryFilter extends AbstractQueryFilter {
036
037 private URI brokerUrl;
038 private Destination destination;
039
040 private ConnectionFactory connectionFactory;
041
042 /**
043 * Create a JMS message query filter
044 *
045 * @param brokerUrl - broker url to connect to
046 * @param destination - JMS destination to query
047 */
048 public AmqMessagesQueryFilter(URI brokerUrl, Destination destination) {
049 super(null);
050 this.brokerUrl = brokerUrl;
051 this.destination = destination;
052 }
053
054 /**
055 * Create a JMS message query filter
056 *
057 * @param brokerUrl - broker url to connect to
058 * @param destination - JMS destination to query
059 */
060 public AmqMessagesQueryFilter(ConnectionFactory connectionFactory, Destination destination) {
061 super(null);
062 this.destination = destination;
063 this.connectionFactory = connectionFactory;
064 }
065
066 /**
067 * Queries the specified destination using the message selector format query
068 *
069 * @param queries - message selector queries
070 * @return list messages that matches the selector
071 * @throws Exception
072 */
073 public List query(List queries) throws Exception {
074 String selector = "";
075
076 // Convert to message selector
077 for (Iterator i = queries.iterator(); i.hasNext(); ) {
078 selector = selector + "(" + i.next().toString() + ") AND ";
079 }
080
081 // Remove last AND
082 if (!selector.equals("")) {
083 selector = selector.substring(0, selector.length() - 5);
084 }
085
086 if (destination instanceof ActiveMQQueue) {
087 return queryMessages((ActiveMQQueue) destination, selector);
088 } else {
089 return queryMessages((ActiveMQTopic) destination, selector);
090 }
091 }
092
093 /**
094 * Query the messages of a queue destination using a queue browser
095 *
096 * @param queue - queue destination
097 * @param selector - message selector
098 * @return list of messages that matches the selector
099 * @throws Exception
100 */
101 protected List queryMessages(ActiveMQQueue queue, String selector) throws Exception {
102 Connection conn = createConnection();
103
104 Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
105 QueueBrowser browser = sess.createBrowser(queue, selector);
106
107 List messages = Collections.list(browser.getEnumeration());
108
109 conn.close();
110
111 return messages;
112 }
113
114 /**
115 * Query the messages of a topic destination using a message consumer
116 *
117 * @param topic - topic destination
118 * @param selector - message selector
119 * @return list of messages that matches the selector
120 * @throws Exception
121 */
122 protected List queryMessages(ActiveMQTopic topic, String selector) throws Exception {
123 // TODO: should we use a durable subscriber or a retroactive non-durable
124 // subscriber?
125 // TODO: if a durable subscriber is used, how do we manage it?
126 // subscribe/unsubscribe tasks?
127 return null;
128 }
129
130 /**
131 * Create and start a JMS connection
132 *
133 * @param brokerUrl - broker url to connect to.
134 * @return JMS connection
135 * @throws JMSException
136 * @deprecated Use createConnection() instead, and pass the url to the ConnectionFactory when it's created.
137 */
138 @Deprecated
139 protected Connection createConnection(URI brokerUrl) throws JMSException {
140 // maintain old behaviour, when called this way.
141 connectionFactory = (new ActiveMQConnectionFactory(brokerUrl));
142 return createConnection();
143 }
144
145 /**
146 * Create and start a JMS connection
147 *
148 * @return JMS connection
149 * @throws JMSException
150 */
151 protected Connection createConnection() throws JMSException {
152 // maintain old behaviour, when called either way.
153 if (null == connectionFactory)
154 connectionFactory = (new ActiveMQConnectionFactory(getBrokerUrl()));
155
156 Connection conn = connectionFactory.createConnection();
157 conn.start();
158 return conn;
159 }
160
161
162 /**
163 * Get the broker url being used.
164 *
165 * @return broker url
166 */
167 public URI getBrokerUrl() {
168 return brokerUrl;
169 }
170
171 /**
172 * Set the broker url to use.
173 *
174 * @param brokerUrl - broker url
175 */
176 public void setBrokerUrl(URI brokerUrl) {
177 this.brokerUrl = brokerUrl;
178 }
179
180 /**
181 * Get the destination being used.
182 *
183 * @return - JMS destination
184 */
185 public Destination getDestination() {
186 return destination;
187 }
188
189 /**
190 * Set the destination to use.
191 *
192 * @param destination - JMS destination
193 */
194 public void setDestination(Destination destination) {
195 this.destination = destination;
196 }
197
198 }