001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadaptor;
018
019 import java.io.IOException;
020 import java.util.HashSet;
021 import java.util.Iterator;
022 import java.util.List;
023 import java.util.Map;
024 import java.util.Set;
025 import java.util.Map.Entry;
026 import java.util.concurrent.ConcurrentHashMap;
027
028 import org.apache.activemq.broker.ConnectionContext;
029 import org.apache.activemq.command.ActiveMQDestination;
030 import org.apache.activemq.command.Message;
031 import org.apache.activemq.command.MessageAck;
032 import org.apache.activemq.command.MessageId;
033 import org.apache.activemq.command.SubscriptionInfo;
034 import org.apache.activemq.kaha.ListContainer;
035 import org.apache.activemq.kaha.MapContainer;
036 import org.apache.activemq.kaha.Marshaller;
037 import org.apache.activemq.kaha.Store;
038 import org.apache.activemq.kaha.StoreEntry;
039 import org.apache.activemq.store.MessageRecoveryListener;
040 import org.apache.activemq.store.TopicReferenceStore;
041 import org.slf4j.Logger;
042 import org.slf4j.LoggerFactory;
043
044 public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore {
/** Class logger; used for trace/debug output of reference add/remove operations. */
private static final Logger LOG = LoggerFactory.getLogger(KahaTopicReferenceStore.class);
/**
 * List of {@link TopicSubAck} records. One record is appended per stored message
 * reference; its count is initialised to the number of subscribers at add time
 * and decremented as subscribers acknowledge or are removed.
 */
protected ListContainer<TopicSubAck> ackContainer;
/**
 * Per-subscription message containers, keyed by the value returned from
 * {@code getSubscriptionKey(clientId, subscriptionName)}.
 */
protected Map<String, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<String, TopicSubContainer>();
/** Subscription metadata, keyed by the same subscription key as {@link #subscriberMessages}. */
private MapContainer<String, SubscriptionInfo> subscriberContainer;
/** Store used to obtain and delete the per-subscription map containers. */
private Store store;
/** Prefix prepended to every per-subscription container name. */
private static final String TOPIC_SUB_NAME = "tsn";
051
/**
 * Creates the topic reference store and re-attaches a message container for
 * every subscription already present in {@code subsContainer}.
 *
 * @param store            store used to obtain the per-subscription containers
 * @param adapter          owning adapter, passed to the superclass
 * @param messageContainer container of message reference records, passed to the superclass
 * @param ackContainer     list of outstanding-acknowledgement records
 * @param subsContainer    subscription metadata keyed by subscription key
 * @param destination      the destination this store serves
 * @throws IOException if a subscription message container cannot be attached
 */
public KahaTopicReferenceStore(Store store, KahaReferenceStoreAdapter adapter,
        MapContainer<MessageId, ReferenceRecord> messageContainer, ListContainer<TopicSubAck> ackContainer,
        MapContainer<String, SubscriptionInfo> subsContainer, ActiveMQDestination destination)
        throws IOException {
    super(adapter, messageContainer, destination);
    this.store = store;
    this.ackContainer = ackContainer;
    this.subscriberContainer = subsContainer;
    // Rebuild the in-memory map: one message container per known subscription.
    for (SubscriptionInfo known : this.subscriberContainer.values()) {
        addSubscriberMessageContainer(known.getClientId(), known.getSubscriptionName());
    }
}
066
067 public void dispose(ConnectionContext context) {
068 super.dispose(context);
069 subscriberContainer.delete();
070 }
071
072 protected MessageId getMessageId(Object object) {
073 return new MessageId(((ReferenceRecord)object).getMessageId());
074 }
075
076 public void addMessage(ConnectionContext context, Message message) throws IOException {
077 throw new RuntimeException("Use addMessageReference instead");
078 }
079
080 public Message getMessage(MessageId identity) throws IOException {
081 throw new RuntimeException("Use addMessageReference instead");
082 }
083
084 public boolean addMessageReference(final ConnectionContext context, final MessageId messageId,
085 final ReferenceData data) {
086 boolean uniqueReferenceAdded = false;
087 lock.lock();
088 try {
089 final ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
090 final int subscriberCount = subscriberMessages.size();
091 if (subscriberCount > 0 && !isDuplicate(messageId)) {
092 final StoreEntry messageEntry = messageContainer.place(messageId, record);
093 addInterest(record);
094 uniqueReferenceAdded = true;
095 final TopicSubAck tsa = new TopicSubAck();
096 tsa.setCount(subscriberCount);
097 tsa.setMessageEntry(messageEntry);
098 final StoreEntry ackEntry = ackContainer.placeLast(tsa);
099 for (final Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) {
100 final TopicSubContainer container = i.next();
101 final ConsumerMessageRef ref = new ConsumerMessageRef();
102 ref.setAckEntry(ackEntry);
103 ref.setMessageEntry(messageEntry);
104 ref.setMessageId(messageId);
105 container.add(ref);
106 }
107 if (LOG.isTraceEnabled()) {
108 LOG.trace(destination.getPhysicalName() + " add reference: " + messageId);
109 }
110 } else {
111 if (LOG.isTraceEnabled()) {
112 LOG.trace("no subscribers or duplicate add for: " + messageId);
113 }
114 }
115 } finally {
116 lock.unlock();
117 }
118 return uniqueReferenceAdded;
119 }
120
121 public ReferenceData getMessageReference(final MessageId identity) throws IOException {
122 final ReferenceRecord result = messageContainer.get(identity);
123 if (result == null) {
124 return null;
125 }
126 return result.getData();
127 }
128
129 public void addReferenceFileIdsInUse() {
130 for (StoreEntry entry = ackContainer.getFirst(); entry != null; entry = ackContainer.getNext(entry)) {
131 TopicSubAck subAck = ackContainer.get(entry);
132 if (subAck.getCount() > 0) {
133 ReferenceRecord rr = messageContainer.getValue(subAck.getMessageEntry());
134 addInterest(rr);
135 }
136 }
137 }
138
139
140 protected MapContainer addSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
141 String containerName = getSubscriptionContainerName(getSubscriptionKey(clientId, subscriptionName));
142 MapContainer container = store.getMapContainer(containerName,containerName);
143 container.setKeyMarshaller(Store.MESSAGEID_MARSHALLER);
144 Marshaller marshaller = new ConsumerMessageRefMarshaller();
145 container.setValueMarshaller(marshaller);
146 TopicSubContainer tsc = new TopicSubContainer(container);
147 subscriberMessages.put(getSubscriptionKey(clientId, subscriptionName), tsc);
148 return container;
149 }
150
/**
 * Acknowledges a message on behalf of one subscription and reports whether
 * the message is no longer referenced.
 * <p>
 * If the subscription holds a reference, it is removed and the shared ack
 * count is decremented; when the count reaches zero the ack record and the
 * message record are removed. If the subscription holds no reference
 * ("ack before add"), the message is reported as removable only when no
 * other subscriber can still reference it.
 *
 * @param context          the connection context (not used here)
 * @param clientId         client id of the acknowledging subscription
 * @param subscriptionName name of the acknowledging subscription
 * @param messageId        id of the acknowledged message
 * @return {@code true} if the message record was removed or no reference to
 *         it is held; {@code false} otherwise (including when the
 *         subscription is unknown)
 * @throws IOException declared by the interface
 */
public boolean acknowledgeReference(ConnectionContext context,
        String clientId, String subscriptionName, MessageId messageId)
        throws IOException {
    boolean removeMessage = false;
    lock.lock();
    try {
        String key = getSubscriptionKey(clientId, subscriptionName);

        TopicSubContainer container = subscriberMessages.get(key);
        if (container != null) {
            ConsumerMessageRef ref = null;
            // Remove this subscriber's reference; null means it never held one.
            if((ref = container.remove(messageId)) != null) {
                StoreEntry entry = ref.getAckEntry();
                // The entry held by the reference may be stale; refresh it to
                // get up-to-date pointers before reading or removing.
                entry = ackContainer.refresh(entry);
                TopicSubAck tsa = ackContainer.get(entry);
                if (tsa != null) {
                    if (tsa.decrementCount() <= 0) {
                        // Last outstanding ack: drop the ack record and, if the
                        // message record still exists, the message record too.
                        ackContainer.remove(entry);
                        ReferenceRecord rr = messageContainer.get(messageId);
                        if (rr != null) {
                            entry = tsa.getMessageEntry();
                            entry = messageContainer.refresh(entry);
                            messageContainer.remove(entry);
                            removeInterest(rr);
                            removeMessage = true;
                            // Return value deliberately ignored; presumably this
                            // records the id in the audit so a later add of the
                            // same message is seen as a duplicate.
                            dispatchAudit.isDuplicate(messageId);
                        }
                    }else {
                        // Other subscribers still pending: persist the decremented count.
                        ackContainer.update(entry,tsa);
                    }
                }
                if (LOG.isTraceEnabled()) {
                    LOG.trace(destination.getPhysicalName() + " remove: " + messageId);
                }
            }else{
                // Ack arrived for a message this subscriber has no reference to.
                if (ackContainer.isEmpty() || subscriberMessages.size() == 1 || isUnreferencedBySubscribers(key, subscriberMessages, messageId)) {
                    // no message reference held
                    removeMessage = true;
                    // ensure we don't later add a reference
                    dispatchAudit.isDuplicate(messageId);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(destination.getPhysicalName() + " remove with no outstanding reference (ack before add): " + messageId);
                    }
                }
            }
        }
    }finally {
        lock.unlock();
    }
    return removeMessage;
}
203
204 // verify that no subscriber has a reference to this message. In the case where the subscribers
205 // references are persisted but more than the persisted consumers get the message, the ack from the non
206 // persisted consumer would remove the message in error
207 //
208 // see: https://issues.apache.org/activemq/browse/AMQ-2123
209 private boolean isUnreferencedBySubscribers(
210 String key, Map<String, TopicSubContainer> subscriberContainers, MessageId messageId) {
211 boolean isUnreferenced = true;
212 for (Entry<String, TopicSubContainer> entry : subscriberContainers.entrySet()) {
213 if (!key.equals(entry.getKey()) && !entry.getValue().isEmpty()) {
214 TopicSubContainer container = entry.getValue();
215 for (Iterator i = container.iterator(); i.hasNext();) {
216 ConsumerMessageRef ref = (ConsumerMessageRef) i.next();
217 if (messageId.equals(ref.getMessageId())) {
218 isUnreferenced = false;
219 break;
220 }
221 }
222 }
223 }
224 return isUnreferenced;
225 }
226
227 public void acknowledge(ConnectionContext context,
228 String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
229 acknowledgeReference(context, clientId, subscriptionName, messageId);
230 }
231
232 public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
233 String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
234 lock.lock();
235 try {
236 // if already exists - won't add it again as it causes data files
237 // to hang around
238 if (!subscriberContainer.containsKey(key)) {
239 subscriberContainer.put(key, info);
240 adapter.addSubscriberState(info);
241 }
242 // add the subscriber
243 addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
244 if (retroactive) {
245 /*
246 * for(StoreEntry
247 * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
248 * TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
249 * ConsumerMessageRef ref=new ConsumerMessageRef();
250 * ref.setAckEntry(entry);
251 * ref.setMessageEntry(tsa.getMessageEntry()); container.add(ref); }
252 */
253 }
254 }finally {
255 lock.unlock();
256 }
257 }
258
259 public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
260 lock.lock();
261 try {
262 SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
263 if (info != null) {
264 adapter.removeSubscriberState(info);
265 }
266 removeSubscriberMessageContainer(clientId,subscriptionName);
267 }finally {
268 lock.unlock();
269 }
270 }
271
272 public SubscriptionInfo[] getAllSubscriptions() throws IOException {
273 SubscriptionInfo[] result = subscriberContainer.values()
274 .toArray(new SubscriptionInfo[subscriberContainer.size()]);
275 return result;
276 }
277
278 public int getMessageCount(String clientId, String subscriberName) throws IOException {
279 String key = getSubscriptionKey(clientId, subscriberName);
280 TopicSubContainer container = subscriberMessages.get(key);
281 return container != null ? container.size() : 0;
282 }
283
284 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
285 return subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName));
286 }
287
/**
 * Recovers the next batch of message references for a subscription.
 * <p>
 * Recovery resumes after the container's current batch entry when one is
 * set; otherwise it starts from {@code container.getEntry()}. Entries are
 * processed until there are no more entries, {@code maxReturned} references
 * have been recovered, or the listener reports it has no space. Because the
 * loop is a do-while, the first entry is always processed when one exists.
 * Does nothing if the subscription is unknown.
 *
 * @param clientId         client id of the subscription
 * @param subscriptionName name of the subscription
 * @param maxReturned      upper bound on the number of recovered references
 * @param listener         receives the recovered references
 * @throws Exception if recovery of a reference fails
 */
public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
        MessageRecoveryListener listener) throws Exception {
    String key = getSubscriptionKey(clientId, subscriptionName);
    lock.lock();
    try {
        TopicSubContainer container = subscriberMessages.get(key);
        if (container != null) {
            int count = 0;
            // Position the cursor: no batch entry means start from the
            // container's own entry point; otherwise continue after the
            // (refreshed) batch entry.
            StoreEntry entry = container.getBatchEntry();
            if (entry == null) {
                entry = container.getEntry();
            } else {
                entry = container.refreshEntry(entry);
                if (entry != null) {
                    entry = container.getNextEntry(entry);
                }
            }

            if (entry != null) {
                do {
                    ConsumerMessageRef consumerRef = container.get(entry);
                    ReferenceRecord msg = messageContainer.getValue(consumerRef
                        .getMessageEntry());
                    if (msg != null) {
                        // Only references the listener accepted count towards
                        // the batch and advance the batch entry.
                        if (recoverReference(listener, msg)) {
                            count++;
                            container.setBatchEntry(msg.getMessageId(), entry);
                        }
                    } else {
                        // The message record is gone; reset the container's batch state.
                        container.reset();
                    }

                    entry = container.getNextEntry(entry);
                } while (entry != null && count < maxReturned && listener.hasSpace());
            }
        }
    }finally {
        lock.unlock();
    }
}
328
329 public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
330 throws Exception {
331 String key = getSubscriptionKey(clientId, subscriptionName);
332 TopicSubContainer container = subscriberMessages.get(key);
333 if (container != null) {
334 for (Iterator i = container.iterator(); i.hasNext();) {
335 ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
336 ReferenceRecord msg = messageContainer.getValue(ref.getMessageEntry());
337 if (msg != null) {
338 if (!recoverReference(listener, msg)) {
339 break;
340 }
341 }
342 }
343 }
344 }
345
346 public void resetBatching(String clientId, String subscriptionName) {
347 lock.lock();
348 try {
349 String key = getSubscriptionKey(clientId, subscriptionName);
350 TopicSubContainer topicSubContainer = subscriberMessages.get(key);
351 if (topicSubContainer != null) {
352 topicSubContainer.reset();
353 }
354 }finally {
355 lock.unlock();
356 }
357 }
358
359 public void removeAllMessages(ConnectionContext context) throws IOException {
360 lock.lock();
361 try {
362 Set<String> tmpSet = new HashSet<String>(subscriberContainer.keySet());
363 for (String key:tmpSet) {
364 TopicSubContainer container = subscriberMessages.get(key);
365 if (container != null) {
366 container.clear();
367 }
368 }
369 ackContainer.clear();
370 }finally {
371 lock.unlock();
372 }
373 super.removeAllMessages(context);
374 }
375
376 protected void removeSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
377 String subscriberKey = getSubscriptionKey(clientId, subscriptionName);
378 String containerName = getSubscriptionContainerName(subscriberKey);
379 subscriberContainer.remove(subscriberKey);
380 TopicSubContainer container = subscriberMessages.remove(subscriberKey);
381 if (container != null) {
382 for (Iterator i = container.iterator(); i.hasNext();) {
383 ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
384 if (ref != null) {
385 TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
386 if (tsa != null) {
387 if (tsa.decrementCount() <= 0) {
388 ackContainer.remove(ref.getAckEntry());
389 messageContainer.remove(tsa.getMessageEntry());
390 } else {
391 ackContainer.update(ref.getAckEntry(), tsa);
392 }
393 }
394 }
395 }
396 }
397 store.deleteMapContainer(containerName,containerName);
398 }
399
400 protected String getSubscriptionKey(String clientId, String subscriberName) {
401 StringBuffer buffer = new StringBuffer();
402 buffer.append(clientId).append(":");
403 String name = subscriberName != null ? subscriberName : "NOT_SET";
404 return buffer.append(name).toString();
405 }
406
407 private String getSubscriptionContainerName(String subscriptionKey) {
408 StringBuffer result = new StringBuffer(TOPIC_SUB_NAME);
409 result.append(destination.getQualifiedName());
410 result.append(subscriptionKey);
411 return result.toString();
412 }
413 }