001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region.cursors;
018
019 import java.io.IOException;
020 import java.util.LinkedList;
021 import java.util.List;
022 import org.apache.activemq.ActiveMQMessageAudit;
023 import org.apache.activemq.Service;
024 import org.apache.activemq.broker.ConnectionContext;
025 import org.apache.activemq.broker.region.Destination;
026 import org.apache.activemq.broker.region.MessageReference;
027 import org.apache.activemq.command.MessageId;
028 import org.apache.activemq.usage.SystemUsage;
029
030 /**
031 * Interface to pending message (messages awaiting disptach to a consumer)
032 * cursor
033 *
034 *
035 */
036 public interface PendingMessageCursor extends Service {
037
038 /**
039 * Add a destination
040 *
041 * @param context
042 * @param destination
043 * @throws Exception
044 */
045 void add(ConnectionContext context, Destination destination) throws Exception;
046
047 /**
048 * remove a destination
049 *
050 * @param context
051 * @param destination
052 * @throws Exception
053 */
054 List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception;
055
056 /**
057 * @return true if there are no pending messages
058 */
059 boolean isEmpty();
060
061 /**
062 * check if a Destination is Empty for this cursor
063 *
064 * @param destination
065 * @return true id the Destination is empty
066 */
067 boolean isEmpty(Destination destination);
068
069 /**
070 * reset the cursor
071 */
072 void reset();
073
074 /**
075 * hint to the cursor to release any locks it might have grabbed after a
076 * reset
077 */
078 void release();
079
080 /**
081 * add message to await dispatch
082 *
083 * @param node
084 * @throws IOException
085 * @throws Exception
086 */
087 void addMessageLast(MessageReference node) throws Exception;
088 /**
089 * add message to await dispatch - if it can
090 *
091 * @param node
092 * @param maxWaitTime
093 * @return true if successful
094 * @throws IOException
095 * @throws Exception
096 */
097 boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception;
098
099 /**
100 * add message to await dispatch
101 *
102 * @param node
103 * @throws Exception
104 */
105 void addMessageFirst(MessageReference node) throws Exception;
106
107 /**
108 * Add a message recovered from a retroactive policy
109 *
110 * @param node
111 * @throws Exception
112 */
113 void addRecoveredMessage(MessageReference node) throws Exception;
114
115 /**
116 * @return true if there pending messages to dispatch
117 */
118 boolean hasNext();
119
120 /**
121 * @return the next pending message with its reference count increment
122 */
123 MessageReference next();
124
125 /**
126 * remove the message at the cursor position
127 */
128 void remove();
129
130 /**
131 * @return the number of pending messages
132 */
133 int size();
134
135 /**
136 * clear all pending messages
137 */
138 void clear();
139
140 /**
141 * Informs the Broker if the subscription needs to intervention to recover
142 * it's state e.g. DurableTopicSubscriber may do
143 *
144 * @return true if recovery required
145 */
146 boolean isRecoveryRequired();
147
148 /**
149 * @return the maximum batch size
150 */
151 int getMaxBatchSize();
152
153 /**
154 * Set the max batch size
155 *
156 * @param maxBatchSize
157 */
158 void setMaxBatchSize(int maxBatchSize);
159
160 /**
161 * Give the cursor a hint that we are about to remove messages from memory
162 * only
163 */
164 void resetForGC();
165
166 /**
167 * remove a node
168 *
169 * @param node
170 */
171 void remove(MessageReference node);
172
173 /**
174 * free up any internal buffers
175 */
176 void gc();
177
178 /**
179 * Set the UsageManager
180 *
181 * @param systemUsage
182 * @see org.apache.activemq.usage.SystemUsage
183 */
184 void setSystemUsage(SystemUsage systemUsage);
185
186 /**
187 * @return the usageManager
188 */
189 SystemUsage getSystemUsage();
190
191 /**
192 * @return the memoryUsageHighWaterMark
193 */
194 int getMemoryUsageHighWaterMark();
195
196 /**
197 * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
198 */
199 void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark);
200
201 /**
202 * @return true if the cursor is full
203 */
204 boolean isFull();
205
206 /**
207 * @return true if the cursor has space to page messages into
208 */
209 public boolean hasSpace();
210
211 /**
212 * @return true if the cursor has buffered messages ready to deliver
213 */
214 boolean hasMessagesBufferedToDeliver();
215
216 /**
217 * destroy the cursor
218 *
219 * @throws Exception
220 */
221 void destroy() throws Exception;
222
223 /**
224 * Page in a restricted number of messages and increment the reference count
225 *
226 * @param maxItems
227 * @return a list of paged in messages
228 */
229 LinkedList<MessageReference> pageInList(int maxItems);
230
231 /**
232 * set the maximum number of producers to track at one time
233 * @param value
234 */
235 void setMaxProducersToAudit(int value);
236
237 /**
238 * @return the maximum number of producers to audit
239 */
240 int getMaxProducersToAudit();
241
242 /**
243 * Set the maximum depth of message ids to track
244 * @param depth
245 */
246 void setMaxAuditDepth(int depth);
247
248 /**
249 * @return the audit depth
250 */
251 int getMaxAuditDepth();
252
253 /**
254 * @return the enableAudit
255 */
256 public boolean isEnableAudit();
257 /**
258 * @param enableAudit the enableAudit to set
259 */
260 public void setEnableAudit(boolean enableAudit);
261
262 /**
263 * @return true if the underlying state of this cursor
264 * disappears when the broker shuts down
265 */
266 public boolean isTransient();
267
268
269 /**
270 * set the audit
271 * @param audit
272 */
273 public void setMessageAudit(ActiveMQMessageAudit audit);
274
275
276 /**
277 * @return the audit - could be null
278 */
279 public ActiveMQMessageAudit getMessageAudit();
280
281 /**
282 * use a cache to improve performance
283 * @param useCache
284 */
285 public void setUseCache(boolean useCache);
286
287 /**
288 * @return true if a cache may be used
289 */
290 public boolean isUseCache();
291
292 /**
293 * remove from auditing the message id
294 * @param id
295 */
296 public void rollback(MessageId id);
297
298 /**
299 * @return true if cache is being used
300 */
301 public boolean isCacheEnabled();
302
303 }