001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.util.Iterator;
021 import java.util.concurrent.ConcurrentHashMap;
022 import java.util.concurrent.atomic.AtomicBoolean;
023 import java.util.concurrent.atomic.AtomicLong;
024
025 import javax.jms.InvalidSelectorException;
026 import javax.jms.JMSException;
027
028 import org.apache.activemq.broker.Broker;
029 import org.apache.activemq.broker.ConnectionContext;
030 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
031 import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
032 import org.apache.activemq.command.ActiveMQDestination;
033 import org.apache.activemq.command.ConsumerInfo;
034 import org.apache.activemq.command.Message;
035 import org.apache.activemq.command.MessageAck;
036 import org.apache.activemq.command.MessageDispatch;
037 import org.apache.activemq.command.MessageId;
038 import org.apache.activemq.store.TopicMessageStore;
039 import org.apache.activemq.usage.SystemUsage;
040 import org.apache.activemq.usage.Usage;
041 import org.apache.activemq.usage.UsageListener;
042 import org.apache.activemq.util.SubscriptionKey;
043 import org.slf4j.Logger;
044 import org.slf4j.LoggerFactory;
045
046 public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
047
048 private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class);
049 private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
050 private final ConcurrentHashMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
051 private final SubscriptionKey subscriptionKey;
052 private final boolean keepDurableSubsActive;
053 private AtomicBoolean active = new AtomicBoolean();
054 private AtomicLong offlineTimestamp = new AtomicLong(-1);
055
056 public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
057 throws JMSException {
058 super(broker,usageManager, context, info);
059 this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
060 this.pending.setSystemUsage(usageManager);
061 this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
062 this.keepDurableSubsActive = keepDurableSubsActive;
063 subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
064
065 }
066
067 public final boolean isActive() {
068 return active.get();
069 }
070
071 public final long getOfflineTimestamp() {
072 return offlineTimestamp.get();
073 }
074
075 public boolean isFull() {
076 return !active.get() || super.isFull();
077 }
078
079 public void gc() {
080 }
081
082 /**
083 * store will have a pending ack for all durables, irrespective of the selector
084 * so we need to ack if node is un-matched
085 */
086 public void unmatched(MessageReference node) throws IOException {
087 MessageAck ack = new MessageAck();
088 ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE);
089 ack.setMessageID(node.getMessageId());
090 node.getRegionDestination().acknowledge(this.getContext(), this, ack, node);
091 }
092
093 @Override
094 protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
095 // statically configured via maxPageSize
096 }
097
098 public void add(ConnectionContext context, Destination destination) throws Exception {
099 if (!destinations.contains(destination)) {
100 super.add(context, destination);
101 }
102 // do it just once per destination
103 if (durableDestinations.containsKey(destination.getActiveMQDestination())) {
104 return;
105 }
106 durableDestinations.put(destination.getActiveMQDestination(), destination);
107
108 if (active.get() || keepDurableSubsActive) {
109 Topic topic = (Topic)destination;
110 topic.activate(context, this);
111 if (pending.isEmpty(topic)) {
112 topic.recoverRetroactiveMessages(context, this);
113 }
114 this.enqueueCounter+=pending.size();
115 } else if (destination.getMessageStore() != null) {
116 TopicMessageStore store = (TopicMessageStore)destination.getMessageStore();
117 try {
118 this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
119 } catch (IOException e) {
120 JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from store "+ e);
121 jmsEx.setLinkedException(e);
122 throw jmsEx;
123 }
124 }
125 dispatchPending();
126 }
127
128 public void activate(SystemUsage memoryManager, ConnectionContext context,
129 ConsumerInfo info) throws Exception {
130 if (!active.get()) {
131 this.context = context;
132 this.info = info;
133 LOG.debug("Activating " + this);
134 if (!keepDurableSubsActive) {
135 for (Iterator<Destination> iter = durableDestinations.values()
136 .iterator(); iter.hasNext();) {
137 Topic topic = (Topic) iter.next();
138 add(context, topic);
139 topic.activate(context, this);
140 }
141 }
142 synchronized (pendingLock) {
143 pending.setSystemUsage(memoryManager);
144 pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
145 pending.setMaxAuditDepth(getMaxAuditDepth());
146 pending.setMaxProducersToAudit(getMaxProducersToAudit());
147 pending.start();
148 // If nothing was in the persistent store, then try to use the
149 // recovery policy.
150 if (pending.isEmpty()) {
151 for (Iterator<Destination> iter = durableDestinations.values()
152 .iterator(); iter.hasNext();) {
153 Topic topic = (Topic) iter.next();
154 topic.recoverRetroactiveMessages(context, this);
155 }
156 }
157 }
158 this.active.set(true);
159 this.offlineTimestamp.set(-1);
160 dispatchPending();
161 this.usageManager.getMemoryUsage().addUsageListener(this);
162 }
163 }
164
165 public void deactivate(boolean keepDurableSubsActive) throws Exception {
166 LOG.debug("Deactivating keepActive=" + keepDurableSubsActive + ", " + this);
167 active.set(false);
168 offlineTimestamp.set(System.currentTimeMillis());
169 this.usageManager.getMemoryUsage().removeUsageListener(this);
170 synchronized (pendingLock) {
171 pending.stop();
172
173 synchronized (dispatchLock) {
174 for (Iterator<Destination> iter = durableDestinations.values().iterator(); iter.hasNext();) {
175 Topic topic = (Topic)iter.next();
176 if (!keepDurableSubsActive) {
177 topic.deactivate(context, this);
178 } else {
179 topic.getDestinationStatistics().getInflight().subtract(dispatched.size());
180 }
181 }
182
183 for (final MessageReference node : dispatched) {
184 // Mark the dispatched messages as redelivered for next time.
185 Integer count = redeliveredMessages.get(node.getMessageId());
186 if (count != null) {
187 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
188 } else {
189 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
190 }
191 if (keepDurableSubsActive && pending.isTransient()) {
192 pending.addMessageFirst(node);
193 pending.rollback(node.getMessageId());
194 } else {
195 node.decrementReferenceCount();
196 }
197 }
198 dispatched.clear();
199 }
200 if (!keepDurableSubsActive && pending.isTransient()) {
201 try {
202 pending.reset();
203 while (pending.hasNext()) {
204 MessageReference node = pending.next();
205 node.decrementReferenceCount();
206 pending.remove();
207 }
208 } finally {
209 pending.release();
210 }
211 }
212 }
213 prefetchExtension.set(0);
214 }
215
216
217 protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
218 MessageDispatch md = super.createMessageDispatch(node, message);
219 if (node != QueueMessageReference.NULL_MESSAGE) {
220 Integer count = redeliveredMessages.get(node.getMessageId());
221 if (count != null) {
222 md.setRedeliveryCounter(count.intValue());
223 }
224 }
225 return md;
226 }
227
228 public void add(MessageReference node) throws Exception {
229 if (!active.get() && !keepDurableSubsActive) {
230 return;
231 }
232 super.add(node);
233 }
234
235 protected void dispatchPending() throws IOException {
236 if (isActive()) {
237 super.dispatchPending();
238 }
239 }
240
241 public void removePending(MessageReference node) throws IOException {
242 pending.remove(node);
243 }
244
245 protected void doAddRecoveredMessage(MessageReference message) throws Exception {
246 synchronized(pending) {
247 pending.addRecoveredMessage(message);
248 }
249 }
250
251 public int getPendingQueueSize() {
252 if (active.get() || keepDurableSubsActive) {
253 return super.getPendingQueueSize();
254 }
255 // TODO: need to get from store
256 return 0;
257 }
258
259 public void setSelector(String selector) throws InvalidSelectorException {
260 throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
261 }
262
263 protected boolean canDispatch(MessageReference node) {
264 return isActive();
265 }
266
267 protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
268 node.getRegionDestination().acknowledge(context, this, ack, node);
269 redeliveredMessages.remove(node.getMessageId());
270 node.decrementReferenceCount();
271 }
272
273 public synchronized String toString() {
274 return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + durableDestinations.size() + ", total=" + enqueueCounter + ", pending="
275 + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
276 }
277
278 public SubscriptionKey getSubscriptionKey() {
279 return subscriptionKey;
280 }
281
282 /**
283 * Release any references that we are holding.
284 */
285 public void destroy() {
286 synchronized (pendingLock) {
287 try {
288
289 pending.reset();
290 while (pending.hasNext()) {
291 MessageReference node = pending.next();
292 node.decrementReferenceCount();
293 }
294
295 } finally {
296 pending.release();
297 pending.clear();
298 }
299 }
300 synchronized (dispatchLock) {
301 for (MessageReference node : dispatched) {
302 node.decrementReferenceCount();
303 }
304 dispatched.clear();
305 }
306 setSlowConsumer(false);
307 }
308
309 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
310 if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
311 try {
312 dispatchPending();
313 } catch (IOException e) {
314 LOG.warn("problem calling dispatchMatched", e);
315 }
316 }
317 }
318
319 protected boolean isDropped(MessageReference node) {
320 return false;
321 }
322
323 public boolean isKeepDurableSubsActive() {
324 return keepDurableSubsActive;
325 }
326 }