001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.util.*;
020 import java.util.concurrent.ConcurrentHashMap;
021 import javax.jms.InvalidDestinationException;
022 import javax.jms.JMSException;
023 import org.apache.activemq.advisory.AdvisorySupport;
024 import org.apache.activemq.broker.ConnectionContext;
025 import org.apache.activemq.broker.region.policy.PolicyEntry;
026 import org.apache.activemq.command.ActiveMQDestination;
027 import org.apache.activemq.command.ConnectionId;
028 import org.apache.activemq.command.ConsumerId;
029 import org.apache.activemq.command.ConsumerInfo;
030 import org.apache.activemq.command.RemoveSubscriptionInfo;
031 import org.apache.activemq.command.SessionId;
032 import org.apache.activemq.command.SubscriptionInfo;
033 import org.apache.activemq.store.TopicMessageStore;
034 import org.apache.activemq.thread.TaskRunnerFactory;
035 import org.apache.activemq.usage.SystemUsage;
036 import org.apache.activemq.util.LongSequenceGenerator;
037 import org.apache.activemq.util.SubscriptionKey;
038 import org.slf4j.Logger;
039 import org.slf4j.LoggerFactory;
040
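/**
 * Region that handles topic destinations and their subscriptions.
 * <p>
 * In addition to the behaviour inherited from {@link AbstractRegion}, this class keeps a map of
 * durable topic subscriptions keyed by client id and subscription name, restores durable
 * subscriptions found in a destination's message store, and can run a timer task that removes
 * durable subscribers that have been offline for at least the broker's configured timeout.
 */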
044 public class TopicRegion extends AbstractRegion {
045 private static final Logger LOG = LoggerFactory.getLogger(TopicRegion.class);
046 protected final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
047 private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator();
048 private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId());
049 private boolean keepDurableSubsActive;
050
051 private Timer cleanupTimer;
052 private TimerTask cleanupTask;
053
/**
 * Creates the region and, when the broker service has both an offline durable subscriber task
 * schedule and an offline durable subscriber timeout configured (neither equal to -1), starts a
 * daemon timer that repeatedly calls {@link #doCleanup()}. The schedule value is used both as
 * the initial delay and as the period.
 */
public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
        DestinationFactory destinationFactory) {
    super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);

    final long schedule = broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule();
    if (schedule == -1 || broker.getBrokerService().getOfflineDurableSubscriberTimeout() == -1) {
        // Cleanup of offline durable subscribers is not configured.
        return;
    }

    this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true);
    this.cleanupTask = new TimerTask() {
        @Override
        public void run() {
            doCleanup();
        }
    };
    this.cleanupTimer.schedule(this.cleanupTask, schedule, schedule);
}
067
068 @Override
069 public void stop() throws Exception {
070 super.stop();
071 if (cleanupTimer != null) {
072 cleanupTimer.cancel();
073 }
074 }
075
076 public void doCleanup() {
077 long now = System.currentTimeMillis();
078 for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) {
079 DurableTopicSubscription sub = entry.getValue();
080 if (!sub.isActive()) {
081 long offline = sub.getOfflineTimestamp();
082 if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) {
083 LOG.info("Destroying durable subscriber due to inactivity: " + sub);
084 try {
085 RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
086 info.setClientId(entry.getKey().getClientId());
087 info.setSubscriptionName(entry.getKey().getSubscriptionName());
088 ConnectionContext context = new ConnectionContext();
089 context.setBroker(broker);
090 context.setClientId(entry.getKey().getClientId());
091 removeSubscription(context, info);
092 } catch (Exception e) {
093 LOG.error("Failed to remove inactive durable subscriber", e);
094 }
095 }
096 }
097 }
098 }
099
100 @Override
101 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
102 if (info.isDurable()) {
103 ActiveMQDestination destination = info.getDestination();
104 if (!destination.isPattern()) {
105 // Make sure the destination is created.
106 lookup(context, destination,true);
107 }
108 String clientId = context.getClientId();
109 String subscriptionName = info.getSubscriptionName();
110 SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
111 DurableTopicSubscription sub = durableSubscriptions.get(key);
112 if (sub != null) {
113 if (sub.isActive()) {
114 throw new JMSException("Durable consumer is in use for client: " + clientId + " and subscriptionName: " + subscriptionName);
115 }
116 // Has the selector changed??
117 if (hasDurableSubChanged(info, sub.getConsumerInfo())) {
118 // Remove the consumer first then add it.
119 durableSubscriptions.remove(key);
120 destinationsLock.readLock().lock();
121 try {
122 for (Destination dest : destinations.values()) {
123 //Account for virtual destinations
124 if (dest instanceof Topic){
125 Topic topic = (Topic)dest;
126 topic.deleteSubscription(context, key);
127 }
128 }
129 } finally {
130 destinationsLock.readLock().unlock();
131 }
132 super.removeConsumer(context, sub.getConsumerInfo());
133 super.addConsumer(context, info);
134 sub = durableSubscriptions.get(key);
135 } else {
136 // Change the consumer id key of the durable sub.
137 if (sub.getConsumerInfo().getConsumerId() != null) {
138 subscriptions.remove(sub.getConsumerInfo().getConsumerId());
139 }
140 subscriptions.put(info.getConsumerId(), sub);
141 }
142 } else {
143 super.addConsumer(context, info);
144 sub = durableSubscriptions.get(key);
145 if (sub == null) {
146 throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: " + key.getClientId()
147 + " subscriberName: " + key.getSubscriptionName());
148 }
149 }
150 sub.activate(usageManager, context, info);
151 return sub;
152 } else {
153 return super.addConsumer(context, info);
154 }
155 }
156
157 @Override
158 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
159 if (info.isDurable()) {
160
161 SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
162 DurableTopicSubscription sub = durableSubscriptions.get(key);
163 if (sub != null) {
164 sub.deactivate(keepDurableSubsActive);
165 }
166
167 } else {
168 super.removeConsumer(context, info);
169 }
170 }
171
172 @Override
173 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
174 SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubscriptionName());
175 DurableTopicSubscription sub = durableSubscriptions.get(key);
176 if (sub == null) {
177 throw new InvalidDestinationException("No durable subscription exists for: " + info.getSubscriptionName());
178 }
179 if (sub.isActive()) {
180 throw new JMSException("Durable consumer is in use");
181 } else {
182 durableSubscriptions.remove(key);
183 }
184
185 destinationsLock.readLock().lock();
186 try {
187 for (Destination dest : destinations.values()) {
188 //Account for virtual destinations
189 if (dest instanceof Topic){
190 Topic topic = (Topic)dest;
191 topic.deleteSubscription(context, key);
192 }
193 }
194 } finally {
195 destinationsLock.readLock().unlock();
196 }
197
198 if (subscriptions.get(sub.getConsumerInfo().getConsumerId()) != null) {
199 super.removeConsumer(context, sub.getConsumerInfo());
200 } else {
201 // try destroying inactive subscriptions
202 destroySubscription(sub);
203 }
204 }
205
206 @Override
207 public String toString() {
208 return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
209 }
210
211 @Override
212 protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
213 List<Subscription> rc = super.addSubscriptionsForDestination(context, dest);
214 Set<Subscription> dupChecker = new HashSet<Subscription>(rc);
215
216 TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
217 // Eagerly recover the durable subscriptions
218 if (store != null) {
219 SubscriptionInfo[] infos = store.getAllSubscriptions();
220 for (int i = 0; i < infos.length; i++) {
221
222 SubscriptionInfo info = infos[i];
223 if (LOG.isDebugEnabled()) {
224 LOG.debug("Restoring durable subscription: " + info);
225 }
226 SubscriptionKey key = new SubscriptionKey(info);
227
228 // A single durable sub may be subscribing to multiple topics.
229 // so it might exist already.
230 DurableTopicSubscription sub = durableSubscriptions.get(key);
231 ConsumerInfo consumerInfo = createInactiveConsumerInfo(info);
232 if (sub == null) {
233 ConnectionContext c = new ConnectionContext();
234 c.setBroker(context.getBroker());
235 c.setClientId(key.getClientId());
236 c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId());
237 sub = (DurableTopicSubscription)createSubscription(c, consumerInfo);
238 }
239
240 if (dupChecker.contains(sub)) {
241 continue;
242 }
243
244 dupChecker.add(sub);
245 rc.add(sub);
246 dest.addSubscription(context, sub);
247 }
248
249 // Now perhaps there other durable subscriptions (via wild card)
250 // that would match this destination..
251 durableSubscriptions.values();
252 for (DurableTopicSubscription sub : durableSubscriptions.values()) {
253 // Skip over subscriptions that we allready added..
254 if (dupChecker.contains(sub)) {
255 continue;
256 }
257
258 if (sub.matches(dest.getActiveMQDestination())) {
259 rc.add(sub);
260 dest.addSubscription(context, sub);
261 }
262 }
263 }
264 return rc;
265 }
266
267 public ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
268 ConsumerInfo rc = new ConsumerInfo();
269 rc.setSelector(info.getSelector());
270 rc.setSubscriptionName(info.getSubscriptionName());
271 rc.setDestination(info.getSubscribedDestination());
272 rc.setConsumerId(createConsumerId());
273 return rc;
274 }
275
276 private ConsumerId createConsumerId() {
277 return new ConsumerId(recoveredDurableSubSessionId, recoveredDurableSubIdGenerator.getNextSequenceId());
278 }
279
280 protected void configureTopic(Topic topic, ActiveMQDestination destination) {
281 if (broker.getDestinationPolicy() != null) {
282 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
283 if (entry != null) {
284 entry.configure(broker,topic);
285 }
286 }
287 }
288
289 @Override
290 protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
291 ActiveMQDestination destination = info.getDestination();
292
293 if (info.isDurable()) {
294 if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
295 throw new JMSException("Cannot create a durable subscription for an advisory Topic");
296 }
297 SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
298 DurableTopicSubscription sub = durableSubscriptions.get(key);
299
300 if (sub == null) {
301
302 sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
303
304 if (destination != null && broker.getDestinationPolicy() != null) {
305 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
306 if (entry != null) {
307 entry.configure(broker, usageManager, sub);
308 }
309 }
310 durableSubscriptions.put(key, sub);
311 } else {
312 throw new JMSException("That durable subscription is already active.");
313 }
314 return sub;
315 }
316 try {
317 TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
318 // lets configure the subscription depending on the destination
319 if (destination != null && broker.getDestinationPolicy() != null) {
320 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
321 if (entry != null) {
322 entry.configure(broker, usageManager, answer);
323 }
324 }
325 answer.init();
326 return answer;
327 } catch (Exception e) {
328 LOG.error("Failed to create TopicSubscription ", e);
329 JMSException jmsEx = new JMSException("Couldn't create TopicSubscription");
330 jmsEx.setLinkedException(e);
331 throw jmsEx;
332 }
333 }
334
335 /**
336 */
337 private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) {
338 if (info1.getSelector() != null ^ info2.getSelector() != null) {
339 return true;
340 }
341 if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
342 return true;
343 }
344 return !info1.getDestination().equals(info2.getDestination());
345 }
346
347 @Override
348 protected Set<ActiveMQDestination> getInactiveDestinations() {
349 Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations();
350 for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) {
351 ActiveMQDestination dest = iter.next();
352 if (!dest.isTopic()) {
353 iter.remove();
354 }
355 }
356 return inactiveDestinations;
357 }
358
359 public boolean isKeepDurableSubsActive() {
360 return keepDurableSubsActive;
361 }
362
363 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
364 this.keepDurableSubsActive = keepDurableSubsActive;
365 }
366
367 public boolean durableSubscriptionExists(SubscriptionKey key) {
368 return this.durableSubscriptions.containsKey(key);
369 }
370
371 }