001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.util.List;
021 import java.util.Set;
022 import org.apache.activemq.broker.Broker;
023 import org.apache.activemq.broker.ConnectionContext;
024 import org.apache.activemq.broker.ProducerBrokerExchange;
025 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
026 import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
027 import org.apache.activemq.command.ActiveMQDestination;
028 import org.apache.activemq.command.Message;
029 import org.apache.activemq.command.MessageAck;
030 import org.apache.activemq.command.MessageDispatchNotification;
031 import org.apache.activemq.command.ProducerInfo;
032 import org.apache.activemq.store.MessageStore;
033 import org.apache.activemq.usage.MemoryUsage;
034 import org.apache.activemq.usage.Usage;
035
036 /**
037 *
038 *
039 */
040 public class DestinationFilter implements Destination {
041
042 private final Destination next;
043
044 public DestinationFilter(Destination next) {
045 this.next = next;
046 }
047
048 public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
049 next.acknowledge(context, sub, ack, node);
050 }
051
052 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
053 next.addSubscription(context, sub);
054 }
055
056 public Message[] browse() {
057 return next.browse();
058 }
059
060 public void dispose(ConnectionContext context) throws IOException {
061 next.dispose(context);
062 }
063
064 public boolean isDisposed() {
065 return next.isDisposed();
066 }
067
068 public void gc() {
069 next.gc();
070 }
071
072 public void markForGC(long timeStamp) {
073 next.markForGC(timeStamp);
074 }
075
076 public boolean canGC() {
077 return next.canGC();
078 }
079
080 public long getInactiveTimoutBeforeGC() {
081 return next.getInactiveTimoutBeforeGC();
082 }
083
084 public ActiveMQDestination getActiveMQDestination() {
085 return next.getActiveMQDestination();
086 }
087
088 public DeadLetterStrategy getDeadLetterStrategy() {
089 return next.getDeadLetterStrategy();
090 }
091
092 public DestinationStatistics getDestinationStatistics() {
093 return next.getDestinationStatistics();
094 }
095
096 public String getName() {
097 return next.getName();
098 }
099
100 public MemoryUsage getMemoryUsage() {
101 return next.getMemoryUsage();
102 }
103
104 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
105 next.removeSubscription(context, sub, lastDeliveredSequenceId);
106 }
107
108 public void send(ProducerBrokerExchange context, Message messageSend) throws Exception {
109 next.send(context, messageSend);
110 }
111
112 public void start() throws Exception {
113 next.start();
114 }
115
116 public void stop() throws Exception {
117 next.stop();
118 }
119
120 public List<Subscription> getConsumers() {
121 return next.getConsumers();
122 }
123
124 /**
125 * Sends a message to the given destination which may be a wildcard
126 *
127 * @param context broker context
128 * @param message message to send
129 * @param destination possibly wildcard destination to send the message to
130 * @throws Exception on error
131 */
132 protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception {
133 Broker broker = context.getConnectionContext().getBroker();
134 Set<Destination> destinations = broker.getDestinations(destination);
135
136 for (Destination dest : destinations) {
137 dest.send(context, message.copy());
138 }
139 }
140
141 public MessageStore getMessageStore() {
142 return next.getMessageStore();
143 }
144
145 public boolean isProducerFlowControl() {
146 return next.isProducerFlowControl();
147 }
148
149 public void setProducerFlowControl(boolean value) {
150 next.setProducerFlowControl(value);
151 }
152
153 public boolean isAlwaysRetroactive() {
154 return next.isAlwaysRetroactive();
155 }
156
157 public void setAlwaysRetroactive(boolean value) {
158 next.setAlwaysRetroactive(value);
159 }
160
161 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
162 next.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
163 }
164
165 public long getBlockedProducerWarningInterval() {
166 return next.getBlockedProducerWarningInterval();
167 }
168
169 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
170 next.addProducer(context, info);
171 }
172
173 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
174 next.removeProducer(context, info);
175 }
176
177 public int getMaxAuditDepth() {
178 return next.getMaxAuditDepth();
179 }
180
181 public int getMaxProducersToAudit() {
182 return next.getMaxProducersToAudit();
183 }
184
185 public boolean isEnableAudit() {
186 return next.isEnableAudit();
187 }
188
189 public void setEnableAudit(boolean enableAudit) {
190 next.setEnableAudit(enableAudit);
191 }
192
193 public void setMaxAuditDepth(int maxAuditDepth) {
194 next.setMaxAuditDepth(maxAuditDepth);
195 }
196
197 public void setMaxProducersToAudit(int maxProducersToAudit) {
198 next.setMaxProducersToAudit(maxProducersToAudit);
199 }
200
201 public boolean isActive() {
202 return next.isActive();
203 }
204
205 public int getMaxPageSize() {
206 return next.getMaxPageSize();
207 }
208
209 public void setMaxPageSize(int maxPageSize) {
210 next.setMaxPageSize(maxPageSize);
211 }
212
213 public boolean isUseCache() {
214 return next.isUseCache();
215 }
216
217 public void setUseCache(boolean useCache) {
218 next.setUseCache(useCache);
219 }
220
221 public int getMinimumMessageSize() {
222 return next.getMinimumMessageSize();
223 }
224
225 public void setMinimumMessageSize(int minimumMessageSize) {
226 next.setMinimumMessageSize(minimumMessageSize);
227 }
228
229 public void wakeup() {
230 next.wakeup();
231 }
232
233 public boolean isLazyDispatch() {
234 return next.isLazyDispatch();
235 }
236
237 public void setLazyDispatch(boolean value) {
238 next.setLazyDispatch(value);
239 }
240
241 public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node) {
242 next.messageExpired(context, prefetchSubscription, node);
243 }
244
245 public boolean iterate() {
246 return next.iterate();
247 }
248
249 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
250 next.fastProducer(context, producerInfo);
251 }
252
253 public void isFull(ConnectionContext context, Usage<?> usage) {
254 next.isFull(context, usage);
255 }
256
257 public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
258 next.messageConsumed(context, messageReference);
259 }
260
261 public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
262 next.messageDelivered(context, messageReference);
263 }
264
265 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
266 next.messageDiscarded(context, sub, messageReference);
267 }
268
269 public void slowConsumer(ConnectionContext context, Subscription subs) {
270 next.slowConsumer(context, subs);
271 }
272
273 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference node) {
274 next.messageExpired(context, subs, node);
275 }
276
277 public int getMaxBrowsePageSize() {
278 return next.getMaxBrowsePageSize();
279 }
280
281 public void setMaxBrowsePageSize(int maxPageSize) {
282 next.setMaxBrowsePageSize(maxPageSize);
283 }
284
285 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
286 next.processDispatchNotification(messageDispatchNotification);
287 }
288
289 public int getCursorMemoryHighWaterMark() {
290 return next.getCursorMemoryHighWaterMark();
291 }
292
293 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
294 next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
295 }
296
297 public boolean isPrioritizedMessages() {
298 return next.isPrioritizedMessages();
299 }
300
301 public SlowConsumerStrategy getSlowConsumerStrategy() {
302 return next.getSlowConsumerStrategy();
303 }
304
305 public boolean isDoOptimzeMessageStorage() {
306 return next.isDoOptimzeMessageStorage();
307 }
308
309 public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
310 next.setDoOptimzeMessageStorage(doOptimzeMessageStorage);
311 }
312
313 }