001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region.virtual;
018
019 import java.io.IOException;
020 import java.util.List;
021 import java.util.Set;
022
023 import org.apache.activemq.broker.Broker;
024 import org.apache.activemq.broker.ProducerBrokerExchange;
025 import org.apache.activemq.broker.region.Destination;
026 import org.apache.activemq.broker.region.Subscription;
027 import org.apache.activemq.command.ActiveMQDestination;
028 import org.apache.activemq.command.Message;
029 import org.apache.activemq.filter.BooleanExpression;
030 import org.apache.activemq.filter.MessageEvaluationContext;
031 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
032 import org.apache.activemq.plugin.SubQueueSelectorCacheBroker;
033 import org.apache.activemq.selector.SelectorParser;
034 import org.apache.activemq.util.LRUCache;
035 import org.slf4j.Logger;
036 import org.slf4j.LoggerFactory;
037
038 public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicInterceptor {
039 private static final Logger LOG = LoggerFactory.getLogger(SelectorAwareVirtualTopicInterceptor.class);
040 LRUCache<String,BooleanExpression> expressionCache = new LRUCache<String,BooleanExpression>();
041 private SubQueueSelectorCacheBroker selectorCachePlugin;
042
043 public SelectorAwareVirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
044 super(next, prefix, postfix, local);
045 }
046
047 /**
048 * Respect the selectors of the subscriptions to ensure only matched messages are dispatched to
049 * the virtual queues, hence there is no build up of unmatched messages on these destinations
050 */
051 @Override
052 protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception {
053 Broker broker = context.getConnectionContext().getBroker();
054 Set<Destination> destinations = broker.getDestinations(destination);
055
056 for (Destination dest : destinations) {
057 if (matchesSomeConsumer(broker, message, dest)) {
058 dest.send(context, message.copy());
059 }
060 }
061 }
062
063 private boolean matchesSomeConsumer(final Broker broker, Message message, Destination dest) throws IOException {
064 boolean matches = false;
065 MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
066 msgContext.setDestination(dest.getActiveMQDestination());
067 msgContext.setMessageReference(message);
068 List<Subscription> subs = dest.getConsumers();
069 for (Subscription sub : subs) {
070 if (sub.matches(message, msgContext)) {
071 matches = true;
072 break;
073
074 }
075 }
076 if (matches == false && subs.size() == 0) {
077 matches = tryMatchingCachedSubs(broker, dest, msgContext);
078 }
079 return matches;
080 }
081
082 private boolean tryMatchingCachedSubs(final Broker broker, Destination dest, MessageEvaluationContext msgContext) {
083 boolean matches = false;
084 LOG.debug("No active consumer match found. Will try cache if configured...");
085
086 //retrieve the specific plugin class and lookup the selector for the destination.
087 final SubQueueSelectorCacheBroker cache = getSubQueueSelectorCacheBrokerPlugin(broker);
088
089 if (cache != null) {
090 final String selector = cache.getSelector(dest.getActiveMQDestination().getQualifiedName());
091 if (selector != null) {
092 try {
093 final BooleanExpression expression = getExpression(selector);
094 matches = expression.matches(msgContext);
095 } catch (Exception e) {
096 LOG.error(e.getMessage(), e);
097 }
098 }
099 }
100 return matches;
101 }
102
103 private BooleanExpression getExpression(String selector) throws Exception{
104 BooleanExpression result;
105 synchronized(expressionCache){
106 result = expressionCache.get(selector);
107 if (result == null){
108 result = compileSelector(selector);
109 expressionCache.put(selector,result);
110 }
111 }
112 return result;
113 }
114
115 /**
116 * @return The SubQueueSelectorCacheBroker instance or null if no such broker is available.
117 */
118 private SubQueueSelectorCacheBroker getSubQueueSelectorCacheBrokerPlugin(final Broker broker) {
119 if (selectorCachePlugin == null) {
120 selectorCachePlugin = (SubQueueSelectorCacheBroker) broker.getAdaptor(SubQueueSelectorCacheBroker.class);
121 } //if
122
123 return selectorCachePlugin;
124 }
125
126 /**
127 * Pre-compile the JMS selector.
128 *
129 * @param selectorExpression The non-null JMS selector expression.
130 */
131 private BooleanExpression compileSelector(final String selectorExpression) throws Exception {
132 return SelectorParser.parse(selectorExpression);
133 }
134 }