001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region.cursors;
018
019 import java.util.ArrayList;
020 import java.util.Iterator;
021 import java.util.LinkedList;
022 import java.util.List;
023 import org.apache.activemq.broker.ConnectionContext;
024 import org.apache.activemq.broker.region.Destination;
025 import org.apache.activemq.broker.region.MessageReference;
026 import org.apache.activemq.broker.region.QueueMessageReference;
027
/**
 * Holds pending messages (messages awaiting dispatch to a consumer) in a
 * linked list and exposes them through a cursor.
 */
034 public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
035 private final PendingList list;
036 private Iterator<MessageReference> iter;
037
038 public VMPendingMessageCursor(boolean prioritizedMessages) {
039 super(prioritizedMessages);
040 if (this.prioritizedMessages) {
041 this.list= new PrioritizedPendingList();
042 }else {
043 this.list = new OrderedPendingList();
044 }
045 }
046
047
048 public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination)
049 throws Exception {
050 List<MessageReference> rc = new ArrayList<MessageReference>();
051 for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
052 MessageReference r = iterator.next();
053 if (r.getRegionDestination() == destination) {
054 r.decrementReferenceCount();
055 rc.add(r);
056 iterator.remove();
057 }
058 }
059 return rc;
060 }
061
062 /**
063 * @return true if there are no pending messages
064 */
065
066 public synchronized boolean isEmpty() {
067 if (list.isEmpty()) {
068 return true;
069 } else {
070 for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
071 MessageReference node = iterator.next();
072 if (node == QueueMessageReference.NULL_MESSAGE) {
073 continue;
074 }
075 if (!node.isDropped()) {
076 return false;
077 }
078 // We can remove dropped references.
079 iterator.remove();
080 }
081 return true;
082 }
083 }
084
085 /**
086 * reset the cursor
087 */
088
089 public synchronized void reset() {
090 iter = list.iterator();
091 last = null;
092 }
093
094 /**
095 * add message to await dispatch
096 *
097 * @param node
098 */
099
100 public synchronized void addMessageLast(MessageReference node) {
101 node.incrementReferenceCount();
102 list.addMessageLast(node);
103 }
104
105 /**
106 * add message to await dispatch
107 *
108 * @param position
109 * @param node
110 */
111
112 public synchronized void addMessageFirst(MessageReference node) {
113 node.incrementReferenceCount();
114 list.addMessageFirst(node);
115 }
116
117 /**
118 * @return true if there pending messages to dispatch
119 */
120
121 public synchronized boolean hasNext() {
122 return iter.hasNext();
123 }
124
125 /**
126 * @return the next pending message
127 */
128
129 public synchronized MessageReference next() {
130 last = iter.next();
131 if (last != null) {
132 last.incrementReferenceCount();
133 }
134 return last;
135 }
136
137 /**
138 * remove the message at the cursor position
139 */
140
141 public synchronized void remove() {
142 if (last != null) {
143 last.decrementReferenceCount();
144 }
145 iter.remove();
146 }
147
148 /**
149 * @return the number of pending messages
150 */
151
152 public synchronized int size() {
153 return list.size();
154 }
155
156 /**
157 * clear all pending messages
158 */
159
160 public synchronized void clear() {
161 for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
162 MessageReference ref = i.next();
163 ref.decrementReferenceCount();
164 }
165 list.clear();
166 }
167
168
169 public synchronized void remove(MessageReference node) {
170 list.remove(node);
171 node.decrementReferenceCount();
172 }
173
174 /**
175 * Page in a restricted number of messages
176 *
177 * @param maxItems
178 * @return a list of paged in messages
179 */
180
181 public LinkedList<MessageReference> pageInList(int maxItems) {
182 LinkedList<MessageReference> result = new LinkedList<MessageReference>();
183 for (Iterator<MessageReference>i = list.iterator();i.hasNext();) {
184 MessageReference ref = i.next();
185 ref.incrementReferenceCount();
186 result.add(ref);
187 if (result.size() >= maxItems) {
188 break;
189 }
190 }
191 return result;
192 }
193
194
195 public boolean isTransient() {
196 return true;
197 }
198
199
200 public void destroy() throws Exception {
201 super.destroy();
202 clear();
203 }
204 }