001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.activemq;
019
020 import java.util.List;
021 import javax.jms.JMSException;
022 import org.apache.activemq.command.ConsumerId;
023 import org.apache.activemq.command.MessageDispatch;
024 import org.apache.activemq.thread.Task;
025 import org.apache.activemq.thread.TaskRunner;
026 import org.apache.activemq.util.JMSExceptionSupport;
027 import org.slf4j.Logger;
028 import org.slf4j.LoggerFactory;
029
030 /**
031 * A utility class used by the Session for dispatching messages asynchronously
032 * to consumers
033 *
034 * @see javax.jms.Session
035 */
036 public class ActiveMQSessionExecutor implements Task {
037 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSessionExecutor.class);
038
039 private final ActiveMQSession session;
040 private final MessageDispatchChannel messageQueue;
041 private boolean dispatchedBySessionPool;
042 private volatile TaskRunner taskRunner;
043 private boolean startedOrWarnedThatNotStarted;
044
045 ActiveMQSessionExecutor(ActiveMQSession session) {
046 this.session = session;
047 if (this.session.connection != null && this.session.connection.isMessagePrioritySupported()) {
048 this.messageQueue = new SimplePriorityMessageDispatchChannel();
049 }else {
050 this.messageQueue = new FifoMessageDispatchChannel();
051 }
052 }
053
054 void setDispatchedBySessionPool(boolean value) {
055 dispatchedBySessionPool = value;
056 wakeup();
057 }
058
059 void execute(MessageDispatch message) throws InterruptedException {
060
061 if (!startedOrWarnedThatNotStarted) {
062
063 ActiveMQConnection connection = session.connection;
064 long aboutUnstartedConnectionTimeout = connection.getWarnAboutUnstartedConnectionTimeout();
065 if (connection.isStarted() || aboutUnstartedConnectionTimeout < 0L) {
066 startedOrWarnedThatNotStarted = true;
067 } else {
068 long elapsedTime = System.currentTimeMillis() - connection.getTimeCreated();
069
070 // lets only warn when a significant amount of time has passed
071 // just in case its normal operation
072 if (elapsedTime > aboutUnstartedConnectionTimeout) {
073 LOG.warn("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: " + connection
074 + " Received: " + message);
075 startedOrWarnedThatNotStarted = true;
076 }
077 }
078 }
079
080 if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) {
081 dispatch(message);
082 } else {
083 messageQueue.enqueue(message);
084 wakeup();
085 }
086 }
087
088 public void wakeup() {
089 if (!dispatchedBySessionPool) {
090 if (session.isSessionAsyncDispatch()) {
091 try {
092 TaskRunner taskRunner = this.taskRunner;
093 if (taskRunner == null) {
094 synchronized (this) {
095 if (this.taskRunner == null) {
096 if (!isRunning()) {
097 // stop has been called
098 return;
099 }
100 this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
101 "ActiveMQ Session: " + session.getSessionId());
102 }
103 taskRunner = this.taskRunner;
104 }
105 }
106 taskRunner.wakeup();
107 } catch (InterruptedException e) {
108 Thread.currentThread().interrupt();
109 }
110 } else {
111 while (iterate()) {
112 }
113 }
114 }
115 }
116
117 void executeFirst(MessageDispatch message) {
118 messageQueue.enqueueFirst(message);
119 wakeup();
120 }
121
122 public boolean hasUncomsumedMessages() {
123 return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty();
124 }
125
126 void dispatch(MessageDispatch message) {
127 // TODO - we should use a Map for this indexed by consumerId
128 for (ActiveMQMessageConsumer consumer : this.session.consumers) {
129 ConsumerId consumerId = message.getConsumerId();
130 if (consumerId.equals(consumer.getConsumerId())) {
131 consumer.dispatch(message);
132 break;
133 }
134 }
135 }
136
137 synchronized void start() {
138 if (!messageQueue.isRunning()) {
139 messageQueue.start();
140 if (hasUncomsumedMessages()) {
141 wakeup();
142 }
143 }
144 }
145
146 void stop() throws JMSException {
147 try {
148 if (messageQueue.isRunning()) {
149 synchronized(this) {
150 messageQueue.stop();
151 if (this.taskRunner != null) {
152 this.taskRunner.shutdown();
153 this.taskRunner = null;
154 }
155 }
156 }
157 } catch (InterruptedException e) {
158 Thread.currentThread().interrupt();
159 throw JMSExceptionSupport.create(e);
160 }
161 }
162
163 boolean isRunning() {
164 return messageQueue.isRunning();
165 }
166
167 void close() {
168 messageQueue.close();
169 }
170
171 void clear() {
172 messageQueue.clear();
173 }
174
175 MessageDispatch dequeueNoWait() {
176 return messageQueue.dequeueNoWait();
177 }
178
179 protected void clearMessagesInProgress() {
180 messageQueue.clear();
181 }
182
183 public boolean isEmpty() {
184 return messageQueue.isEmpty();
185 }
186
187 public boolean iterate() {
188
189 // Deliver any messages queued on the consumer to their listeners.
190 for (ActiveMQMessageConsumer consumer : this.session.consumers) {
191 if (consumer.iterate()) {
192 return true;
193 }
194 }
195
196 // No messages left queued on the listeners.. so now dispatch messages
197 // queued on the session
198 MessageDispatch message = messageQueue.dequeueNoWait();
199 if (message == null) {
200 return false;
201 } else {
202 dispatch(message);
203 return !messageQueue.isEmpty();
204 }
205 }
206
207 List<MessageDispatch> getUnconsumedMessages() {
208 return messageQueue.removeAll();
209 }
210 }