001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.console.command.store.amq;
018
019 import java.io.File;
020 import java.io.InputStream;
021 import java.io.PrintWriter;
022 import java.util.ArrayList;
023 import java.util.Arrays;
024 import java.util.Collections;
025 import java.util.HashMap;
026 import java.util.Iterator;
027 import java.util.List;
028 import java.util.Map;
029 import java.util.Scanner;
030
031 import org.apache.activemq.command.ActiveMQBlobMessage;
032 import org.apache.activemq.command.ActiveMQBytesMessage;
033 import org.apache.activemq.command.ActiveMQMapMessage;
034 import org.apache.activemq.command.ActiveMQMessage;
035 import org.apache.activemq.command.ActiveMQObjectMessage;
036 import org.apache.activemq.command.ActiveMQStreamMessage;
037 import org.apache.activemq.command.ActiveMQTextMessage;
038 import org.apache.activemq.command.DataStructure;
039 import org.apache.activemq.command.JournalQueueAck;
040 import org.apache.activemq.command.JournalTopicAck;
041 import org.apache.activemq.command.JournalTrace;
042 import org.apache.activemq.command.JournalTransaction;
043 import org.apache.activemq.kaha.impl.async.Location;
044 import org.apache.activemq.kaha.impl.async.ReadOnlyAsyncDataManager;
045 import org.apache.activemq.openwire.OpenWireFormat;
046 import org.apache.activemq.util.ByteSequence;
047 import org.apache.activemq.wireformat.WireFormat;
048 import org.apache.velocity.Template;
049 import org.apache.velocity.VelocityContext;
050 import org.apache.velocity.app.Velocity;
051 import org.apache.velocity.app.VelocityEngine;
052 import org.josql.Query;
053
054 /**
055 * Allows you to view the contents of a Journal.
056 *
057 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
058 */
059 public class AMQJournalTool {
060
061 private final ArrayList<File> dirs = new ArrayList<File>();
062 private final WireFormat wireFormat = new OpenWireFormat();
063 private final HashMap<String, String> resources = new HashMap<String, String>();
064
065 private String messageFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageId}|${record.properties}|${body}";
066 private String topicAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.clientId}|${record.subscritionName}|${record.messageId}";
067 private String queueAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageAck.lastMessageId}";
068 private String transactionFormat = "${location.dataFileId},${location.offset}|${type}|${record.transactionId}";
069 private String traceFormat = "${location.dataFileId},${location.offset}|${type}|${record.message}";
070 private String unknownFormat = "${location.dataFileId},${location.offset}|${type}|${record.class.name}";
071 private String where;
072 private VelocityContext context;
073 private VelocityEngine velocity;
074 private boolean help;
075
076 public static void main(String[] args) throws Exception {
077 AMQJournalTool consumerTool = new AMQJournalTool();
078 String[] directories = CommandLineSupport
079 .setOptions(consumerTool, args);
080 if (directories.length < 1) {
081 System.out
082 .println("Please specify the directories with journal data to scan");
083 return;
084 }
085 for (int i = 0; i < directories.length; i++) {
086 consumerTool.getDirs().add(new File(directories[i]));
087 }
088 consumerTool.execute();
089 }
090
091 /**
092 * Creates a new VelocityContext that is pre-populated with the JVMs
093 * system properties.
094 *
095 * @return - the VelocityContext that got created.
096 */
097 protected VelocityContext createVelocityContext() {
098 VelocityContext ctx = new VelocityContext();
099 List keys = Arrays.asList(ctx.getKeys());
100
101 for (Iterator iterator = System.getProperties().entrySet()
102 .iterator(); iterator.hasNext();) {
103 Map.Entry kv = (Map.Entry) iterator.next();
104 String name = (String) kv.getKey();
105 String value = (String) kv.getValue();
106
107 if (!keys.contains(name)) {
108 ctx.put(name, value);
109 }
110 }
111 return ctx;
112 }
113
114
115 public void execute() throws Exception {
116
117 if( help ) {
118 showHelp();
119 return;
120 }
121
122 if (getDirs().size() < 1) {
123 System.out.println("");
124 System.out.println("Invalid Usage: Please specify the directories with journal data to scan");
125 System.out.println("");
126 showHelp();
127 return;
128 }
129
130 for (File dir : getDirs()) {
131 if( !dir.exists() ) {
132 System.out.println("");
133 System.out.println("Invalid Usage: the directory '"+dir.getPath()+"' does not exist");
134 System.out.println("");
135 showHelp();
136 return;
137 }
138 if( !dir.isDirectory() ) {
139 System.out.println("");
140 System.out.println("Invalid Usage: the argument '"+dir.getPath()+"' is not a directory");
141 System.out.println("");
142 showHelp();
143 return;
144 }
145 }
146
147 context = createVelocityContext();
148
149 velocity = new VelocityEngine();
150 velocity.setProperty(Velocity.RESOURCE_LOADER, "all");
151 velocity.setProperty("all.resource.loader.class", CustomResourceLoader.class.getName());
152 velocity.init();
153
154 resources.put("message", messageFormat);
155 resources.put("topicAck", topicAckFormat);
156 resources.put("queueAck", queueAckFormat);
157 resources.put("transaction", transactionFormat);
158 resources.put("trace", traceFormat);
159 resources.put("unknown", unknownFormat);
160
161 Query query = null;
162 if (where != null) {
163 query = new Query();
164 query.parse("select * from "+Entry.class.getName()+" where "+where);
165
166 }
167
168 ReadOnlyAsyncDataManager manager = new ReadOnlyAsyncDataManager(getDirs());
169 manager.start();
170 try {
171 Location curr = manager.getFirstLocation();
172 while (curr != null) {
173
174 ByteSequence data = manager.read(curr);
175 DataStructure c = (DataStructure) wireFormat.unmarshal(data);
176
177 Entry entry = new Entry();
178 entry.setLocation(curr);
179 entry.setRecord(c);
180 entry.setData(data);
181 entry.setQuery(query);
182 process(entry);
183
184 curr = manager.getNextLocation(curr);
185 }
186 } finally {
187 manager.close();
188 }
189 }
190
191 private void showHelp() {
192 InputStream is = AMQJournalTool.class.getResourceAsStream("help.txt");
193 Scanner scanner = new Scanner(is);
194 while (scanner.hasNextLine()) {
195 String line = scanner.nextLine();
196 System.out.println(line);
197 }
198 scanner.close(); }
199
200 private void process(Entry entry) throws Exception {
201
202 DataStructure record = entry.getRecord();
203
204 switch (record.getDataStructureType()) {
205 case ActiveMQMessage.DATA_STRUCTURE_TYPE:
206 entry.setType("ActiveMQMessage");
207 entry.setFormater("message");
208 display(entry);
209 break;
210 case ActiveMQBytesMessage.DATA_STRUCTURE_TYPE:
211 entry.setType("ActiveMQBytesMessage");
212 entry.setFormater("message");
213 display(entry);
214 break;
215 case ActiveMQBlobMessage.DATA_STRUCTURE_TYPE:
216 entry.setType("ActiveMQBlobMessage");
217 entry.setFormater("message");
218 display(entry);
219 break;
220 case ActiveMQMapMessage.DATA_STRUCTURE_TYPE:
221 entry.setType("ActiveMQMapMessage");
222 entry.setFormater("message");
223 display(entry);
224 break;
225 case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE:
226 entry.setType("ActiveMQObjectMessage");
227 entry.setFormater("message");
228 display(entry);
229 break;
230 case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE:
231 entry.setType("ActiveMQStreamMessage");
232 entry.setFormater("message");
233 display(entry);
234 break;
235 case ActiveMQTextMessage.DATA_STRUCTURE_TYPE:
236 entry.setType("ActiveMQTextMessage");
237 entry.setFormater("message");
238 display(entry);
239 break;
240 case JournalQueueAck.DATA_STRUCTURE_TYPE:
241 entry.setType("Queue Ack");
242 entry.setFormater("queueAck");
243 display(entry);
244 break;
245 case JournalTopicAck.DATA_STRUCTURE_TYPE:
246 entry.setType("Topic Ack");
247 entry.setFormater("topicAck");
248 display(entry);
249 break;
250 case JournalTransaction.DATA_STRUCTURE_TYPE:
251 entry.setType(getType((JournalTransaction) record));
252 entry.setFormater("transaction");
253 display(entry);
254 break;
255 case JournalTrace.DATA_STRUCTURE_TYPE:
256 entry.setType("Trace");
257 entry.setFormater("trace");
258 display(entry);
259 break;
260 default:
261 entry.setType("Unknown");
262 entry.setFormater("unknown");
263 display(entry);
264 break;
265 }
266 }
267
268 private String getType(JournalTransaction record) {
269 switch (record.getType()) {
270 case JournalTransaction.XA_PREPARE:
271 return "XA Prepare";
272 case JournalTransaction.XA_COMMIT:
273 return "XA Commit";
274 case JournalTransaction.XA_ROLLBACK:
275 return "XA Rollback";
276 case JournalTransaction.LOCAL_COMMIT:
277 return "Commit";
278 case JournalTransaction.LOCAL_ROLLBACK:
279 return "Rollback";
280 }
281 return "Unknown Transaction";
282 }
283
284 private void display(Entry entry) throws Exception {
285
286 if (entry.getQuery() != null) {
287 List list = Collections.singletonList(entry);
288 List results = entry.getQuery().execute(list).getResults();
289 if (results.isEmpty()) {
290 return;
291 }
292 }
293
294 CustomResourceLoader.setResources(resources);
295 try {
296
297 context.put("location", entry.getLocation());
298 context.put("record", entry.getRecord());
299 context.put("type", entry.getType());
300 if (entry.getRecord() instanceof ActiveMQMessage) {
301 context.put("body", new MessageBodyFormatter(
302 (ActiveMQMessage) entry.getRecord()));
303 }
304
305 Template template = velocity.getTemplate(entry.getFormater());
306 PrintWriter writer = new PrintWriter(System.out);
307 template.merge(context, writer);
308 writer.println();
309 writer.flush();
310 } finally {
311 CustomResourceLoader.setResources(null);
312 }
313 }
314
315 public void setMessageFormat(String messageFormat) {
316 this.messageFormat = messageFormat;
317 }
318
319 public void setTopicAckFormat(String ackFormat) {
320 this.topicAckFormat = ackFormat;
321 }
322
323 public void setTransactionFormat(String transactionFormat) {
324 this.transactionFormat = transactionFormat;
325 }
326
327 public void setTraceFormat(String traceFormat) {
328 this.traceFormat = traceFormat;
329 }
330
331 public void setUnknownFormat(String unknownFormat) {
332 this.unknownFormat = unknownFormat;
333 }
334
335 public void setQueueAckFormat(String queueAckFormat) {
336 this.queueAckFormat = queueAckFormat;
337 }
338
339 public String getQuery() {
340 return where;
341 }
342
343 public void setWhere(String query) {
344 this.where = query;
345 }
346
347 public boolean isHelp() {
348 return help;
349 }
350
351 public void setHelp(boolean help) {
352 this.help = help;
353 }
354
355 /**
356 * @return the dirs
357 */
358 public ArrayList<File> getDirs() {
359 return dirs;
360 }
361
362 }