/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017 package org.apache.activemq;
018
019 import java.util.ArrayList;
020 import java.util.LinkedList;
021 import java.util.List;
022 import org.apache.activemq.command.MessageDispatch;
023
024 public class FifoMessageDispatchChannel implements MessageDispatchChannel {
025
026 private final Object mutex = new Object();
027 private final LinkedList<MessageDispatch> list;
028 private boolean closed;
029 private boolean running;
030
031 public FifoMessageDispatchChannel() {
032 this.list = new LinkedList<MessageDispatch>();
033 }
034
035 /* (non-Javadoc)
036 * @see org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq.command.MessageDispatch)
037 */
038 public void enqueue(MessageDispatch message) {
039 synchronized (mutex) {
040 list.addLast(message);
041 mutex.notify();
042 }
043 }
044
045 /* (non-Javadoc)
046 * @see org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq.command.MessageDispatch)
047 */
048 public void enqueueFirst(MessageDispatch message) {
049 synchronized (mutex) {
050 list.addFirst(message);
051 mutex.notify();
052 }
053 }
054
055 /* (non-Javadoc)
056 * @see org.apache.activemq.MessageDispatchChannelI#isEmpty()
057 */
058 public boolean isEmpty() {
059 synchronized (mutex) {
060 return list.isEmpty();
061 }
062 }
063
064 /* (non-Javadoc)
065 * @see org.apache.activemq.MessageDispatchChannelI#dequeue(long)
066 */
067 public MessageDispatch dequeue(long timeout) throws InterruptedException {
068 synchronized (mutex) {
069 // Wait until the consumer is ready to deliver messages.
070 while (timeout != 0 && !closed && (list.isEmpty() || !running)) {
071 if (timeout == -1) {
072 mutex.wait();
073 } else {
074 mutex.wait(timeout);
075 break;
076 }
077 }
078 if (closed || !running || list.isEmpty()) {
079 return null;
080 }
081 return list.removeFirst();
082 }
083 }
084
085 /* (non-Javadoc)
086 * @see org.apache.activemq.MessageDispatchChannelI#dequeueNoWait()
087 */
088 public MessageDispatch dequeueNoWait() {
089 synchronized (mutex) {
090 if (closed || !running || list.isEmpty()) {
091 return null;
092 }
093 return list.removeFirst();
094 }
095 }
096
097 /* (non-Javadoc)
098 * @see org.apache.activemq.MessageDispatchChannelI#peek()
099 */
100 public MessageDispatch peek() {
101 synchronized (mutex) {
102 if (closed || !running || list.isEmpty()) {
103 return null;
104 }
105 return list.getFirst();
106 }
107 }
108
109 /* (non-Javadoc)
110 * @see org.apache.activemq.MessageDispatchChannelI#start()
111 */
112 public void start() {
113 synchronized (mutex) {
114 running = true;
115 mutex.notifyAll();
116 }
117 }
118
119 /* (non-Javadoc)
120 * @see org.apache.activemq.MessageDispatchChannelI#stop()
121 */
122 public void stop() {
123 synchronized (mutex) {
124 running = false;
125 mutex.notifyAll();
126 }
127 }
128
129 /* (non-Javadoc)
130 * @see org.apache.activemq.MessageDispatchChannelI#close()
131 */
132 public void close() {
133 synchronized (mutex) {
134 if (!closed) {
135 running = false;
136 closed = true;
137 }
138 mutex.notifyAll();
139 }
140 }
141
142 /* (non-Javadoc)
143 * @see org.apache.activemq.MessageDispatchChannelI#clear()
144 */
145 public void clear() {
146 synchronized (mutex) {
147 list.clear();
148 }
149 }
150
151 /* (non-Javadoc)
152 * @see org.apache.activemq.MessageDispatchChannelI#isClosed()
153 */
154 public boolean isClosed() {
155 return closed;
156 }
157
158 /* (non-Javadoc)
159 * @see org.apache.activemq.MessageDispatchChannelI#size()
160 */
161 public int size() {
162 synchronized (mutex) {
163 return list.size();
164 }
165 }
166
167 /* (non-Javadoc)
168 * @see org.apache.activemq.MessageDispatchChannelI#getMutex()
169 */
170 public Object getMutex() {
171 return mutex;
172 }
173
174 /* (non-Javadoc)
175 * @see org.apache.activemq.MessageDispatchChannelI#isRunning()
176 */
177 public boolean isRunning() {
178 return running;
179 }
180
181 /* (non-Javadoc)
182 * @see org.apache.activemq.MessageDispatchChannelI#removeAll()
183 */
184 public List<MessageDispatch> removeAll() {
185 synchronized (mutex) {
186 ArrayList<MessageDispatch> rc = new ArrayList<MessageDispatch>(list);
187 list.clear();
188 return rc;
189 }
190 }
191
192 @Override
193 public String toString() {
194 synchronized (mutex) {
195 return list.toString();
196 }
197 }
198 }