001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region.cursors;
018
019 import java.util.Collections;
020 import java.util.LinkedList;
021 import java.util.List;
022 import java.util.Set;
023 import org.apache.activemq.ActiveMQMessageAudit;
024 import org.apache.activemq.broker.Broker;
025 import org.apache.activemq.broker.ConnectionContext;
026 import org.apache.activemq.broker.region.BaseDestination;
027 import org.apache.activemq.broker.region.Destination;
028 import org.apache.activemq.broker.region.MessageReference;
029 import org.apache.activemq.broker.region.Subscription;
030 import org.apache.activemq.command.MessageId;
031 import org.apache.activemq.usage.SystemUsage;
032
033 /**
034 * Abstract method holder for pending message (messages awaiting disptach to a
035 * consumer) cursor
036 *
037 *
038 */
039 public abstract class AbstractPendingMessageCursor implements PendingMessageCursor {
040 protected int memoryUsageHighWaterMark = 70;
041 protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
042 protected SystemUsage systemUsage;
043 protected int maxProducersToAudit = BaseDestination.MAX_PRODUCERS_TO_AUDIT;
044 protected int maxAuditDepth = BaseDestination.MAX_AUDIT_DEPTH;
045 protected boolean enableAudit=true;
046 protected ActiveMQMessageAudit audit;
047 protected boolean useCache=true;
048 private boolean cacheEnabled=true;
049 private boolean started=false;
050 protected MessageReference last = null;
051 protected final boolean prioritizedMessages;
052
053 public AbstractPendingMessageCursor(boolean prioritizedMessages) {
054 this.prioritizedMessages=prioritizedMessages;
055 }
056
057
058 public synchronized void start() throws Exception {
059 if (!started && enableAudit && audit==null) {
060 audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
061 }
062 started=true;
063 }
064
065 public synchronized void stop() throws Exception {
066 started=false;
067 gc();
068 }
069
070 public void add(ConnectionContext context, Destination destination) throws Exception {
071 }
072
073 @SuppressWarnings("unchecked")
074 public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
075 return Collections.EMPTY_LIST;
076 }
077
078 public boolean isRecoveryRequired() {
079 return true;
080 }
081
082 public void addMessageFirst(MessageReference node) throws Exception {
083 }
084
085 public void addMessageLast(MessageReference node) throws Exception {
086 }
087
088 public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
089 addMessageLast(node);
090 return true;
091 }
092
093 public void addRecoveredMessage(MessageReference node) throws Exception {
094 addMessageLast(node);
095 }
096
097 public void clear() {
098 }
099
100 public boolean hasNext() {
101 return false;
102 }
103
104 public boolean isEmpty() {
105 return false;
106 }
107
108 public boolean isEmpty(Destination destination) {
109 return isEmpty();
110 }
111
112 public MessageReference next() {
113 return null;
114 }
115
116 public void remove() {
117 }
118
119 public void reset() {
120 }
121
122 public int size() {
123 return 0;
124 }
125
126 public int getMaxBatchSize() {
127 return maxBatchSize;
128 }
129
130 public void setMaxBatchSize(int maxBatchSize) {
131 this.maxBatchSize = maxBatchSize;
132 }
133
134 protected void fillBatch() throws Exception {
135 }
136
137 public void resetForGC() {
138 reset();
139 }
140
141 public void remove(MessageReference node) {
142 }
143
144 public void gc() {
145 }
146
147 public void setSystemUsage(SystemUsage usageManager) {
148 this.systemUsage = usageManager;
149 }
150
151 public boolean hasSpace() {
152 return systemUsage != null ? (systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true;
153 }
154
155 public boolean isFull() {
156 return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false;
157 }
158
159 public void release() {
160 }
161
162 public boolean hasMessagesBufferedToDeliver() {
163 return false;
164 }
165
166 /**
167 * @return the memoryUsageHighWaterMark
168 */
169 public int getMemoryUsageHighWaterMark() {
170 return memoryUsageHighWaterMark;
171 }
172
173 /**
174 * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
175 */
176 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
177 this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
178 }
179
180 /**
181 * @return the usageManager
182 */
183 public SystemUsage getSystemUsage() {
184 return this.systemUsage;
185 }
186
187 /**
188 * destroy the cursor
189 *
190 * @throws Exception
191 */
192 public void destroy() throws Exception {
193 stop();
194 }
195
196 /**
197 * Page in a restricted number of messages
198 *
199 * @param maxItems maximum number of messages to return
200 * @return a list of paged in messages
201 */
202 public LinkedList<MessageReference> pageInList(int maxItems) {
203 throw new RuntimeException("Not supported");
204 }
205
206 /**
207 * @return the maxProducersToAudit
208 */
209 public int getMaxProducersToAudit() {
210 return maxProducersToAudit;
211 }
212
213 /**
214 * @param maxProducersToAudit the maxProducersToAudit to set
215 */
216 public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
217 this.maxProducersToAudit = maxProducersToAudit;
218 if (audit != null) {
219 audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
220 }
221 }
222
223 /**
224 * @return the maxAuditDepth
225 */
226 public int getMaxAuditDepth() {
227 return maxAuditDepth;
228 }
229
230
231 /**
232 * @param maxAuditDepth the maxAuditDepth to set
233 */
234 public synchronized void setMaxAuditDepth(int maxAuditDepth) {
235 this.maxAuditDepth = maxAuditDepth;
236 if (audit != null) {
237 audit.setAuditDepth(maxAuditDepth);
238 }
239 }
240
241
242 /**
243 * @return the enableAudit
244 */
245 public boolean isEnableAudit() {
246 return enableAudit;
247 }
248
249 /**
250 * @param enableAudit the enableAudit to set
251 */
252 public synchronized void setEnableAudit(boolean enableAudit) {
253 this.enableAudit = enableAudit;
254 if (enableAudit && started && audit==null) {
255 audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
256 }
257 }
258
259 public boolean isTransient() {
260 return false;
261 }
262
263
264 /**
265 * set the audit
266 * @param audit new audit component
267 */
268 public void setMessageAudit(ActiveMQMessageAudit audit) {
269 this.audit=audit;
270 }
271
272
273 /**
274 * @return the audit
275 */
276 public ActiveMQMessageAudit getMessageAudit() {
277 return audit;
278 }
279
280 public boolean isUseCache() {
281 return useCache;
282 }
283
284 public void setUseCache(boolean useCache) {
285 this.useCache = useCache;
286 }
287
288 public synchronized boolean isDuplicate(MessageId messageId) {
289 boolean unique = recordUniqueId(messageId);
290 rollback(messageId);
291 return !unique;
292 }
293
294 /**
295 * records a message id and checks if it is a duplicate
296 * @param messageId
297 * @return true if id is unique, false otherwise.
298 */
299 public synchronized boolean recordUniqueId(MessageId messageId) {
300 if (!enableAudit || audit==null) {
301 return true;
302 }
303 return !audit.isDuplicate(messageId);
304 }
305
306 public synchronized void rollback(MessageId id) {
307 if (audit != null) {
308 audit.rollback(id);
309 }
310 }
311
312 protected synchronized boolean isStarted() {
313 return started;
314 }
315
316 public static boolean isPrioritizedMessageSubscriber(Broker broker,Subscription sub) {
317 boolean result = false;
318 Set<Destination> destinations = broker.getDestinations(sub.getActiveMQDestination());
319 if (destinations != null) {
320 for (Destination dest:destinations) {
321 if (dest.isPrioritizedMessages()) {
322 result = true;
323 break;
324 }
325 }
326 }
327 return result;
328
329 }
330
331 public synchronized boolean isCacheEnabled() {
332 return cacheEnabled;
333 }
334
335 public synchronized void setCacheEnabled(boolean val) {
336 cacheEnabled = val;
337 }
338 }