001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.util.ArrayList;
020 import java.util.LinkedList;
021 import java.util.List;
022 import org.apache.activemq.command.MessageDispatch;
023
024 public class SimplePriorityMessageDispatchChannel implements MessageDispatchChannel {
025 private static final Integer MAX_PRIORITY = 10;
026 private final Object mutex = new Object();
027 private final LinkedList<MessageDispatch>[] lists;
028 private boolean closed;
029 private boolean running;
030 private int size = 0;
031
032 public SimplePriorityMessageDispatchChannel() {
033 this.lists = new LinkedList[MAX_PRIORITY];
034 for (int i = 0; i < MAX_PRIORITY; i++) {
035 lists[i] = new LinkedList<MessageDispatch>();
036 }
037 }
038
039 /*
040 * (non-Javadoc)
041 * @see
042 * org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq
043 * .command.MessageDispatch)
044 */
045 public void enqueue(MessageDispatch message) {
046 synchronized (mutex) {
047 getList(message).addLast(message);
048
049 this.size++;
050 mutex.notify();
051 }
052 }
053
054 /*
055 * (non-Javadoc)
056 * @see
057 * org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq
058 * .command.MessageDispatch)
059 */
060 public void enqueueFirst(MessageDispatch message) {
061 synchronized (mutex) {
062 getList(message).addFirst(message);
063 this.size++;
064 mutex.notify();
065 }
066 }
067
068 /*
069 * (non-Javadoc)
070 * @see org.apache.activemq.MessageDispatchChannelI#isEmpty()
071 */
072 public boolean isEmpty() {
073 // synchronized (mutex) {
074 return this.size == 0;
075 // }
076 }
077
078 /*
079 * (non-Javadoc)
080 * @see org.apache.activemq.MessageDispatchChannelI#dequeue(long)
081 */
082 public MessageDispatch dequeue(long timeout) throws InterruptedException {
083 synchronized (mutex) {
084 // Wait until the consumer is ready to deliver messages.
085 while (timeout != 0 && !closed && (isEmpty() || !running)) {
086 if (timeout == -1) {
087 mutex.wait();
088 } else {
089 mutex.wait(timeout);
090 break;
091 }
092 }
093 if (closed || !running || isEmpty()) {
094 return null;
095 }
096 return removeFirst();
097 }
098 }
099
100 /*
101 * (non-Javadoc)
102 * @see org.apache.activemq.MessageDispatchChannelI#dequeueNoWait()
103 */
104 public MessageDispatch dequeueNoWait() {
105 synchronized (mutex) {
106 if (closed || !running || isEmpty()) {
107 return null;
108 }
109 return removeFirst();
110 }
111 }
112
113 /*
114 * (non-Javadoc)
115 * @see org.apache.activemq.MessageDispatchChannelI#peek()
116 */
117 public MessageDispatch peek() {
118 synchronized (mutex) {
119 if (closed || !running || isEmpty()) {
120 return null;
121 }
122 return getFirst();
123 }
124 }
125
126 /*
127 * (non-Javadoc)
128 * @see org.apache.activemq.MessageDispatchChannelI#start()
129 */
130 public void start() {
131 synchronized (mutex) {
132 running = true;
133 mutex.notifyAll();
134 }
135 }
136
137 /*
138 * (non-Javadoc)
139 * @see org.apache.activemq.MessageDispatchChannelI#stop()
140 */
141 public void stop() {
142 synchronized (mutex) {
143 running = false;
144 mutex.notifyAll();
145 }
146 }
147
148 /*
149 * (non-Javadoc)
150 * @see org.apache.activemq.MessageDispatchChannelI#close()
151 */
152 public void close() {
153 synchronized (mutex) {
154 if (!closed) {
155 running = false;
156 closed = true;
157 }
158 mutex.notifyAll();
159 }
160 }
161
162 /*
163 * (non-Javadoc)
164 * @see org.apache.activemq.MessageDispatchChannelI#clear()
165 */
166 public void clear() {
167 synchronized (mutex) {
168 for (int i = 0; i < MAX_PRIORITY; i++) {
169 lists[i].clear();
170 }
171 }
172 }
173
174 /*
175 * (non-Javadoc)
176 * @see org.apache.activemq.MessageDispatchChannelI#isClosed()
177 */
178 public boolean isClosed() {
179 return closed;
180 }
181
182 /*
183 * (non-Javadoc)
184 * @see org.apache.activemq.MessageDispatchChannelI#size()
185 */
186 public int size() {
187 synchronized (mutex) {
188 return this.size;
189 }
190 }
191
192 /*
193 * (non-Javadoc)
194 * @see org.apache.activemq.MessageDispatchChannelI#getMutex()
195 */
196 public Object getMutex() {
197 return mutex;
198 }
199
200 /*
201 * (non-Javadoc)
202 * @see org.apache.activemq.MessageDispatchChannelI#isRunning()
203 */
204 public boolean isRunning() {
205 return running;
206 }
207
208 /*
209 * (non-Javadoc)
210 * @see org.apache.activemq.MessageDispatchChannelI#removeAll()
211 */
212 public List<MessageDispatch> removeAll() {
213
214 synchronized (mutex) {
215 ArrayList<MessageDispatch> result = new ArrayList<MessageDispatch>(size());
216 for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
217 List<MessageDispatch> list = lists[i];
218 result.addAll(list);
219 size -= list.size();
220 list.clear();
221 }
222 return result;
223 }
224 }
225
226 @Override
227 public String toString() {
228
229 String result = "";
230 for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
231 result += i + ":{" + lists[i].toString() + "}";
232 }
233 return result;
234
235 }
236
237 protected int getPriority(MessageDispatch message) {
238 int priority = javax.jms.Message.DEFAULT_PRIORITY;
239 if (message.getMessage() != null) {
240 priority = Math.max(message.getMessage().getPriority(), 0);
241 priority = Math.min(priority, 9);
242 }
243 return priority;
244 }
245
246 protected LinkedList<MessageDispatch> getList(MessageDispatch md) {
247 return lists[getPriority(md)];
248 }
249
250 private final MessageDispatch removeFirst() {
251 if (this.size > 0) {
252 for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
253 LinkedList<MessageDispatch> list = lists[i];
254 if (!list.isEmpty()) {
255 this.size--;
256 return list.removeFirst();
257 }
258 }
259 }
260 return null;
261 }
262
263 private final MessageDispatch getFirst() {
264 if (this.size > 0) {
265 for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
266 LinkedList<MessageDispatch> list = lists[i];
267 if (!list.isEmpty()) {
268 return list.getFirst();
269 }
270 }
271 }
272 return null;
273 }
274 }