001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.memory.buffer;
018
019 import java.util.ArrayList;
020 import java.util.Iterator;
021 import java.util.List;
022
023 /**
024 * A {@link MessageBuffer} which evicts from the largest buffers first.
025 *
026 *
027 */
028 public class SizeBasedMessageBuffer implements MessageBuffer {
029
030 private int limit = 100 * 64 * 1024;
031 private List<MessageQueue> bubbleList = new ArrayList<MessageQueue>();
032 private int size;
033 private Object lock = new Object();
034
035 public SizeBasedMessageBuffer() {
036 }
037
038 public SizeBasedMessageBuffer(int limit) {
039 this.limit = limit;
040 }
041
042 public int getSize() {
043 synchronized (lock) {
044 return size;
045 }
046 }
047
048 /**
049 * Creates a new message queue instance
050 */
051 public MessageQueue createMessageQueue() {
052 MessageQueue queue = new MessageQueue(this);
053 synchronized (lock) {
054 queue.setPosition(bubbleList.size());
055 bubbleList.add(queue);
056 }
057 return queue;
058 }
059
060 /**
061 * After a message queue has changed we may need to perform some evictions
062 *
063 * @param delta
064 * @param queueSize
065 */
066 public void onSizeChanged(MessageQueue queue, int delta, int queueSize) {
067 synchronized (lock) {
068 bubbleUp(queue, queueSize);
069
070 size += delta;
071 while (size > limit) {
072 MessageQueue biggest = bubbleList.get(0);
073 size -= biggest.evictMessage();
074
075 bubbleDown(biggest, 0);
076 }
077 }
078 }
079
080 public void clear() {
081 synchronized (lock) {
082 for (Iterator<MessageQueue> iter = bubbleList.iterator(); iter.hasNext();) {
083 MessageQueue queue = iter.next();
084 queue.clear();
085 }
086 size = 0;
087 }
088 }
089
090 protected void bubbleUp(MessageQueue queue, int queueSize) {
091 // lets bubble up to head of queueif we need to
092 int position = queue.getPosition();
093 while (--position >= 0) {
094 MessageQueue pivot = bubbleList.get(position);
095 if (pivot.getSize() < queueSize) {
096 swap(position, pivot, position + 1, queue);
097 } else {
098 break;
099 }
100 }
101 }
102
103 protected void bubbleDown(MessageQueue biggest, int position) {
104 int queueSize = biggest.getSize();
105 int end = bubbleList.size();
106 for (int second = position + 1; second < end; second++) {
107 MessageQueue pivot = bubbleList.get(second);
108 if (pivot.getSize() > queueSize) {
109 swap(position, biggest, second, pivot);
110 } else {
111 break;
112 }
113 position = second;
114 }
115 }
116
117 protected void swap(int firstPosition, MessageQueue first, int secondPosition, MessageQueue second) {
118 bubbleList.set(firstPosition, second);
119 bubbleList.set(secondPosition, first);
120 first.setPosition(secondPosition);
121 second.setPosition(firstPosition);
122 }
123 }