001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.jmx;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.Collections;
022 import java.util.Iterator;
023 import java.util.List;
024 import java.util.Map;
025 import javax.jms.Connection;
026 import javax.jms.InvalidSelectorException;
027 import javax.jms.MessageProducer;
028 import javax.jms.Session;
029 import javax.management.MalformedObjectNameException;
030 import javax.management.ObjectName;
031 import javax.management.openmbean.CompositeData;
032 import javax.management.openmbean.CompositeDataSupport;
033 import javax.management.openmbean.CompositeType;
034 import javax.management.openmbean.OpenDataException;
035 import javax.management.openmbean.TabularData;
036 import javax.management.openmbean.TabularDataSupport;
037 import javax.management.openmbean.TabularType;
038 import org.apache.activemq.ActiveMQConnectionFactory;
039 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
040 import org.apache.activemq.broker.region.Destination;
041 import org.apache.activemq.broker.region.Subscription;
042 import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
043 import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
044 import org.apache.activemq.command.ActiveMQDestination;
045 import org.apache.activemq.command.ActiveMQMessage;
046 import org.apache.activemq.command.ActiveMQTextMessage;
047 import org.apache.activemq.command.Message;
048 import org.apache.activemq.filter.BooleanExpression;
049 import org.apache.activemq.filter.MessageEvaluationContext;
050 import org.apache.activemq.selector.SelectorParser;
051 import org.slf4j.Logger;
052 import org.slf4j.LoggerFactory;
053
054 public class DestinationView implements DestinationViewMBean {
055 private static final Logger LOG = LoggerFactory.getLogger(DestinationViewMBean.class);
056 protected final Destination destination;
057 protected final ManagedRegionBroker broker;
058
059 public DestinationView(ManagedRegionBroker broker, Destination destination) {
060 this.broker = broker;
061 this.destination = destination;
062 }
063
064 public void gc() {
065 destination.gc();
066 }
067
068 public String getName() {
069 return destination.getName();
070 }
071
072 public void resetStatistics() {
073 destination.getDestinationStatistics().reset();
074 }
075
076 public long getEnqueueCount() {
077 return destination.getDestinationStatistics().getEnqueues().getCount();
078 }
079
080 public long getDequeueCount() {
081 return destination.getDestinationStatistics().getDequeues().getCount();
082 }
083
084 public long getDispatchCount() {
085 return destination.getDestinationStatistics().getDispatched().getCount();
086 }
087
088 public long getInFlightCount() {
089 return destination.getDestinationStatistics().getInflight().getCount();
090 }
091
092 public long getExpiredCount() {
093 return destination.getDestinationStatistics().getExpired().getCount();
094 }
095
096 public long getConsumerCount() {
097 return destination.getDestinationStatistics().getConsumers().getCount();
098 }
099
100 public long getQueueSize() {
101 return destination.getDestinationStatistics().getMessages().getCount();
102 }
103
104 public long getMessagesCached() {
105 return destination.getDestinationStatistics().getMessagesCached().getCount();
106 }
107
108 public int getMemoryPercentUsage() {
109 return destination.getMemoryUsage().getPercentUsage();
110 }
111
112 public long getMemoryLimit() {
113 return destination.getMemoryUsage().getLimit();
114 }
115
116 public void setMemoryLimit(long limit) {
117 destination.getMemoryUsage().setLimit(limit);
118 }
119
120 public double getAverageEnqueueTime() {
121 return destination.getDestinationStatistics().getProcessTime().getAverageTime();
122 }
123
124 public long getMaxEnqueueTime() {
125 return destination.getDestinationStatistics().getProcessTime().getMaxTime();
126 }
127
128 public long getMinEnqueueTime() {
129 return destination.getDestinationStatistics().getProcessTime().getMinTime();
130 }
131
132 public boolean isPrioritizedMessages() {
133 return destination.isPrioritizedMessages();
134 }
135
136 public CompositeData[] browse() throws OpenDataException {
137 try {
138 return browse(null);
139 } catch (InvalidSelectorException e) {
140 // should not happen.
141 throw new RuntimeException(e);
142 }
143 }
144
145 public CompositeData[] browse(String selector) throws OpenDataException, InvalidSelectorException {
146 Message[] messages = destination.browse();
147 ArrayList<CompositeData> c = new ArrayList<CompositeData>();
148
149 MessageEvaluationContext ctx = new MessageEvaluationContext();
150 ctx.setDestination(destination.getActiveMQDestination());
151 BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
152
153 for (int i = 0; i < messages.length; i++) {
154 try {
155
156 if (selectorExpression == null) {
157 c.add(OpenTypeSupport.convert(messages[i]));
158 } else {
159 ctx.setMessageReference(messages[i]);
160 if (selectorExpression.matches(ctx)) {
161 c.add(OpenTypeSupport.convert(messages[i]));
162 }
163 }
164
165 } catch (Throwable e) {
166 // TODO DELETE ME
167 System.out.println(e);
168 e.printStackTrace();
169 // TODO DELETE ME
170 LOG.warn("exception browsing destination", e);
171 }
172 }
173
174 CompositeData rc[] = new CompositeData[c.size()];
175 c.toArray(rc);
176 return rc;
177 }
178
179 /**
180 * Browses the current destination returning a list of messages
181 */
182 public List<Object> browseMessages() throws InvalidSelectorException {
183 return browseMessages(null);
184 }
185
186 /**
187 * Browses the current destination with the given selector returning a list
188 * of messages
189 */
190 public List<Object> browseMessages(String selector) throws InvalidSelectorException {
191 Message[] messages = destination.browse();
192 ArrayList<Object> answer = new ArrayList<Object>();
193
194 MessageEvaluationContext ctx = new MessageEvaluationContext();
195 ctx.setDestination(destination.getActiveMQDestination());
196 BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
197
198 for (int i = 0; i < messages.length; i++) {
199 try {
200 Message message = messages[i];
201 if (selectorExpression == null) {
202 answer.add(message);
203 } else {
204 ctx.setMessageReference(message);
205 if (selectorExpression.matches(ctx)) {
206 answer.add(message);
207 }
208 }
209
210 } catch (Throwable e) {
211 LOG.warn("exception browsing destination", e);
212 }
213 }
214 return answer;
215 }
216
217 public TabularData browseAsTable() throws OpenDataException {
218 try {
219 return browseAsTable(null);
220 } catch (InvalidSelectorException e) {
221 throw new RuntimeException(e);
222 }
223 }
224
225 public TabularData browseAsTable(String selector) throws OpenDataException, InvalidSelectorException {
226 OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
227 Message[] messages = destination.browse();
228 CompositeType ct = factory.getCompositeType();
229 TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" });
230 TabularDataSupport rc = new TabularDataSupport(tt);
231
232 MessageEvaluationContext ctx = new MessageEvaluationContext();
233 ctx.setDestination(destination.getActiveMQDestination());
234 BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
235
236 for (int i = 0; i < messages.length; i++) {
237 try {
238 if (selectorExpression == null) {
239 rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
240 } else {
241 ctx.setMessageReference(messages[i]);
242 if (selectorExpression.matches(ctx)) {
243 rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
244 }
245 }
246 } catch (Throwable e) {
247 LOG.warn("exception browsing destination", e);
248 }
249 }
250
251 return rc;
252 }
253
254 public String sendTextMessage(String body) throws Exception {
255 return sendTextMessage(Collections.EMPTY_MAP, body);
256 }
257
258 public String sendTextMessage(Map headers, String body) throws Exception {
259 return sendTextMessage(headers, body, null, null);
260 }
261
262 public String sendTextMessage(String body, String user, String password) throws Exception {
263 return sendTextMessage(Collections.EMPTY_MAP, body, user, password);
264 }
265
266 public String sendTextMessage(Map headers, String body, String userName, String password) throws Exception {
267
268 String brokerUrl = "vm://" + broker.getBrokerName();
269 ActiveMQDestination dest = destination.getActiveMQDestination();
270
271 ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl);
272 Connection connection = null;
273 try {
274
275 connection = cf.createConnection(userName, password);
276 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
277 MessageProducer producer = session.createProducer(dest);
278 ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body);
279
280 for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) {
281 Map.Entry entry = (Map.Entry) iter.next();
282 msg.setObjectProperty((String) entry.getKey(), entry.getValue());
283 }
284
285 producer.setDeliveryMode(msg.getJMSDeliveryMode());
286 producer.setPriority(msg.getPriority());
287 long ttl = msg.getExpiration() - System.currentTimeMillis();
288 producer.setTimeToLive(ttl > 0 ? ttl : 0);
289 producer.send(msg);
290
291 return msg.getJMSMessageID();
292
293 } finally {
294 connection.close();
295 }
296
297 }
298
299 public int getMaxAuditDepth() {
300 return destination.getMaxAuditDepth();
301 }
302
303 public int getMaxProducersToAudit() {
304 return destination.getMaxProducersToAudit();
305 }
306
307 public boolean isEnableAudit() {
308 return destination.isEnableAudit();
309 }
310
311 public void setEnableAudit(boolean enableAudit) {
312 destination.setEnableAudit(enableAudit);
313 }
314
315 public void setMaxAuditDepth(int maxAuditDepth) {
316 destination.setMaxAuditDepth(maxAuditDepth);
317 }
318
319 public void setMaxProducersToAudit(int maxProducersToAudit) {
320 destination.setMaxProducersToAudit(maxProducersToAudit);
321 }
322
323 public float getMemoryUsagePortion() {
324 return destination.getMemoryUsage().getUsagePortion();
325 }
326
327 public long getProducerCount() {
328 return destination.getDestinationStatistics().getProducers().getCount();
329 }
330
331 public boolean isProducerFlowControl() {
332 return destination.isProducerFlowControl();
333 }
334
335 public void setMemoryUsagePortion(float value) {
336 destination.getMemoryUsage().setUsagePortion(value);
337 }
338
339 public void setProducerFlowControl(boolean producerFlowControl) {
340 destination.setProducerFlowControl(producerFlowControl);
341 }
342
343 public boolean isAlwaysRetroactive() {
344 return destination.isAlwaysRetroactive();
345 }
346
347 public void setAlwaysRetroactive(boolean alwaysRetroactive) {
348 destination.setAlwaysRetroactive(alwaysRetroactive);
349 }
350
351 /**
352 * Set's the interval at which warnings about producers being blocked by
353 * resource usage will be triggered. Values of 0 or less will disable
354 * warnings
355 *
356 * @param blockedProducerWarningInterval the interval at which warning about
357 * blocked producers will be triggered.
358 */
359 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
360 destination.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
361 }
362
363 /**
364 *
365 * @return the interval at which warning about blocked producers will be
366 * triggered.
367 */
368 public long getBlockedProducerWarningInterval() {
369 return destination.getBlockedProducerWarningInterval();
370 }
371
372 public int getMaxPageSize() {
373 return destination.getMaxPageSize();
374 }
375
376 public void setMaxPageSize(int pageSize) {
377 destination.setMaxPageSize(pageSize);
378 }
379
380 public boolean isUseCache() {
381 return destination.isUseCache();
382 }
383
384 public void setUseCache(boolean value) {
385 destination.setUseCache(value);
386 }
387
388 public ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException {
389 List<Subscription> subscriptions = destination.getConsumers();
390 ObjectName[] answer = new ObjectName[subscriptions.size()];
391 ObjectName objectName = broker.getBrokerService().getBrokerObjectName();
392 int index = 0;
393 for (Subscription subscription : subscriptions) {
394 String connectionClientId = subscription.getContext().getClientId();
395 String objectNameStr = ManagedRegionBroker.getSubscriptionObjectName(subscription.getConsumerInfo(), connectionClientId, objectName);
396 answer[index++] = new ObjectName(objectNameStr);
397 }
398 return answer;
399 }
400
401 public ObjectName getSlowConsumerStrategy() throws IOException, MalformedObjectNameException {
402 ObjectName result = null;
403 SlowConsumerStrategy strategy = destination.getSlowConsumerStrategy();
404 if (strategy != null && strategy instanceof AbortSlowConsumerStrategy) {
405 result = broker.registerSlowConsumerStrategy((AbortSlowConsumerStrategy)strategy);
406 }
407 return result;
408 }
409
410 }