001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadaptor;
018
019 import java.io.IOException;
020 import java.util.Iterator;
021 import java.util.Map;
022 import java.util.concurrent.ConcurrentHashMap;
023 import org.apache.activemq.broker.ConnectionContext;
024 import org.apache.activemq.command.ActiveMQDestination;
025 import org.apache.activemq.command.Message;
026 import org.apache.activemq.command.MessageAck;
027 import org.apache.activemq.command.MessageId;
028 import org.apache.activemq.command.SubscriptionInfo;
029 import org.apache.activemq.kaha.ListContainer;
030 import org.apache.activemq.kaha.MapContainer;
031 import org.apache.activemq.kaha.Marshaller;
032 import org.apache.activemq.kaha.Store;
033 import org.apache.activemq.kaha.StoreEntry;
034 import org.apache.activemq.store.MessageRecoveryListener;
035 import org.apache.activemq.store.TopicMessageStore;
036
037 /**
038 *
039 */
040 public class KahaTopicMessageStore extends KahaMessageStore implements TopicMessageStore {
041
042 protected ListContainer<TopicSubAck> ackContainer;
043 protected Map<Object, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<Object, TopicSubContainer>();
044 private Map<String, SubscriptionInfo> subscriberContainer;
045 private Store store;
046
047 public KahaTopicMessageStore(Store store, MapContainer<MessageId, Message> messageContainer,
048 ListContainer<TopicSubAck> ackContainer, MapContainer<String, SubscriptionInfo> subsContainer,
049 ActiveMQDestination destination) throws IOException {
050 super(messageContainer, destination);
051 this.store = store;
052 this.ackContainer = ackContainer;
053 subscriberContainer = subsContainer;
054 // load all the Ack containers
055 for (Iterator<String> i = subscriberContainer.keySet().iterator(); i.hasNext();) {
056 Object key = i.next();
057 addSubscriberMessageContainer(key);
058 }
059 }
060
061 @Override
062 public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
063 int subscriberCount = subscriberMessages.size();
064 if (subscriberCount > 0) {
065 MessageId id = message.getMessageId();
066 StoreEntry messageEntry = messageContainer.place(id, message);
067 TopicSubAck tsa = new TopicSubAck();
068 tsa.setCount(subscriberCount);
069 tsa.setMessageEntry(messageEntry);
070 StoreEntry ackEntry = ackContainer.placeLast(tsa);
071 for (Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) {
072 TopicSubContainer container = i.next();
073 ConsumerMessageRef ref = new ConsumerMessageRef();
074 ref.setAckEntry(ackEntry);
075 ref.setMessageEntry(messageEntry);
076 ref.setMessageId(id);
077 container.add(ref);
078 }
079 }
080 }
081
082 public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
083 MessageId messageId, MessageAck ack) throws IOException {
084 String subcriberId = getSubscriptionKey(clientId, subscriptionName);
085 TopicSubContainer container = subscriberMessages.get(subcriberId);
086 if (container != null) {
087 ConsumerMessageRef ref = container.remove(messageId);
088 if (container.isEmpty()) {
089 container.reset();
090 }
091 if (ref != null) {
092 TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
093 if (tsa != null) {
094 if (tsa.decrementCount() <= 0) {
095 StoreEntry entry = ref.getAckEntry();
096 entry = ackContainer.refresh(entry);
097 ackContainer.remove(entry);
098 entry = tsa.getMessageEntry();
099 entry = messageContainer.refresh(entry);
100 messageContainer.remove(entry);
101 } else {
102 ackContainer.update(ref.getAckEntry(), tsa);
103 }
104 }
105 }
106 }
107 }
108
109 public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
110 return subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName));
111 }
112
113 public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
114 String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
115 // if already exists - won't add it again as it causes data files
116 // to hang around
117 if (!subscriberContainer.containsKey(key)) {
118 subscriberContainer.put(key, info);
119 }
120 // add the subscriber
121 addSubscriberMessageContainer(key);
122 /*
123 * if(retroactive){ for(StoreEntry
124 * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
125 * TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
126 * ConsumerMessageRef ref=new ConsumerMessageRef();
127 * ref.setAckEntry(entry); ref.setMessageEntry(tsa.getMessageEntry());
128 * container.add(ref); } }
129 */
130 }
131
132 public synchronized void deleteSubscription(String clientId, String subscriptionName) throws IOException {
133 String key = getSubscriptionKey(clientId, subscriptionName);
134 removeSubscriberMessageContainer(key);
135 }
136
137 public synchronized void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
138 throws Exception {
139 String key = getSubscriptionKey(clientId, subscriptionName);
140 TopicSubContainer container = subscriberMessages.get(key);
141 if (container != null) {
142 for (Iterator i = container.iterator(); i.hasNext();) {
143 ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
144 Message msg = messageContainer.get(ref.getMessageEntry());
145 if (msg != null) {
146 if (!recoverMessage(listener, msg)) {
147 break;
148 }
149 }
150 }
151 }
152 }
153
154 public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
155 MessageRecoveryListener listener) throws Exception {
156 String key = getSubscriptionKey(clientId, subscriptionName);
157 TopicSubContainer container = subscriberMessages.get(key);
158 if (container != null) {
159 int count = 0;
160 StoreEntry entry = container.getBatchEntry();
161 if (entry == null) {
162 entry = container.getEntry();
163 } else {
164 entry = container.refreshEntry(entry);
165 if (entry != null) {
166 entry = container.getNextEntry(entry);
167 }
168 }
169 if (entry != null) {
170 do {
171 ConsumerMessageRef consumerRef = container.get(entry);
172 Message msg = messageContainer.getValue(consumerRef.getMessageEntry());
173 if (msg != null) {
174 recoverMessage(listener, msg);
175 count++;
176 container.setBatchEntry(msg.getMessageId().toString(), entry);
177 } else {
178 container.reset();
179 }
180
181 entry = container.getNextEntry(entry);
182 } while (entry != null && count < maxReturned && listener.hasSpace());
183 }
184 }
185 }
186
187 public synchronized void delete() {
188 super.delete();
189 ackContainer.clear();
190 subscriberContainer.clear();
191 }
192
193 public SubscriptionInfo[] getAllSubscriptions() throws IOException {
194 return subscriberContainer.values()
195 .toArray(new SubscriptionInfo[subscriberContainer.size()]);
196 }
197
198 protected String getSubscriptionKey(String clientId, String subscriberName) {
199 String result = clientId + ":";
200 result += subscriberName != null ? subscriberName : "NOT_SET";
201 return result;
202 }
203
204 protected MapContainer addSubscriberMessageContainer(Object key) throws IOException {
205 MapContainer container = store.getMapContainer(key, "topic-subs");
206 container.setKeyMarshaller(Store.MESSAGEID_MARSHALLER);
207 Marshaller marshaller = new ConsumerMessageRefMarshaller();
208 container.setValueMarshaller(marshaller);
209 TopicSubContainer tsc = new TopicSubContainer(container);
210 subscriberMessages.put(key, tsc);
211 return container;
212 }
213
214 protected synchronized void removeSubscriberMessageContainer(Object key)
215 throws IOException {
216 subscriberContainer.remove(key);
217 TopicSubContainer container = subscriberMessages.remove(key);
218 if (container != null) {
219 for (Iterator i = container.iterator(); i.hasNext();) {
220 ConsumerMessageRef ref = (ConsumerMessageRef) i.next();
221 if (ref != null) {
222 TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
223 if (tsa != null) {
224 if (tsa.decrementCount() <= 0) {
225 ackContainer.remove(ref.getAckEntry());
226 messageContainer.remove(tsa.getMessageEntry());
227 } else {
228 ackContainer.update(ref.getAckEntry(), tsa);
229 }
230 }
231 }
232 }
233 container.clear();
234 }
235 store.deleteListContainer(key, "topic-subs");
236
237 }
238
239 public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException {
240 String key = getSubscriptionKey(clientId, subscriberName);
241 TopicSubContainer container = subscriberMessages.get(key);
242 return container != null ? container.size() : 0;
243 }
244
245 /**
246 * @param context
247 * @throws IOException
248 * @see org.apache.activemq.store.MessageStore#removeAllMessages(org.apache.activemq.broker.ConnectionContext)
249 */
250 public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
251 messageContainer.clear();
252 ackContainer.clear();
253 for (Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) {
254 TopicSubContainer container = i.next();
255 container.clear();
256 }
257 }
258
259 public synchronized void resetBatching(String clientId, String subscriptionName) {
260 String key = getSubscriptionKey(clientId, subscriptionName);
261 TopicSubContainer topicSubContainer = subscriberMessages.get(key);
262 if (topicSubContainer != null) {
263 topicSubContainer.reset();
264 }
265 }
266 }