001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.util;
018
019 import java.io.DataOutputStream;
020 import java.io.IOException;
021 import java.net.DatagramPacket;
022 import java.net.DatagramSocket;
023 import java.net.InetAddress;
024 import java.net.InetSocketAddress;
025 import java.net.SocketAddress;
026 import java.net.URI;
027 import java.net.URISyntaxException;
028 import java.net.UnknownHostException;
029
030 import org.apache.activemq.broker.BrokerPluginSupport;
031 import org.apache.activemq.broker.ConnectionContext;
032 import org.apache.activemq.broker.ConsumerBrokerExchange;
033 import org.apache.activemq.broker.ProducerBrokerExchange;
034 import org.apache.activemq.broker.region.Subscription;
035 import org.apache.activemq.command.ActiveMQDestination;
036 import org.apache.activemq.command.BrokerId;
037 import org.apache.activemq.command.ConnectionInfo;
038 import org.apache.activemq.command.ConsumerInfo;
039 import org.apache.activemq.command.DataStructure;
040 import org.apache.activemq.command.DestinationInfo;
041 import org.apache.activemq.command.JournalTrace;
042 import org.apache.activemq.command.Message;
043 import org.apache.activemq.command.MessageAck;
044 import org.apache.activemq.command.MessageDispatch;
045 import org.apache.activemq.command.MessageDispatchNotification;
046 import org.apache.activemq.command.MessagePull;
047 import org.apache.activemq.command.ProducerInfo;
048 import org.apache.activemq.command.RemoveSubscriptionInfo;
049 import org.apache.activemq.command.Response;
050 import org.apache.activemq.command.SessionInfo;
051 import org.apache.activemq.command.TransactionId;
052 import org.apache.activemq.command.TransactionInfo;
053 import org.apache.activemq.openwire.OpenWireFormatFactory;
054 import org.apache.activemq.util.ByteArrayOutputStream;
055 import org.apache.activemq.util.ByteSequence;
056 import org.apache.activemq.wireformat.WireFormat;
057 import org.apache.activemq.wireformat.WireFormatFactory;
058 import org.slf4j.Logger;
059 import org.slf4j.LoggerFactory;
060
061 /**
062 * A Broker interceptor which allows you to trace all operations to a UDP
063 * socket.
064 *
065 * @org.apache.xbean.XBean element="udpTraceBrokerPlugin"
066 *
067 */
068 public class UDPTraceBrokerPlugin extends BrokerPluginSupport {
069
070 private static final Logger LOG = LoggerFactory.getLogger(UDPTraceBrokerPlugin.class);
071 protected WireFormat wireFormat;
072 protected WireFormatFactory wireFormatFactory;
073 protected int maxTraceDatagramSize = 1024 * 4;
074 protected URI destination;
075 protected DatagramSocket socket;
076
077 protected BrokerId brokerId;
078 protected SocketAddress address;
079 protected boolean broadcast;
080
081 public UDPTraceBrokerPlugin() {
082 try {
083 destination = new URI("udp://127.0.0.1:61616");
084 } catch (URISyntaxException wontHappen) {
085 }
086 }
087
088 public void start() throws Exception {
089 super.start();
090 if (getWireFormat() == null) {
091 throw new IllegalArgumentException("Wireformat must be specifed.");
092 }
093 if (address == null) {
094 address = createSocketAddress(destination);
095 }
096 socket = createSocket();
097
098 brokerId = super.getBrokerId();
099 trace(new JournalTrace("START"));
100 }
101
102 protected DatagramSocket createSocket() throws IOException {
103 DatagramSocket s = new DatagramSocket();
104 s.setSendBufferSize(maxTraceDatagramSize);
105 s.setBroadcast(broadcast);
106 return s;
107 }
108
109 public void stop() throws Exception {
110 trace(new JournalTrace("STOP"));
111 socket.close();
112 super.stop();
113 }
114
115 private void trace(DataStructure command) {
116 try {
117
118 ByteArrayOutputStream baos = new ByteArrayOutputStream(maxTraceDatagramSize);
119 DataOutputStream out = new DataOutputStream(baos);
120 wireFormat.marshal(brokerId, out);
121 wireFormat.marshal(command, out);
122 out.close();
123 ByteSequence sequence = baos.toByteSequence();
124 DatagramPacket datagram = new DatagramPacket(sequence.getData(), sequence.getOffset(), sequence.getLength(), address);
125 socket.send(datagram);
126
127 } catch (Throwable e) {
128 LOG.debug("Failed to trace: " + command, e);
129 }
130 }
131
132 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
133 trace(messageSend);
134 super.send(producerExchange, messageSend);
135 }
136
137 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
138 trace(ack);
139 super.acknowledge(consumerExchange, ack);
140 }
141
142 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
143 trace(info);
144 super.addConnection(context, info);
145 }
146
147 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
148 trace(info);
149 return super.addConsumer(context, info);
150 }
151
152 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
153 trace(info);
154 super.addDestinationInfo(context, info);
155 }
156
157 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
158 trace(info);
159 super.addProducer(context, info);
160 }
161
162 public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
163 trace(info);
164 super.addSession(context, info);
165 }
166
167 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
168 trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.BEGIN));
169 super.beginTransaction(context, xid);
170 }
171
172 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
173 trace(new TransactionInfo(context.getConnectionId(), xid, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE));
174 super.commitTransaction(context, xid, onePhase);
175 }
176
177 public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
178 trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.FORGET));
179 super.forgetTransaction(context, xid);
180 }
181
182 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
183 trace(pull);
184 return super.messagePull(context, pull);
185 }
186
187 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
188 trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.PREPARE));
189 return super.prepareTransaction(context, xid);
190 }
191
192 public void postProcessDispatch(MessageDispatch messageDispatch) {
193 trace(messageDispatch);
194 super.postProcessDispatch(messageDispatch);
195 }
196
197 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
198 trace(messageDispatchNotification);
199 super.processDispatchNotification(messageDispatchNotification);
200 }
201
202 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
203 trace(info.createRemoveCommand());
204 super.removeConnection(context, info, error);
205 }
206
207 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
208 trace(info.createRemoveCommand());
209 super.removeConsumer(context, info);
210 }
211
212 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
213 super.removeDestination(context, destination, timeout);
214 }
215
216 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
217 trace(info);
218 super.removeDestinationInfo(context, info);
219 }
220
221 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
222 trace(info.createRemoveCommand());
223 super.removeProducer(context, info);
224 }
225
226 public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
227 trace(info.createRemoveCommand());
228 super.removeSession(context, info);
229 }
230
231 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
232 trace(info);
233 super.removeSubscription(context, info);
234 }
235
236 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
237 trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.ROLLBACK));
238 super.rollbackTransaction(context, xid);
239 }
240
241 public WireFormat getWireFormat() {
242 if (wireFormat == null) {
243 wireFormat = createWireFormat();
244 }
245 return wireFormat;
246 }
247
248 protected WireFormat createWireFormat() {
249 return getWireFormatFactory().createWireFormat();
250 }
251
252 public void setWireFormat(WireFormat wireFormat) {
253 this.wireFormat = wireFormat;
254 }
255
256 public WireFormatFactory getWireFormatFactory() {
257 if (wireFormatFactory == null) {
258 wireFormatFactory = createWireFormatFactory();
259 }
260 return wireFormatFactory;
261 }
262
263 protected OpenWireFormatFactory createWireFormatFactory() {
264 OpenWireFormatFactory wf = new OpenWireFormatFactory();
265 wf.setCacheEnabled(false);
266 wf.setVersion(1);
267 wf.setTightEncodingEnabled(true);
268 wf.setSizePrefixDisabled(true);
269 return wf;
270 }
271
272 public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
273 this.wireFormatFactory = wireFormatFactory;
274 }
275
276 protected SocketAddress createSocketAddress(URI location) throws UnknownHostException {
277 InetAddress a = InetAddress.getByName(location.getHost());
278 int port = location.getPort();
279 return new InetSocketAddress(a, port);
280 }
281
282 public URI getDestination() {
283 return destination;
284 }
285
286 public void setDestination(URI destination) {
287 this.destination = destination;
288 }
289
290 public int getMaxTraceDatagramSize() {
291 return maxTraceDatagramSize;
292 }
293
294 public void setMaxTraceDatagramSize(int maxTraceDatagramSize) {
295 this.maxTraceDatagramSize = maxTraceDatagramSize;
296 }
297
298 public boolean isBroadcast() {
299 return broadcast;
300 }
301
302 public void setBroadcast(boolean broadcast) {
303 this.broadcast = broadcast;
304 }
305
306 public SocketAddress getAddress() {
307 return address;
308 }
309
310 public void setAddress(SocketAddress address) {
311 this.address = address;
312 }
313
314 }