001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region.cursors;
018
019 import java.io.IOException;
020 import java.util.Iterator;
021 import java.util.LinkedList;
022 import java.util.concurrent.atomic.AtomicBoolean;
023 import java.util.concurrent.atomic.AtomicLong;
024 import org.apache.activemq.broker.Broker;
025 import org.apache.activemq.broker.ConnectionContext;
026 import org.apache.activemq.broker.region.Destination;
027 import org.apache.activemq.broker.region.IndirectMessageReference;
028 import org.apache.activemq.broker.region.MessageReference;
029 import org.apache.activemq.broker.region.QueueMessageReference;
030 import org.apache.activemq.command.Message;
031 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
032 import org.apache.activemq.openwire.OpenWireFormat;
033 import org.apache.activemq.store.kahadb.plist.PList;
034 import org.apache.activemq.store.kahadb.plist.PListEntry;
035 import org.apache.activemq.store.kahadb.plist.PListStore;
036 import org.apache.activemq.usage.SystemUsage;
037 import org.apache.activemq.usage.Usage;
038 import org.apache.activemq.usage.UsageListener;
039 import org.apache.activemq.wireformat.WireFormat;
040 import org.slf4j.Logger;
041 import org.slf4j.LoggerFactory;
042 import org.apache.kahadb.util.ByteSequence;
043
044 /**
045 * persist pending messages pending message (messages awaiting dispatch to a
046 * consumer) cursor
047 *
048 *
049 */
050 public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
051 static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class);
052 private static final AtomicLong NAME_COUNT = new AtomicLong();
053 protected Broker broker;
054 private final PListStore store;
055 private final String name;
056 private PendingList memoryList;
057 private PList diskList;
058 private Iterator<MessageReference> iter;
059 private Destination regionDestination;
060 private boolean iterating;
061 private boolean flushRequired;
062 private final AtomicBoolean started = new AtomicBoolean();
063 private final WireFormat wireFormat = new OpenWireFormat();
064 /**
065 * @param broker
066 * @param name
067 * @param prioritizedMessages
068 */
069 public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) {
070 super(prioritizedMessages);
071 if (this.prioritizedMessages) {
072 this.memoryList = new PrioritizedPendingList();
073 } else {
074 this.memoryList = new OrderedPendingList();
075 }
076 this.broker = broker;
077 // the store can be null if the BrokerService has persistence
078 // turned off
079 this.store = broker.getTempDataStore();
080 this.name = NAME_COUNT.incrementAndGet() + "_" + name;
081 }
082
083 @Override
084 public void start() throws Exception {
085 if (started.compareAndSet(false, true)) {
086 super.start();
087 if (systemUsage != null) {
088 systemUsage.getMemoryUsage().addUsageListener(this);
089 }
090 }
091 }
092
093 @Override
094 public void stop() throws Exception {
095 if (started.compareAndSet(true, false)) {
096 super.stop();
097 if (systemUsage != null) {
098 systemUsage.getMemoryUsage().removeUsageListener(this);
099 }
100 }
101 }
102
103 /**
104 * @return true if there are no pending messages
105 */
106 @Override
107 public synchronized boolean isEmpty() {
108 if (memoryList.isEmpty() && isDiskListEmpty()) {
109 return true;
110 }
111 for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
112 MessageReference node = iterator.next();
113 if (node == QueueMessageReference.NULL_MESSAGE) {
114 continue;
115 }
116 if (!node.isDropped()) {
117 return false;
118 }
119 // We can remove dropped references.
120 iterator.remove();
121 }
122 return isDiskListEmpty();
123 }
124
125 /**
126 * reset the cursor
127 */
128 @Override
129 public synchronized void reset() {
130 iterating = true;
131 last = null;
132 if (isDiskListEmpty()) {
133 this.iter = this.memoryList.iterator();
134 } else {
135 this.iter = new DiskIterator();
136 }
137 }
138
139 @Override
140 public synchronized void release() {
141 iterating = false;
142 if (iter instanceof DiskIterator) {
143 ((DiskIterator)iter).release();
144 };
145 if (flushRequired) {
146 flushRequired = false;
147 if (!hasSpace()) {
148 flushToDisk();
149 }
150 }
151 }
152
153 @Override
154 public synchronized void destroy() throws Exception {
155 stop();
156 for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
157 Message node = (Message) i.next();
158 node.decrementReferenceCount();
159 }
160 memoryList.clear();
161 destroyDiskList();
162 }
163
164 private void destroyDiskList() throws Exception {
165 if (diskList != null) {
166 store.removePList(name);
167 diskList = null;
168 }
169 }
170
171 @Override
172 public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
173 LinkedList<MessageReference> result = new LinkedList<MessageReference>();
174 int count = 0;
175 for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
176 MessageReference ref = i.next();
177 ref.incrementReferenceCount();
178 result.add(ref);
179 count++;
180 }
181 if (count < maxItems && !isDiskListEmpty()) {
182 for (Iterator<MessageReference> i = new DiskIterator(); i.hasNext() && count < maxItems;) {
183 Message message = (Message) i.next();
184 message.setRegionDestination(regionDestination);
185 message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
186 message.incrementReferenceCount();
187 result.add(message);
188 count++;
189 }
190 }
191 return result;
192 }
193
194 /**
195 * add message to await dispatch
196 *
197 * @param node
198 * @throws Exception
199 */
200 @Override
201 public synchronized void addMessageLast(MessageReference node) throws Exception {
202 tryAddMessageLast(node, 0);
203 }
204
205 @Override
206 public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
207 if (!node.isExpired()) {
208 try {
209 regionDestination = node.getMessage().getRegionDestination();
210 if (isDiskListEmpty()) {
211 if (hasSpace() || this.store == null) {
212 memoryList.addMessageLast(node);
213 node.incrementReferenceCount();
214 setCacheEnabled(true);
215 return true;
216 }
217 }
218 if (!hasSpace()) {
219 if (isDiskListEmpty()) {
220 expireOldMessages();
221 if (hasSpace()) {
222 memoryList.addMessageLast(node);
223 node.incrementReferenceCount();
224 return true;
225 } else {
226 flushToDisk();
227 }
228 }
229 }
230 if (systemUsage.getTempUsage().waitForSpace(maxWaitTime)) {
231 ByteSequence bs = getByteSequence(node.getMessage());
232 getDiskList().addLast(node.getMessageId().toString(), bs);
233 return true;
234 }
235 return false;
236
237 } catch (Exception e) {
238 LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e);
239 throw new RuntimeException(e);
240 }
241 } else {
242 discardExpiredMessage(node);
243 }
244 //message expired
245 return true;
246 }
247
248 /**
249 * add message to await dispatch
250 *
251 * @param node
252 */
253 @Override
254 public synchronized void addMessageFirst(MessageReference node) {
255 if (!node.isExpired()) {
256 try {
257 regionDestination = node.getMessage().getRegionDestination();
258 if (isDiskListEmpty()) {
259 if (hasSpace()) {
260 memoryList.addMessageFirst(node);
261 node.incrementReferenceCount();
262 setCacheEnabled(true);
263 return;
264 }
265 }
266 if (!hasSpace()) {
267 if (isDiskListEmpty()) {
268 expireOldMessages();
269 if (hasSpace()) {
270 memoryList.addMessageFirst(node);
271 node.incrementReferenceCount();
272 return;
273 } else {
274 flushToDisk();
275 }
276 }
277 }
278 systemUsage.getTempUsage().waitForSpace();
279 node.decrementReferenceCount();
280 ByteSequence bs = getByteSequence(node.getMessage());
281 getDiskList().addFirst(node.getMessageId().toString(), bs);
282
283 } catch (Exception e) {
284 LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e);
285 throw new RuntimeException(e);
286 }
287 } else {
288 discardExpiredMessage(node);
289 }
290 }
291
292 /**
293 * @return true if there pending messages to dispatch
294 */
295 @Override
296 public synchronized boolean hasNext() {
297 return iter.hasNext();
298 }
299
300 /**
301 * @return the next pending message
302 */
303 @Override
304 public synchronized MessageReference next() {
305 MessageReference reference = iter.next();
306 last = reference;
307 if (!isDiskListEmpty()) {
308 // got from disk
309 reference.getMessage().setRegionDestination(regionDestination);
310 reference.getMessage().setMemoryUsage(this.getSystemUsage().getMemoryUsage());
311 }
312 reference.incrementReferenceCount();
313 return reference;
314 }
315
316 /**
317 * remove the message at the cursor position
318 */
319 @Override
320 public synchronized void remove() {
321 iter.remove();
322 if (last != null) {
323 last.decrementReferenceCount();
324 }
325 }
326
327 /**
328 * @param node
329 * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
330 */
331 @Override
332 public synchronized void remove(MessageReference node) {
333 if (memoryList.remove(node) != null) {
334 node.decrementReferenceCount();
335 }
336 if (!isDiskListEmpty()) {
337 try {
338 getDiskList().remove(node.getMessageId().toString());
339 } catch (IOException e) {
340 throw new RuntimeException(e);
341 }
342 }
343 }
344
345 /**
346 * @return the number of pending messages
347 */
348 @Override
349 public synchronized int size() {
350 return memoryList.size() + (isDiskListEmpty() ? 0 : (int)getDiskList().size());
351 }
352
353 /**
354 * clear all pending messages
355 */
356 @Override
357 public synchronized void clear() {
358 memoryList.clear();
359 if (!isDiskListEmpty()) {
360 try {
361 getDiskList().destroy();
362 } catch (IOException e) {
363 throw new RuntimeException(e);
364 }
365 }
366 last = null;
367 }
368
369 @Override
370 public synchronized boolean isFull() {
371
372 return super.isFull() || (!isDiskListEmpty() && systemUsage != null && systemUsage.getTempUsage().isFull());
373
374 }
375
376 @Override
377 public boolean hasMessagesBufferedToDeliver() {
378 return !isEmpty();
379 }
380
381 @Override
382 public void setSystemUsage(SystemUsage usageManager) {
383 super.setSystemUsage(usageManager);
384 }
385
386 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
387 if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
388 synchronized (this) {
389 if (!flushRequired && size() != 0) {
390 flushRequired =true;
391 if (!iterating) {
392 expireOldMessages();
393 if (!hasSpace()) {
394 flushToDisk();
395 flushRequired = false;
396 }
397 }
398 }
399 }
400 }
401 }
402
403 @Override
404 public boolean isTransient() {
405 return true;
406 }
407
408 protected boolean isSpaceInMemoryList() {
409 return hasSpace() && isDiskListEmpty();
410 }
411
412 protected synchronized void expireOldMessages() {
413 if (!memoryList.isEmpty()) {
414 for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
415 MessageReference node = iterator.next();
416 if (node.isExpired()) {
417 node.decrementReferenceCount();
418 discardExpiredMessage(node);
419 iterator.remove();
420 }
421 }
422 }
423 }
424
425 protected synchronized void flushToDisk() {
426 if (!memoryList.isEmpty() && store != null) {
427 long start = 0;
428 if (LOG.isTraceEnabled()) {
429 start = System.currentTimeMillis();
430 LOG.trace("" + name + ", flushToDisk() mem list size: " +memoryList.size() + " " + (systemUsage != null ? systemUsage.getMemoryUsage() : "") );
431 }
432 for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
433 MessageReference node = iterator.next();
434 node.decrementReferenceCount();
435 ByteSequence bs;
436 try {
437 bs = getByteSequence(node.getMessage());
438 getDiskList().addLast(node.getMessageId().toString(), bs);
439 } catch (IOException e) {
440 LOG.error("Failed to write to disk list", e);
441 throw new RuntimeException(e);
442 }
443
444 }
445 memoryList.clear();
446 setCacheEnabled(false);
447 if (LOG.isTraceEnabled()) {
448 LOG.trace("" + name + ", flushToDisk() done - " + (System.currentTimeMillis() - start) + "ms " + (systemUsage != null ? systemUsage.getMemoryUsage() : ""));
449 }
450 }
451 }
452
453 protected boolean isDiskListEmpty() {
454 return diskList == null || diskList.isEmpty();
455 }
456
457 protected PList getDiskList() {
458 if (diskList == null) {
459 try {
460 diskList = store.getPList(name);
461 } catch (Exception e) {
462 LOG.error("Caught an IO Exception getting the DiskList " + name, e);
463 throw new RuntimeException(e);
464 }
465 }
466 return diskList;
467 }
468
469 private void discardExpiredMessage(MessageReference reference) {
470 if (LOG.isDebugEnabled()) {
471 LOG.debug("Discarding expired message " + reference);
472 }
473 if (broker.isExpired(reference)) {
474 ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
475 context.setBroker(broker);
476 reference.getRegionDestination().messageExpired(context, null, new IndirectMessageReference(reference.getMessage()));
477 }
478 }
479
480 protected ByteSequence getByteSequence(Message message) throws IOException {
481 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
482 return new ByteSequence(packet.data, packet.offset, packet.length);
483 }
484
485 protected Message getMessage(ByteSequence bs) throws IOException {
486 org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(bs.getData(), bs
487 .getOffset(), bs.getLength());
488 return (Message) this.wireFormat.unmarshal(packet);
489
490 }
491
492 final class DiskIterator implements Iterator<MessageReference> {
493 private final PList.PListIterator iterator;
494 DiskIterator() {
495 try {
496 iterator = getDiskList().iterator();
497 } catch (Exception e) {
498 throw new RuntimeException(e);
499 }
500 }
501
502 public boolean hasNext() {
503 return iterator.hasNext();
504 }
505
506 public MessageReference next() {
507 try {
508 PListEntry entry = iterator.next();
509 return getMessage(entry.getByteSequence());
510 } catch (IOException e) {
511 LOG.error("I/O error", e);
512 throw new RuntimeException(e);
513 }
514 }
515
516 public void remove() {
517 iterator.remove();
518 }
519
520 public void release() {
521 iterator.release();
522 }
523 }
524 }