001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.util.Iterator;
020 import java.util.Set;
021
022 import javax.jms.JMSException;
023
024 import org.apache.activemq.broker.ConnectionContext;
025 import org.apache.activemq.broker.region.policy.PolicyEntry;
026 import org.apache.activemq.command.ActiveMQDestination;
027 import org.apache.activemq.command.ConsumerInfo;
028 import org.apache.activemq.command.MessageDispatchNotification;
029 import org.apache.activemq.thread.TaskRunnerFactory;
030 import org.apache.activemq.usage.SystemUsage;
031
032 /**
033 *
034 *
035 */
036 public class QueueRegion extends AbstractRegion {
037
038 public QueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics,
039 SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
040 DestinationFactory destinationFactory) {
041 super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
042 }
043
044 public String toString() {
045 return "QueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size()
046 + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
047 }
048
049 protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info)
050 throws JMSException {
051 ActiveMQDestination destination = info.getDestination();
052 PolicyEntry entry = null;
053 if (destination != null && broker.getDestinationPolicy() != null) {
054 entry = broker.getDestinationPolicy().getEntryFor(destination);
055
056 }
057 if (info.isBrowser()) {
058 QueueBrowserSubscription sub = new QueueBrowserSubscription(broker,usageManager, context, info);
059 if (entry != null) {
060 entry.configure(broker, usageManager, sub);
061 }
062 return sub;
063 } else {
064 QueueSubscription sub = new QueueSubscription(broker, usageManager,context, info);
065 if (entry != null) {
066 entry.configure(broker, usageManager, sub);
067 }
068 return sub;
069 }
070 }
071
072 protected Set<ActiveMQDestination> getInactiveDestinations() {
073 Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations();
074 for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) {
075 ActiveMQDestination dest = iter.next();
076 if (!dest.isQueue()) {
077 iter.remove();
078 }
079 }
080 return inactiveDestinations;
081 }
082
083 /*
084 * For a Queue, dispatch order is imperative to match acks, so the dispatch is deferred till
085 * the notification to ensure that the subscription chosen by the master is used.
086 *
087 * (non-Javadoc)
088 * @see org.apache.activemq.broker.region.AbstractRegion#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification)
089 */
090 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
091 processDispatchNotificationViaDestination(messageDispatchNotification);
092 }
093 }