001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region.cursors;
018
019 import org.apache.activemq.broker.Broker;
020 import org.apache.activemq.broker.region.MessageReference;
021 import org.apache.activemq.broker.region.Queue;
022 import org.apache.activemq.command.Message;
023 import org.apache.activemq.usage.SystemUsage;
024 import org.slf4j.Logger;
025 import org.slf4j.LoggerFactory;
026
027 /**
028 * Store based Cursor for Queues
029 */
030 public class StoreQueueCursor extends AbstractPendingMessageCursor {
031
032 private static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursor.class);
033 private final Broker broker;
034 private int pendingCount;
035 private final Queue queue;
036 private PendingMessageCursor nonPersistent;
037 private final QueueStorePrefetch persistent;
038 private boolean started;
039 private PendingMessageCursor currentCursor;
040
041 /**
042 * Construct
043 * @param broker
044 * @param queue
045 */
046 public StoreQueueCursor(Broker broker,Queue queue) {
047 super((queue != null ? queue.isPrioritizedMessages():false));
048 this.broker=broker;
049 this.queue = queue;
050 this.persistent = new QueueStorePrefetch(queue);
051 currentCursor = persistent;
052 }
053
054 public synchronized void start() throws Exception {
055 started = true;
056 super.start();
057 if (nonPersistent == null) {
058 if (broker.getBrokerService().isPersistent()) {
059 nonPersistent = new FilePendingMessageCursor(broker,queue.getName(),this.prioritizedMessages);
060 }else {
061 nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
062 }
063 nonPersistent.setMaxBatchSize(getMaxBatchSize());
064 nonPersistent.setSystemUsage(systemUsage);
065 nonPersistent.setEnableAudit(isEnableAudit());
066 nonPersistent.setMaxAuditDepth(getMaxAuditDepth());
067 nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit());
068 }
069 nonPersistent.setMessageAudit(getMessageAudit());
070 nonPersistent.start();
071 persistent.setMessageAudit(getMessageAudit());
072 persistent.start();
073 pendingCount = persistent.size() + nonPersistent.size();
074 }
075
076 public synchronized void stop() throws Exception {
077 started = false;
078 if (nonPersistent != null) {
079 // nonPersistent.clear();
080 // nonPersistent.stop();
081 // nonPersistent.gc();
082 nonPersistent.destroy();
083 }
084 persistent.stop();
085 persistent.gc();
086 super.stop();
087 pendingCount = 0;
088 }
089
090 public synchronized void addMessageLast(MessageReference node) throws Exception {
091 if (node != null) {
092 Message msg = node.getMessage();
093 if (started) {
094 pendingCount++;
095 if (!msg.isPersistent()) {
096 nonPersistent.addMessageLast(node);
097 }
098 }
099 if (msg.isPersistent()) {
100 persistent.addMessageLast(node);
101 }
102 }
103 }
104
105 public synchronized void addMessageFirst(MessageReference node) throws Exception {
106 if (node != null) {
107 Message msg = node.getMessage();
108 if (started) {
109 pendingCount++;
110 if (!msg.isPersistent()) {
111 nonPersistent.addMessageFirst(node);
112 }
113 }
114 if (msg.isPersistent()) {
115 persistent.addMessageFirst(node);
116 }
117 }
118 }
119
120 public synchronized void clear() {
121 pendingCount = 0;
122 }
123
124 public synchronized boolean hasNext() {
125 try {
126 getNextCursor();
127 } catch (Exception e) {
128 LOG.error("Failed to get current cursor ", e);
129 throw new RuntimeException(e);
130 }
131 return currentCursor != null ? currentCursor.hasNext() : false;
132 }
133
134 public synchronized MessageReference next() {
135 MessageReference result = currentCursor != null ? currentCursor.next() : null;
136 return result;
137 }
138
139 public synchronized void remove() {
140 if (currentCursor != null) {
141 currentCursor.remove();
142 }
143 pendingCount--;
144 }
145
146 public synchronized void remove(MessageReference node) {
147 if (!node.isPersistent()) {
148 nonPersistent.remove(node);
149 } else {
150 persistent.remove(node);
151 }
152 pendingCount--;
153 }
154
155 public synchronized void reset() {
156 nonPersistent.reset();
157 persistent.reset();
158 pendingCount = persistent.size() + nonPersistent.size();
159 }
160
161 public void release() {
162 nonPersistent.release();
163 persistent.release();
164 }
165
166
167 public synchronized int size() {
168 if (pendingCount < 0) {
169 pendingCount = persistent.size() + nonPersistent.size();
170 }
171 return pendingCount;
172 }
173
174 public synchronized boolean isEmpty() {
175 // if negative, more messages arrived in store since last reset so non empty
176 return pendingCount == 0;
177 }
178
179 /**
180 * Informs the Broker if the subscription needs to intervention to recover
181 * it's state e.g. DurableTopicSubscriber may do
182 *
183 * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
184 * @return true if recovery required
185 */
186 public boolean isRecoveryRequired() {
187 return false;
188 }
189
190 /**
191 * @return the nonPersistent Cursor
192 */
193 public PendingMessageCursor getNonPersistent() {
194 return this.nonPersistent;
195 }
196
197 /**
198 * @param nonPersistent cursor to set
199 */
200 public void setNonPersistent(PendingMessageCursor nonPersistent) {
201 this.nonPersistent = nonPersistent;
202 }
203
204 public void setMaxBatchSize(int maxBatchSize) {
205 persistent.setMaxBatchSize(maxBatchSize);
206 if (nonPersistent != null) {
207 nonPersistent.setMaxBatchSize(maxBatchSize);
208 }
209 super.setMaxBatchSize(maxBatchSize);
210 }
211
212
213 public void setMaxProducersToAudit(int maxProducersToAudit) {
214 super.setMaxProducersToAudit(maxProducersToAudit);
215 if (persistent != null) {
216 persistent.setMaxProducersToAudit(maxProducersToAudit);
217 }
218 if (nonPersistent != null) {
219 nonPersistent.setMaxProducersToAudit(maxProducersToAudit);
220 }
221 }
222
223 public void setMaxAuditDepth(int maxAuditDepth) {
224 super.setMaxAuditDepth(maxAuditDepth);
225 if (persistent != null) {
226 persistent.setMaxAuditDepth(maxAuditDepth);
227 }
228 if (nonPersistent != null) {
229 nonPersistent.setMaxAuditDepth(maxAuditDepth);
230 }
231 }
232
233 public void setEnableAudit(boolean enableAudit) {
234 super.setEnableAudit(enableAudit);
235 if (persistent != null) {
236 persistent.setEnableAudit(enableAudit);
237 }
238 if (nonPersistent != null) {
239 nonPersistent.setEnableAudit(enableAudit);
240 }
241 }
242
243 @Override
244 public void setUseCache(boolean useCache) {
245 super.setUseCache(useCache);
246 if (persistent != null) {
247 persistent.setUseCache(useCache);
248 }
249 if (nonPersistent != null) {
250 nonPersistent.setUseCache(useCache);
251 }
252 }
253
254 @Override
255 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
256 super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
257 if (persistent != null) {
258 persistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
259 }
260 if (nonPersistent != null) {
261 nonPersistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
262 }
263 }
264
265
266
267 public synchronized void gc() {
268 if (persistent != null) {
269 persistent.gc();
270 }
271 if (nonPersistent != null) {
272 nonPersistent.gc();
273 }
274 pendingCount = persistent.size() + nonPersistent.size();
275 }
276
277 public void setSystemUsage(SystemUsage usageManager) {
278 super.setSystemUsage(usageManager);
279 if (persistent != null) {
280 persistent.setSystemUsage(usageManager);
281 }
282 if (nonPersistent != null) {
283 nonPersistent.setSystemUsage(usageManager);
284 }
285 }
286
287 protected synchronized PendingMessageCursor getNextCursor() throws Exception {
288 if (currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()) {
289 currentCursor = currentCursor == persistent ? nonPersistent : persistent;
290 // sanity check
291 if (currentCursor.isEmpty()) {
292 currentCursor = currentCursor == persistent ? nonPersistent : persistent;
293 }
294 }
295 return currentCursor;
296 }
297
298 @Override
299 public boolean isCacheEnabled() {
300 boolean cacheEnabled = isUseCache();
301 if (cacheEnabled) {
302 if (persistent != null) {
303 cacheEnabled &= persistent.isCacheEnabled();
304 }
305 if (nonPersistent != null) {
306 cacheEnabled &= nonPersistent.isCacheEnabled();
307 }
308 setCacheEnabled(cacheEnabled);
309 }
310 return cacheEnabled;
311 }
312 }