001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.util.Enumeration;
020 import java.util.concurrent.atomic.AtomicBoolean;
021
022 import javax.jms.IllegalStateException;
023 import javax.jms.JMSException;
024 import javax.jms.Message;
025 import javax.jms.Queue;
026 import javax.jms.QueueBrowser;
027
028 import org.apache.activemq.command.ActiveMQDestination;
029 import org.apache.activemq.command.ConsumerId;
030 import org.apache.activemq.command.MessageDispatch;
031
032 /**
033 * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a
034 * queue without removing them. <p/>
035 * <P>
036 * The <CODE>getEnumeration</CODE> method returns a <CODE>
037 * java.util.Enumeration</CODE>
038 * that is used to scan the queue's messages. It may be an enumeration of the
039 * entire content of a queue, or it may contain only the messages matching a
040 * message selector. <p/>
041 * <P>
042 * Messages may be arriving and expiring while the scan is done. The JMS API
043 * does not require the content of an enumeration to be a static snapshot of
044 * queue content. Whether these changes are visible or not depends on the JMS
045 * provider. <p/>
046 * <P>
047 * A <CODE>QueueBrowser</CODE> can be created from either a <CODE>Session
048 * </CODE>
049 * or a <CODE>QueueSession</CODE>.
050 *
051 * @see javax.jms.Session#createBrowser
052 * @see javax.jms.QueueSession#createBrowser
053 * @see javax.jms.QueueBrowser
054 * @see javax.jms.QueueReceiver
055 */
056
057 public class ActiveMQQueueBrowser implements QueueBrowser, Enumeration {
058
059 private final ActiveMQSession session;
060 private final ActiveMQDestination destination;
061 private final String selector;
062
063 private ActiveMQMessageConsumer consumer;
064 private boolean closed;
065 private final ConsumerId consumerId;
066 private final AtomicBoolean browseDone = new AtomicBoolean(true);
067 private final boolean dispatchAsync;
068 private Object semaphore = new Object();
069
070 /**
071 * Constructor for an ActiveMQQueueBrowser - used internally
072 *
073 * @param theSession
074 * @param dest
075 * @param selector
076 * @throws JMSException
077 */
078 protected ActiveMQQueueBrowser(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination destination, String selector, boolean dispatchAsync) throws JMSException {
079 this.session = session;
080 this.consumerId = consumerId;
081 this.destination = destination;
082 this.selector = selector;
083 this.dispatchAsync = dispatchAsync;
084 this.consumer = createConsumer();
085 }
086
087 /**
088 * @param session
089 * @param originalDestination
090 * @param selectorExpression
091 * @param cnum
092 * @return
093 * @throws JMSException
094 */
095 private ActiveMQMessageConsumer createConsumer() throws JMSException {
096 browseDone.set(false);
097 ActiveMQPrefetchPolicy prefetchPolicy = session.connection.getPrefetchPolicy();
098
099 return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, prefetchPolicy.getQueueBrowserPrefetch(), prefetchPolicy
100 .getMaximumPendingMessageLimit(), false, true, dispatchAsync, null) {
101 public void dispatch(MessageDispatch md) {
102 if (md.getMessage() == null) {
103 browseDone.set(true);
104 } else {
105 super.dispatch(md);
106 }
107 notifyMessageAvailable();
108 }
109 };
110 }
111
112 private void destroyConsumer() {
113 if (consumer == null) {
114 return;
115 }
116 try {
117 if (session.getTransacted() && session.getTransactionContext().isInLocalTransaction()) {
118 session.commit();
119 }
120 consumer.close();
121 consumer = null;
122 } catch (JMSException e) {
123 e.printStackTrace();
124 }
125 }
126
127 /**
128 * Gets an enumeration for browsing the current queue messages in the order
129 * they would be received.
130 *
131 * @return an enumeration for browsing the messages
132 * @throws JMSException if the JMS provider fails to get the enumeration for
133 * this browser due to some internal error.
134 */
135
136 public Enumeration getEnumeration() throws JMSException {
137 checkClosed();
138 if (consumer == null) {
139 consumer = createConsumer();
140 }
141 return this;
142 }
143
144 private void checkClosed() throws IllegalStateException {
145 if (closed) {
146 throw new IllegalStateException("The Consumer is closed");
147 }
148 }
149
150 /**
151 * @return true if more messages to process
152 */
153 public boolean hasMoreElements() {
154 while (true) {
155
156 synchronized (this) {
157 if (consumer == null) {
158 return false;
159 }
160 }
161
162 if (consumer.getMessageSize() > 0) {
163 return true;
164 }
165
166 if (browseDone.get() || !session.isRunning()) {
167 destroyConsumer();
168 return false;
169 }
170
171 waitForMessage();
172 }
173 }
174
175 /**
176 * @return the next message
177 */
178 public Object nextElement() {
179 while (true) {
180
181 synchronized (this) {
182 if (consumer == null) {
183 return null;
184 }
185 }
186
187 try {
188 Message answer = consumer.receiveNoWait();
189 if (answer != null) {
190 return answer;
191 }
192 } catch (JMSException e) {
193 this.session.connection.onClientInternalException(e);
194 return null;
195 }
196
197 if (browseDone.get() || !session.isRunning()) {
198 destroyConsumer();
199 return null;
200 }
201
202 waitForMessage();
203 }
204 }
205
206 public synchronized void close() throws JMSException {
207 destroyConsumer();
208 closed = true;
209 }
210
211 /**
212 * Gets the queue associated with this queue browser.
213 *
214 * @return the queue
215 * @throws JMSException if the JMS provider fails to get the queue
216 * associated with this browser due to some internal error.
217 */
218
219 public Queue getQueue() throws JMSException {
220 return (Queue)destination;
221 }
222
223 public String getMessageSelector() throws JMSException {
224 return selector;
225 }
226
227 // Implementation methods
228 // -------------------------------------------------------------------------
229
230 /**
231 * Wait on a semaphore for a fixed amount of time for a message to come in.
232 * @throws JMSException
233 */
234 protected void waitForMessage() {
235 try {
236 consumer.sendPullCommand(-1);
237 synchronized (semaphore) {
238 semaphore.wait(2000);
239 }
240 } catch (InterruptedException e) {
241 Thread.currentThread().interrupt();
242 } catch (JMSException e) {
243 }
244
245 }
246
247 protected void notifyMessageAvailable() {
248 synchronized (semaphore) {
249 semaphore.notifyAll();
250 }
251 }
252
253 public String toString() {
254 return "ActiveMQQueueBrowser { value=" + consumerId + " }";
255 }
256
257 }