001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.util;
018
019 import java.util.Set;
020 import javax.annotation.PostConstruct;
021 import org.apache.activemq.broker.BrokerPluginSupport;
022 import org.apache.activemq.broker.Connection;
023 import org.apache.activemq.broker.ConnectionContext;
024 import org.apache.activemq.broker.ConsumerBrokerExchange;
025 import org.apache.activemq.broker.ProducerBrokerExchange;
026 import org.apache.activemq.broker.region.Destination;
027 import org.apache.activemq.broker.region.MessageReference;
028 import org.apache.activemq.broker.region.Subscription;
029 import org.apache.activemq.command.ActiveMQDestination;
030 import org.apache.activemq.command.BrokerInfo;
031 import org.apache.activemq.command.ConnectionInfo;
032 import org.apache.activemq.command.ConsumerInfo;
033 import org.apache.activemq.command.DestinationInfo;
034 import org.apache.activemq.command.Message;
035 import org.apache.activemq.command.MessageAck;
036 import org.apache.activemq.command.MessageDispatch;
037 import org.apache.activemq.command.MessageDispatchNotification;
038 import org.apache.activemq.command.MessagePull;
039 import org.apache.activemq.command.ProducerInfo;
040 import org.apache.activemq.command.RemoveSubscriptionInfo;
041 import org.apache.activemq.command.Response;
042 import org.apache.activemq.command.SessionInfo;
043 import org.apache.activemq.command.TransactionId;
044 import org.apache.activemq.usage.Usage;
045 import org.slf4j.Logger;
046 import org.slf4j.LoggerFactory;
047
048 /**
049 * A simple Broker intercepter which allows you to enable/disable logging.
050 *
051 * @org.apache.xbean.XBean
052 */
053
054 public class LoggingBrokerPlugin extends BrokerPluginSupport {
055
056 private static final Logger LOG = LoggerFactory.getLogger(LoggingBrokerPlugin.class);
057
058 private boolean logAll = false;
059 private boolean logMessageEvents = false;
060 private boolean logConnectionEvents = true;
061 private boolean logSessionEvents = true;
062 private boolean logTransactionEvents = false;
063 private boolean logConsumerEvents = false;
064 private boolean logProducerEvents = false;
065 private boolean logInternalEvents = false;
066
067 /**
068 * @throws Exception
069 * @org.apache.xbean.InitMethod
070 */
071 @PostConstruct
072 public void afterPropertiesSet() throws Exception {
073 LOG.info("Created LoggingBrokerPlugin: " + this.toString());
074 }
075
076 public boolean isLogAll() {
077 return logAll;
078 }
079
080 /**
081 * Logger all Events that go through the Plugin
082 */
083 public void setLogAll(boolean logAll) {
084 this.logAll = logAll;
085 }
086
087 public boolean isLogMessageEvents() {
088 return logMessageEvents;
089 }
090
091 /**
092 * Logger Events that are related to message processing
093 */
094 public void setLogMessageEvents(boolean logMessageEvents) {
095 this.logMessageEvents = logMessageEvents;
096 }
097
098 public boolean isLogConnectionEvents() {
099 return logConnectionEvents;
100 }
101
102 /**
103 * Logger Events that are related to connections
104 */
105 public void setLogConnectionEvents(boolean logConnectionEvents) {
106 this.logConnectionEvents = logConnectionEvents;
107 }
108
109 public boolean isLogSessionEvents() {
110 return logSessionEvents;
111 }
112
113 /**
114 * Logger Events that are related to sessions
115 */
116 public void setLogSessionEvents(boolean logSessionEvents) {
117 this.logSessionEvents = logSessionEvents;
118 }
119
120 public boolean isLogTransactionEvents() {
121 return logTransactionEvents;
122 }
123
124 /**
125 * Logger Events that are related to transaction processing
126 */
127 public void setLogTransactionEvents(boolean logTransactionEvents) {
128 this.logTransactionEvents = logTransactionEvents;
129 }
130
131 public boolean isLogConsumerEvents() {
132 return logConsumerEvents;
133 }
134
135 /**
136 * Logger Events that are related to Consumers
137 */
138 public void setLogConsumerEvents(boolean logConsumerEvents) {
139 this.logConsumerEvents = logConsumerEvents;
140 }
141
142 public boolean isLogProducerEvents() {
143 return logProducerEvents;
144 }
145
146 /**
147 * Logger Events that are related to Producers
148 */
149 public void setLogProducerEvents(boolean logProducerEvents) {
150 this.logProducerEvents = logProducerEvents;
151 }
152
153 public boolean isLogInternalEvents() {
154 return logInternalEvents;
155 }
156
157 /**
158 * Logger Events that are normally internal to the broker
159 */
160 public void setLogInternalEvents(boolean logInternalEvents) {
161 this.logInternalEvents = logInternalEvents;
162 }
163
164 @Override
165 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
166 if (isLogAll() || isLogConsumerEvents()) {
167 LOG.info("Acknowledging message for client ID : " + consumerExchange.getConnectionContext().getClientId()
168 + (ack.getMessageCount() == 1 ? ", " + ack.getLastMessageId() : ""));
169 if (LOG.isTraceEnabled() && ack.getMessageCount() > 1) {
170 LOG.trace("Message count: " + ack.getMessageCount() + ", First Message Id: " + ack.getFirstMessageId()
171 + ", Last Message Id: " + ack.getLastMessageId());
172 }
173 }
174 super.acknowledge(consumerExchange, ack);
175 }
176
177 @Override
178 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
179 if (isLogAll() || isLogConsumerEvents()) {
180 LOG.info("Message Pull from : " + context.getClientId() + " on " + pull.getDestination().getPhysicalName());
181 }
182 return super.messagePull(context, pull);
183 }
184
185 @Override
186 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
187 if (isLogAll() || isLogConnectionEvents()) {
188 LOG.info("Adding Connection : " + info);
189 }
190 super.addConnection(context, info);
191 }
192
193 @Override
194 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
195 if (isLogAll() || isLogConsumerEvents()) {
196 LOG.info("Adding Consumer : " + info);
197 }
198 return super.addConsumer(context, info);
199 }
200
201 @Override
202 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
203 if (isLogAll() || isLogProducerEvents()) {
204 LOG.info("Adding Producer :" + info);
205 }
206 super.addProducer(context, info);
207 }
208
209 @Override
210 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
211 if (isLogAll() || isLogTransactionEvents()) {
212 LOG.info("Commiting transaction : " + xid.getTransactionKey());
213 }
214 super.commitTransaction(context, xid, onePhase);
215 }
216
217 @Override
218 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
219 if (isLogAll() || isLogConsumerEvents()) {
220 LOG.info("Removing subscription : " + info);
221 }
222 super.removeSubscription(context, info);
223 }
224
225 @Override
226 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
227
228 TransactionId[] result = super.getPreparedTransactions(context);
229 if ((isLogAll() || isLogTransactionEvents()) && result != null) {
230 StringBuffer tids = new StringBuffer();
231 for (TransactionId tid : result) {
232 if (tids.length() > 0) {
233 tids.append(", ");
234 }
235 tids.append(tid.getTransactionKey());
236 }
237 LOG.info("Prepared transactions : " + tids);
238 }
239 return result;
240 }
241
242 @Override
243 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
244 if (isLogAll() || isLogTransactionEvents()) {
245 LOG.info("Preparing transaction : " + xid.getTransactionKey());
246 }
247 return super.prepareTransaction(context, xid);
248 }
249
250 @Override
251 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
252 if (isLogAll() || isLogConnectionEvents()) {
253 LOG.info("Removing Connection : " + info);
254 }
255 super.removeConnection(context, info, error);
256 }
257
258 @Override
259 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
260 if (isLogAll() || isLogConsumerEvents()) {
261 LOG.info("Removing Consumer : " + info);
262 }
263 super.removeConsumer(context, info);
264 }
265
266 @Override
267 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
268 if (isLogAll() || isLogProducerEvents()) {
269 LOG.info("Removing Producer : " + info);
270 }
271 super.removeProducer(context, info);
272 }
273
274 @Override
275 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
276 if (isLogAll() || isLogTransactionEvents()) {
277 LOG.info("Rolling back Transaction : " + xid.getTransactionKey());
278 }
279 super.rollbackTransaction(context, xid);
280 }
281
282 @Override
283 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
284 if (isLogAll() || isLogProducerEvents()) {
285 LOG.info("Sending message : " + messageSend.copy());
286 }
287 super.send(producerExchange, messageSend);
288 }
289
290 @Override
291 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
292 if (isLogAll() || isLogTransactionEvents()) {
293 LOG.info("Beginning transaction : " + xid.getTransactionKey());
294 }
295 super.beginTransaction(context, xid);
296 }
297
298 @Override
299 public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
300 if (isLogAll() || isLogTransactionEvents()) {
301 LOG.info("Forgetting transaction : " + transactionId.getTransactionKey());
302 }
303 super.forgetTransaction(context, transactionId);
304 }
305
306 @Override
307 public Connection[] getClients() throws Exception {
308 Connection[] result = super.getClients();
309
310 if (isLogAll() || isLogInternalEvents()) {
311 if (result == null) {
312 LOG.info("Get Clients returned empty list.");
313 } else {
314 StringBuffer cids = new StringBuffer();
315 for (Connection c : result) {
316 cids.append(cids.length() > 0 ? ", " : "");
317 cids.append(c.getConnectionId());
318 }
319 LOG.info("Connected clients : " + cids);
320 }
321 }
322 return super.getClients();
323 }
324
325 @Override
326 public org.apache.activemq.broker.region.Destination addDestination(ConnectionContext context,
327 ActiveMQDestination destination, boolean create) throws Exception {
328 if (isLogAll() || isLogInternalEvents()) {
329 LOG.info("Adding destination : " + destination.getDestinationTypeAsString() + ":"
330 + destination.getPhysicalName());
331 }
332 return super.addDestination(context, destination, create);
333 }
334
335 @Override
336 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
337 throws Exception {
338 if (isLogAll() || isLogInternalEvents()) {
339 LOG.info("Removing destination : " + destination.getDestinationTypeAsString() + ":"
340 + destination.getPhysicalName());
341 }
342 super.removeDestination(context, destination, timeout);
343 }
344
345 @Override
346 public ActiveMQDestination[] getDestinations() throws Exception {
347 ActiveMQDestination[] result = super.getDestinations();
348 if (isLogAll() || isLogInternalEvents()) {
349 if (result == null) {
350 LOG.info("Get Destinations returned empty list.");
351 } else {
352 StringBuffer destinations = new StringBuffer();
353 for (ActiveMQDestination dest : result) {
354 destinations.append(destinations.length() > 0 ? ", " : "");
355 destinations.append(dest.getPhysicalName());
356 }
357 LOG.info("Get Destinations : " + destinations);
358 }
359 }
360 return result;
361 }
362
363 @Override
364 public void start() throws Exception {
365 if (isLogAll() || isLogInternalEvents()) {
366 LOG.info("Starting " + getBrokerName());
367 }
368 super.start();
369 }
370
371 @Override
372 public void stop() throws Exception {
373 if (isLogAll() || isLogInternalEvents()) {
374 LOG.info("Stopping " + getBrokerName());
375 }
376 super.stop();
377 }
378
379 @Override
380 public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
381 if (isLogAll() || isLogSessionEvents()) {
382 LOG.info("Adding Session : " + info);
383 }
384 super.addSession(context, info);
385 }
386
387 @Override
388 public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
389 if (isLogAll() || isLogSessionEvents()) {
390 LOG.info("Removing Session : " + info);
391 }
392 super.removeSession(context, info);
393 }
394
395 @Override
396 public void addBroker(Connection connection, BrokerInfo info) {
397 if (isLogAll() || isLogInternalEvents()) {
398 LOG.info("Adding Broker " + info.getBrokerName());
399 }
400 super.addBroker(connection, info);
401 }
402
403 @Override
404 public void removeBroker(Connection connection, BrokerInfo info) {
405 if (isLogAll() || isLogInternalEvents()) {
406 LOG.info("Removing Broker " + info.getBrokerName());
407 }
408 super.removeBroker(connection, info);
409 }
410
411 @Override
412 public BrokerInfo[] getPeerBrokerInfos() {
413 BrokerInfo[] result = super.getPeerBrokerInfos();
414 if (isLogAll() || isLogInternalEvents()) {
415 if (result == null) {
416 LOG.info("Get Peer Broker Infos returned empty list.");
417 } else {
418 StringBuffer peers = new StringBuffer();
419 for (BrokerInfo bi : result) {
420 peers.append(peers.length() > 0 ? ", " : "");
421 peers.append(bi.getBrokerName());
422 }
423 LOG.info("Get Peer Broker Infos : " + peers);
424 }
425 }
426 return result;
427 }
428
429 @Override
430 public void preProcessDispatch(MessageDispatch messageDispatch) {
431 if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
432 LOG.info("preProcessDispatch :" + messageDispatch);
433 }
434 super.preProcessDispatch(messageDispatch);
435 }
436
437 @Override
438 public void postProcessDispatch(MessageDispatch messageDispatch) {
439 if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
440 LOG.info("postProcessDispatch :" + messageDispatch);
441 }
442 super.postProcessDispatch(messageDispatch);
443 }
444
445 @Override
446 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
447 if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
448 LOG.info("ProcessDispatchNotification :" + messageDispatchNotification);
449 }
450 super.processDispatchNotification(messageDispatchNotification);
451 }
452
453 @Override
454 public Set<ActiveMQDestination> getDurableDestinations() {
455 Set<ActiveMQDestination> result = super.getDurableDestinations();
456 if (isLogAll() || isLogInternalEvents()) {
457 if (result == null) {
458 LOG.info("Get Durable Destinations returned empty list.");
459 } else {
460 StringBuffer destinations = new StringBuffer();
461 for (ActiveMQDestination dest : result) {
462 destinations.append(destinations.length() > 0 ? ", " : "");
463 destinations.append(dest.getPhysicalName());
464 }
465 LOG.info("Get Durable Destinations : " + destinations);
466 }
467 }
468 return result;
469 }
470
471 @Override
472 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
473 if (isLogAll() || isLogInternalEvents()) {
474 LOG.info("Adding destination info : " + info);
475 }
476 super.addDestinationInfo(context, info);
477 }
478
479 @Override
480 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
481 if (isLogAll() || isLogInternalEvents()) {
482 LOG.info("Removing destination info : " + info);
483 }
484 super.removeDestinationInfo(context, info);
485 }
486
487 @Override
488 public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
489 if (isLogAll() || isLogInternalEvents()) {
490 String msg = "Unable to display message.";
491
492 msg = message.getMessage().toString();
493
494 LOG.info("Message has expired : " + msg);
495 }
496 super.messageExpired(context, message, subscription);
497 }
498
499 @Override
500 public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
501 Subscription subscription) {
502 if (isLogAll() || isLogInternalEvents()) {
503 String msg = "Unable to display message.";
504
505 msg = messageReference.getMessage().toString();
506
507 LOG.info("Sending to DLQ : " + msg);
508 }
509 super.sendToDeadLetterQueue(context, messageReference, subscription);
510 }
511
512 @Override
513 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
514 if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
515 LOG.info("Fast Producer : " + producerInfo);
516 }
517 super.fastProducer(context, producerInfo);
518 }
519
520 @Override
521 public void isFull(ConnectionContext context, Destination destination, Usage usage) {
522 if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
523 LOG.info("Destination is full : " + destination.getName());
524 }
525 super.isFull(context, destination, usage);
526 }
527
528 @Override
529 public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
530 if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
531 String msg = "Unable to display message.";
532
533 msg = messageReference.getMessage().toString();
534
535 LOG.info("Message consumed : " + msg);
536 }
537 super.messageConsumed(context, messageReference);
538 }
539
540 @Override
541 public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
542 if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
543 String msg = "Unable to display message.";
544
545 msg = messageReference.getMessage().toString();
546
547 LOG.info("Message delivered : " + msg);
548 }
549 super.messageDelivered(context, messageReference);
550 }
551
552 @Override
553 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
554 if (isLogAll() || isLogInternalEvents()) {
555 String msg = "Unable to display message.";
556
557 msg = messageReference.getMessage().toString();
558
559 LOG.info("Message discarded : " + msg);
560 }
561 super.messageDiscarded(context, sub, messageReference);
562 }
563
564 @Override
565 public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) {
566 if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
567 LOG.info("Detected slow consumer on " + destination.getName());
568 StringBuffer buf = new StringBuffer("Connection(");
569 buf.append(subs.getConsumerInfo().getConsumerId().getConnectionId());
570 buf.append(") Session(");
571 buf.append(subs.getConsumerInfo().getConsumerId().getSessionId());
572 buf.append(")");
573 LOG.info(buf.toString());
574 }
575 super.slowConsumer(context, destination, subs);
576 }
577
578 @Override
579 public void nowMasterBroker() {
580 if (isLogAll() || isLogInternalEvents()) {
581 LOG.info("Is now the master broker : " + getBrokerName());
582 }
583 super.nowMasterBroker();
584 }
585
586 @Override
587 public String toString() {
588 StringBuffer buf = new StringBuffer();
589 buf.append("LoggingBrokerPlugin(");
590 buf.append("logAll=");
591 buf.append(isLogAll());
592 buf.append(", logConnectionEvents=");
593 buf.append(isLogConnectionEvents());
594 buf.append(", logSessionEvents=");
595 buf.append(isLogSessionEvents());
596 buf.append(", logConsumerEvents=");
597 buf.append(isLogConsumerEvents());
598 buf.append(", logProducerEvents=");
599 buf.append(isLogProducerEvents());
600 buf.append(", logMessageEvents=");
601 buf.append(isLogMessageEvents());
602 buf.append(", logTransactionEvents=");
603 buf.append(isLogTransactionEvents());
604 buf.append(", logInternalEvents=");
605 buf.append(isLogInternalEvents());
606 buf.append(")");
607 return buf.toString();
608 }
609 }