001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.command;
018
019 import org.apache.activemq.filter.BooleanExpression;
020 import org.apache.activemq.filter.MessageEvaluationContext;
021 import org.apache.activemq.util.JMSExceptionSupport;
022 import org.slf4j.Logger;
023 import org.slf4j.LoggerFactory;
024
025 import javax.jms.JMSException;
026 import java.io.IOException;
027 import java.util.Arrays;
028
029 /**
030 * @openwire:marshaller code="91"
031 *
032 */
033 public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
034
035 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER;
036 static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeFilter.class);
037
038 protected BrokerId networkBrokerId;
039 protected int networkTTL;
040
041 public NetworkBridgeFilter() {
042 }
043
044 public NetworkBridgeFilter(BrokerId networkBrokerId, int networkTTL) {
045 this.networkBrokerId = networkBrokerId;
046 this.networkTTL = networkTTL;
047 }
048
049 public byte getDataStructureType() {
050 return DATA_STRUCTURE_TYPE;
051 }
052
053 public boolean isMarshallAware() {
054 return false;
055 }
056
057 public boolean matches(MessageEvaluationContext mec) throws JMSException {
058 try {
059 // for Queues - the message can be acknowledged and dropped whilst
060 // still
061 // in the dispatch loop
062 // so need to get the reference to it
063 Message message = mec.getMessage();
064 return message != null && matchesForwardingFilter(message, mec);
065 } catch (IOException e) {
066 throw JMSExceptionSupport.create(e);
067 }
068 }
069
070 public Object evaluate(MessageEvaluationContext message) throws JMSException {
071 return matches(message) ? Boolean.TRUE : Boolean.FALSE;
072 }
073
074 protected boolean matchesForwardingFilter(Message message, MessageEvaluationContext mec) {
075
076 if (contains(message.getBrokerPath(), networkBrokerId)) {
077 if (LOG.isTraceEnabled()) {
078 LOG.trace("Message all ready routed once through target broker ("
079 + networkBrokerId + "), path: "
080 + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message);
081 }
082 return false;
083 }
084
085 int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length;
086
087 if (hops >= networkTTL) {
088 if (LOG.isTraceEnabled()) {
089 LOG.trace("Message restricted to " + networkTTL + " network hops ignoring: " + message);
090 }
091 return false;
092 }
093
094 if (message.isAdvisory() && message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) {
095 ConsumerInfo info = (ConsumerInfo)message.getDataStructure();
096 hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length;
097 if (hops >= networkTTL) {
098 if (LOG.isTraceEnabled()) {
099 LOG.trace("ConsumerInfo advisory restricted to " + networkTTL + " network hops ignoring: " + message);
100 }
101 return false;
102 }
103
104 if (contains(info.getBrokerPath(), networkBrokerId)) {
105 LOG.trace("ConsumerInfo advisory all ready routed once through target broker ("
106 + networkBrokerId + "), path: "
107 + Arrays.toString(info.getBrokerPath()) + " - ignoring: " + message);
108 return false;
109 }
110 }
111 return true;
112 }
113
114 public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
115 if (brokerPath != null && brokerId != null) {
116 for (int i = 0; i < brokerPath.length; i++) {
117 if (brokerId.equals(brokerPath[i])) {
118 return true;
119 }
120 }
121 }
122 return false;
123 }
124
125 /**
126 * @openwire:property version=1
127 */
128 public int getNetworkTTL() {
129 return networkTTL;
130 }
131
132 public void setNetworkTTL(int networkTTL) {
133 this.networkTTL = networkTTL;
134 }
135
136 /**
137 * @openwire:property version=1 cache=true
138 */
139 public BrokerId getNetworkBrokerId() {
140 return networkBrokerId;
141 }
142
143 public void setNetworkBrokerId(BrokerId remoteBrokerPath) {
144 this.networkBrokerId = remoteBrokerPath;
145 }
146
147 }