001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker;
018
019 import java.net.URI;
020 import java.util.Map;
021 import java.util.Set;
022 import java.util.concurrent.ThreadPoolExecutor;
023 import org.apache.activemq.broker.region.Destination;
024 import org.apache.activemq.broker.region.MessageReference;
025 import org.apache.activemq.broker.region.Subscription;
026 import org.apache.activemq.command.ActiveMQDestination;
027 import org.apache.activemq.command.BrokerId;
028 import org.apache.activemq.command.BrokerInfo;
029 import org.apache.activemq.command.ConnectionInfo;
030 import org.apache.activemq.command.ConsumerControl;
031 import org.apache.activemq.command.ConsumerInfo;
032 import org.apache.activemq.command.DestinationInfo;
033 import org.apache.activemq.command.Message;
034 import org.apache.activemq.command.MessageAck;
035 import org.apache.activemq.command.MessageDispatch;
036 import org.apache.activemq.command.MessageDispatchNotification;
037 import org.apache.activemq.command.MessagePull;
038 import org.apache.activemq.command.ProducerInfo;
039 import org.apache.activemq.command.RemoveSubscriptionInfo;
040 import org.apache.activemq.command.Response;
041 import org.apache.activemq.command.SessionInfo;
042 import org.apache.activemq.command.TransactionId;
043 import org.apache.activemq.store.kahadb.plist.PListStore;
044 import org.apache.activemq.thread.Scheduler;
045 import org.apache.activemq.usage.Usage;
046
047 /**
048 * Allows you to intercept broker operation so that features such as security
049 * can be implemented as a pluggable filter.
050 *
051 *
052 */
053 public class BrokerFilter implements Broker {
054
055 protected final Broker next;
056
057 public BrokerFilter(Broker next) {
058 this.next = next;
059 }
060
061 public Broker getAdaptor(Class type) {
062 if (type.isInstance(this)) {
063 return this;
064 }
065 return next.getAdaptor(type);
066 }
067
068 public Map<ActiveMQDestination, Destination> getDestinationMap() {
069 return next.getDestinationMap();
070 }
071
072 public Set <Destination>getDestinations(ActiveMQDestination destination) {
073 return next.getDestinations(destination);
074 }
075
076 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
077 next.acknowledge(consumerExchange, ack);
078 }
079
080 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
081 return next.messagePull(context, pull);
082 }
083
084 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
085 next.addConnection(context, info);
086 }
087
088 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
089 return next.addConsumer(context, info);
090 }
091
092 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
093 next.addProducer(context, info);
094 }
095
096 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
097 next.commitTransaction(context, xid, onePhase);
098 }
099
100 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
101 next.removeSubscription(context, info);
102 }
103
104 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
105 return next.getPreparedTransactions(context);
106 }
107
108 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
109 return next.prepareTransaction(context, xid);
110 }
111
112 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
113 next.removeConnection(context, info, error);
114 }
115
116 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
117 next.removeConsumer(context, info);
118 }
119
120 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
121 next.removeProducer(context, info);
122 }
123
124 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
125 next.rollbackTransaction(context, xid);
126 }
127
128 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
129 next.send(producerExchange, messageSend);
130 }
131
132 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
133 next.beginTransaction(context, xid);
134 }
135
136 public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
137 next.forgetTransaction(context, transactionId);
138 }
139
140 public Connection[] getClients() throws Exception {
141 return next.getClients();
142 }
143
144 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception {
145 return next.addDestination(context, destination,createIfTemporary);
146 }
147
148 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
149 next.removeDestination(context, destination, timeout);
150 }
151
152 public ActiveMQDestination[] getDestinations() throws Exception {
153 return next.getDestinations();
154 }
155
156 public void start() throws Exception {
157 next.start();
158 }
159
160 public void stop() throws Exception {
161 next.stop();
162 }
163
164 public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
165 next.addSession(context, info);
166 }
167
168 public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
169 next.removeSession(context, info);
170 }
171
172 public BrokerId getBrokerId() {
173 return next.getBrokerId();
174 }
175
176 public String getBrokerName() {
177 return next.getBrokerName();
178 }
179
180 public void gc() {
181 next.gc();
182 }
183
184 public void addBroker(Connection connection, BrokerInfo info) {
185 next.addBroker(connection, info);
186 }
187
188 public void removeBroker(Connection connection, BrokerInfo info) {
189 next.removeBroker(connection, info);
190 }
191
192 public BrokerInfo[] getPeerBrokerInfos() {
193 return next.getPeerBrokerInfos();
194 }
195
196 public void preProcessDispatch(MessageDispatch messageDispatch) {
197 next.preProcessDispatch(messageDispatch);
198 }
199
200 public void postProcessDispatch(MessageDispatch messageDispatch) {
201 next.postProcessDispatch(messageDispatch);
202 }
203
204 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
205 next.processDispatchNotification(messageDispatchNotification);
206 }
207
208 public boolean isStopped() {
209 return next.isStopped();
210 }
211
212 public Set<ActiveMQDestination> getDurableDestinations() {
213 return next.getDurableDestinations();
214 }
215
216 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
217 next.addDestinationInfo(context, info);
218 }
219
220 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
221 next.removeDestinationInfo(context, info);
222 }
223
224 public boolean isFaultTolerantConfiguration() {
225 return next.isFaultTolerantConfiguration();
226 }
227
228 public ConnectionContext getAdminConnectionContext() {
229 return next.getAdminConnectionContext();
230 }
231
232 public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
233 next.setAdminConnectionContext(adminConnectionContext);
234 }
235
236 public PListStore getTempDataStore() {
237 return next.getTempDataStore();
238 }
239
240 public URI getVmConnectorURI() {
241 return next.getVmConnectorURI();
242 }
243
244 public void brokerServiceStarted() {
245 next.brokerServiceStarted();
246 }
247
248 public BrokerService getBrokerService() {
249 return next.getBrokerService();
250 }
251
252 public boolean isExpired(MessageReference messageReference) {
253 return next.isExpired(messageReference);
254 }
255
256 public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
257 next.messageExpired(context, message, subscription);
258 }
259
260 public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
261 Subscription subscription) {
262 next.sendToDeadLetterQueue(context, messageReference, subscription);
263 }
264
265 public Broker getRoot() {
266 return next.getRoot();
267 }
268
269 public long getBrokerSequenceId() {
270 return next.getBrokerSequenceId();
271 }
272
273
274 public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
275 next.fastProducer(context, producerInfo);
276 }
277
278 public void isFull(ConnectionContext context,Destination destination, Usage usage) {
279 next.isFull(context,destination, usage);
280 }
281
282 public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
283 next.messageConsumed(context, messageReference);
284 }
285
286 public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
287 next.messageDelivered(context, messageReference);
288 }
289
290 public void messageDiscarded(ConnectionContext context,Subscription sub, MessageReference messageReference) {
291 next.messageDiscarded(context, sub, messageReference);
292 }
293
294 public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
295 next.slowConsumer(context, destination,subs);
296 }
297
298 public void nowMasterBroker() {
299 next.nowMasterBroker();
300 }
301
302 public void processConsumerControl(ConsumerBrokerExchange consumerExchange,
303 ConsumerControl control) {
304 next.processConsumerControl(consumerExchange, control);
305 }
306
307 public Scheduler getScheduler() {
308 return next.getScheduler();
309 }
310
311 public ThreadPoolExecutor getExecutor() {
312 return next.getExecutor();
313 }
314
315 public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) {
316 next.networkBridgeStarted(brokerInfo, createdByDuplex, remoteIp);
317 }
318
319 public void networkBridgeStopped(BrokerInfo brokerInfo) {
320 next.networkBridgeStopped(brokerInfo);
321 }
322 }