001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region.policy;
018
019 import java.util.ArrayList;
020 import java.util.Iterator;
021 import java.util.List;
022
023 import org.apache.activemq.broker.region.MessageReference;
024 import org.apache.activemq.broker.region.Subscription;
025 import org.apache.activemq.command.ConsumerId;
026 import org.apache.activemq.command.ConsumerInfo;
027 import org.apache.activemq.filter.MessageEvaluationContext;
028 import org.slf4j.Logger;
029 import org.slf4j.LoggerFactory;
030
031 /**
032 * dispatch policy that ignores lower priority duplicate network consumers,
033 * used in conjunction with network bridge suppresDuplicateTopicSubscriptions
034 *
035 * @org.apache.xbean.XBean
036 */
037 public class PriorityNetworkDispatchPolicy extends SimpleDispatchPolicy {
038
039 private static final Logger LOG = LoggerFactory.getLogger(PriorityNetworkDispatchPolicy.class);
040 @Override
041 public boolean dispatch(MessageReference node,
042 MessageEvaluationContext msgContext,
043 List<Subscription> consumers) throws Exception {
044
045 List<Subscription> duplicateFreeSubs = new ArrayList<Subscription>();
046 synchronized (consumers) {
047 for (Subscription sub: consumers) {
048 ConsumerInfo info = sub.getConsumerInfo();
049 if (info.isNetworkSubscription()) {
050 boolean highestPrioritySub = true;
051 for (Iterator<Subscription> it = duplicateFreeSubs.iterator(); it.hasNext(); ) {
052 Subscription candidate = it.next();
053 if (matches(candidate, info)) {
054 if (hasLowerPriority(candidate, info)) {
055 it.remove();
056 } else {
057 // higher priority matching sub exists
058 highestPrioritySub = false;
059 if (LOG.isDebugEnabled()) {
060 LOG.debug("ignoring lower priority: " + candidate
061 + "[" +candidate.getConsumerInfo().getNetworkConsumerIds() +", "
062 + candidate.getConsumerInfo().getNetworkConsumerIds() +"] in favour of: "
063 + sub
064 + "[" +sub.getConsumerInfo().getNetworkConsumerIds() +", "
065 + sub.getConsumerInfo().getNetworkConsumerIds() +"]");
066 }
067 }
068 }
069 }
070 if (highestPrioritySub) {
071 duplicateFreeSubs.add(sub);
072 }
073 } else {
074 duplicateFreeSubs.add(sub);
075 }
076 }
077 }
078
079 return super.dispatch(node, msgContext, duplicateFreeSubs);
080 }
081
082 private boolean hasLowerPriority(Subscription candidate,
083 ConsumerInfo info) {
084 return candidate.getConsumerInfo().getPriority() < info.getPriority();
085 }
086
087 private boolean matches(Subscription candidate, ConsumerInfo info) {
088 boolean matched = false;
089 for (ConsumerId candidateId: candidate.getConsumerInfo().getNetworkConsumerIds()) {
090 for (ConsumerId subId: info.getNetworkConsumerIds()) {
091 if (candidateId.equals(subId)) {
092 matched = true;
093 break;
094 }
095 }
096 }
097 return matched;
098 }
099
100 }