001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.memory;
018
019 import java.io.IOException;
020 import java.util.Collections;
021 import java.util.HashMap;
022 import java.util.Iterator;
023 import java.util.Map;
024 import java.util.Map.Entry;
025 import org.apache.activemq.broker.ConnectionContext;
026 import org.apache.activemq.command.ActiveMQDestination;
027 import org.apache.activemq.command.Message;
028 import org.apache.activemq.command.MessageAck;
029 import org.apache.activemq.command.MessageId;
030 import org.apache.activemq.command.SubscriptionInfo;
031 import org.apache.activemq.store.MessageRecoveryListener;
032 import org.apache.activemq.store.TopicMessageStore;
033 import org.apache.activemq.util.LRUCache;
034 import org.apache.activemq.util.SubscriptionKey;
035
036 /**
037 *
038 */
039 public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {
040
041 private Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase;
042 private Map<SubscriptionKey, MemoryTopicSub> topicSubMap;
043
044 public MemoryTopicMessageStore(ActiveMQDestination destination) {
045 this(destination, new LRUCache<MessageId, Message>(100, 100, 0.75f, false), makeSubscriptionInfoMap());
046 }
047
048 public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable, Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) {
049 super(destination, messageTable);
050 this.subscriberDatabase = subscriberDatabase;
051 this.topicSubMap = makeSubMap();
052 }
053
054 protected static Map<SubscriptionKey, SubscriptionInfo> makeSubscriptionInfoMap() {
055 return Collections.synchronizedMap(new HashMap<SubscriptionKey, SubscriptionInfo>());
056 }
057
058 protected static Map<SubscriptionKey, MemoryTopicSub> makeSubMap() {
059 return Collections.synchronizedMap(new HashMap<SubscriptionKey, MemoryTopicSub>());
060 }
061
062 public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
063 super.addMessage(context, message);
064 for (Iterator<MemoryTopicSub> i = topicSubMap.values().iterator(); i.hasNext();) {
065 MemoryTopicSub sub = i.next();
066 sub.addMessage(message.getMessageId(), message);
067 }
068 }
069
070 public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
071 MessageId messageId, MessageAck ack) throws IOException {
072 SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
073 MemoryTopicSub sub = topicSubMap.get(key);
074 if (sub != null) {
075 sub.removeMessage(messageId);
076 }
077 }
078
079 public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
080 return subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
081 }
082
083 public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
084 SubscriptionKey key = new SubscriptionKey(info);
085 MemoryTopicSub sub = new MemoryTopicSub();
086 topicSubMap.put(key, sub);
087 if (retroactive) {
088 for (Iterator i = messageTable.entrySet().iterator(); i.hasNext();) {
089 Map.Entry entry = (Entry)i.next();
090 sub.addMessage((MessageId)entry.getKey(), (Message)entry.getValue());
091 }
092 }
093 subscriberDatabase.put(key, info);
094 }
095
096 public synchronized void deleteSubscription(String clientId, String subscriptionName) {
097 org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
098 subscriberDatabase.remove(key);
099 topicSubMap.remove(key);
100 }
101
102 public synchronized void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
103 MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
104 if (sub != null) {
105 sub.recoverSubscription(listener);
106 }
107 }
108
109 public synchronized void delete() {
110 super.delete();
111 subscriberDatabase.clear();
112 topicSubMap.clear();
113 }
114
115 public SubscriptionInfo[] getAllSubscriptions() throws IOException {
116 return subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
117 }
118
119 public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException {
120 int result = 0;
121 MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
122 if (sub != null) {
123 result = sub.size();
124 }
125 return result;
126 }
127
128 public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
129 MemoryTopicSub sub = this.topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
130 if (sub != null) {
131 sub.recoverNextMessages(maxReturned, listener);
132 }
133 }
134
135 public void resetBatching(String clientId, String subscriptionName) {
136 MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
137 if (sub != null) {
138 sub.resetBatching();
139 }
140 }
141 }