001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.memory.list;
018
019 import java.util.ArrayList;
020 import java.util.HashMap;
021 import java.util.Iterator;
022 import java.util.List;
023 import java.util.Map;
024 import java.util.Set;
025 import org.apache.activemq.broker.region.MessageReference;
026 import org.apache.activemq.broker.region.Subscription;
027 import org.apache.activemq.command.ActiveMQDestination;
028 import org.apache.activemq.command.ActiveMQMessage;
029 import org.apache.activemq.command.Message;
030 import org.apache.activemq.filter.DestinationMap;
031 import org.apache.activemq.memory.buffer.MessageBuffer;
032 import org.apache.activemq.memory.buffer.MessageQueue;
033 import org.apache.activemq.memory.buffer.OrderBasedMessageBuffer;
034
035 /**
036 * An implementation of {@link MessageList} which maintains a separate message
037 * list for each destination to reduce contention on the list and to speed up
038 * recovery times by only recovering the interested topics.
039 *
040 *
041 */
042 public class DestinationBasedMessageList implements MessageList {
043
044 private MessageBuffer messageBuffer;
045 private Map<ActiveMQDestination, MessageQueue> queueIndex = new HashMap<ActiveMQDestination, MessageQueue>();
046 private DestinationMap subscriptionIndex = new DestinationMap();
047 private Object lock = new Object();
048
049 public DestinationBasedMessageList(int maximumSize) {
050 this(new OrderBasedMessageBuffer(maximumSize));
051 }
052
053 public DestinationBasedMessageList(MessageBuffer buffer) {
054 messageBuffer = buffer;
055 }
056
057 public void add(MessageReference node) {
058 ActiveMQMessage message = (ActiveMQMessage) node.getMessageHardRef();
059 ActiveMQDestination destination = message.getDestination();
060 MessageQueue queue = null;
061 synchronized (lock) {
062 queue = queueIndex.get(destination);
063 if (queue == null) {
064 queue = messageBuffer.createMessageQueue();
065 queueIndex.put(destination, queue);
066 subscriptionIndex.put(destination, queue);
067 }
068 }
069 queue.add(node);
070 }
071
072 public List<MessageReference> getMessages(Subscription sub) {
073 return getMessages(sub.getConsumerInfo().getDestination());
074 }
075
076 public List<MessageReference> getMessages(ActiveMQDestination destination) {
077 Set set = null;
078 synchronized (lock) {
079 set = subscriptionIndex.get(destination);
080 }
081 List<MessageReference> answer = new ArrayList<MessageReference>();
082 for (Iterator iter = set.iterator(); iter.hasNext();) {
083 MessageQueue queue = (MessageQueue) iter.next();
084 queue.appendMessages(answer);
085 }
086 return answer;
087 }
088
089 public Message[] browse(ActiveMQDestination destination) {
090 List<MessageReference> result = getMessages(destination);
091 return result.toArray(new Message[result.size()]);
092 }
093
094
095 public void clear() {
096 messageBuffer.clear();
097 }
098 }