001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.jmx;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.HashMap;
022 import java.util.Hashtable;
023 import java.util.Iterator;
024 import java.util.List;
025 import java.util.Map;
026 import java.util.Map.Entry;
027 import java.util.Set;
028 import java.util.concurrent.ConcurrentHashMap;
029 import java.util.concurrent.CopyOnWriteArraySet;
030 import java.util.concurrent.ThreadPoolExecutor;
031
032 import javax.management.InstanceNotFoundException;
033 import javax.management.MalformedObjectNameException;
034 import javax.management.ObjectName;
035 import javax.management.openmbean.CompositeData;
036 import javax.management.openmbean.CompositeDataSupport;
037 import javax.management.openmbean.CompositeType;
038 import javax.management.openmbean.OpenDataException;
039 import javax.management.openmbean.TabularData;
040 import javax.management.openmbean.TabularDataSupport;
041 import javax.management.openmbean.TabularType;
042
043 import org.apache.activemq.broker.Broker;
044 import org.apache.activemq.broker.BrokerService;
045 import org.apache.activemq.broker.ConnectionContext;
046 import org.apache.activemq.broker.ProducerBrokerExchange;
047 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
048 import org.apache.activemq.broker.region.Destination;
049 import org.apache.activemq.broker.region.DestinationFactory;
050 import org.apache.activemq.broker.region.DestinationFactoryImpl;
051 import org.apache.activemq.broker.region.DestinationInterceptor;
052 import org.apache.activemq.broker.region.Queue;
053 import org.apache.activemq.broker.region.Region;
054 import org.apache.activemq.broker.region.RegionBroker;
055 import org.apache.activemq.broker.region.Subscription;
056 import org.apache.activemq.broker.region.Topic;
057 import org.apache.activemq.broker.region.TopicRegion;
058 import org.apache.activemq.broker.region.TopicSubscription;
059 import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
060 import org.apache.activemq.command.ActiveMQDestination;
061 import org.apache.activemq.command.ActiveMQMessage;
062 import org.apache.activemq.command.ActiveMQTopic;
063 import org.apache.activemq.command.ConsumerInfo;
064 import org.apache.activemq.command.Message;
065 import org.apache.activemq.command.MessageId;
066 import org.apache.activemq.command.ProducerInfo;
067 import org.apache.activemq.command.SubscriptionInfo;
068 import org.apache.activemq.store.MessageRecoveryListener;
069 import org.apache.activemq.store.PersistenceAdapter;
070 import org.apache.activemq.store.TopicMessageStore;
071 import org.apache.activemq.thread.Scheduler;
072 import org.apache.activemq.thread.TaskRunnerFactory;
073 import org.apache.activemq.transaction.XATransaction;
074 import org.apache.activemq.usage.SystemUsage;
075 import org.apache.activemq.util.JMXSupport;
076 import org.apache.activemq.util.ServiceStopper;
077 import org.apache.activemq.util.SubscriptionKey;
078 import org.slf4j.Logger;
079 import org.slf4j.LoggerFactory;
080
081 public class ManagedRegionBroker extends RegionBroker {
// Class-wide logger.
082 private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
// Management context through which every MBean in this class is registered and unregistered.
083 private final ManagementContext managementContext;
// JMX name of the broker; its domain and "BrokerName" key property prefix every ObjectName built by this class.
084 private final ObjectName brokerObjectName;
// Destination views keyed by MBean name, bucketed by queue/topic and temporary/non-temporary
// (populated in registerDestination, emptied in unregisterDestination).
085 private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
086 private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
087 private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
088 private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();
// Subscription views keyed by MBean name, bucketed by destination kind and durability
// (populated in registerSubscription(ObjectName, ...) and addInactiveSubscription).
089 private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
090 private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
091 private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
092 private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
093 private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
094 private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
// Producer views keyed by MBean name; dynamicDestinationProducers holds producers whose
// ProducerInfo carries no destination (see registerProducer).
095 private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
096 private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
097 private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
098 private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
099 private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
// Maps a (clientId, subscriptionName) key to the MBean name recorded by addInactiveSubscription.
100 private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
// Maps each subscription passed to registerSubscription(ConnectionContext, Subscription) to its MBean name.
101 private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
// Names of MBeans registered by this class; doStop unregisters whatever is still listed here.
102 private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
103 /* This is the first broker in the broker interceptor chain. */
104 private Broker contextBroker;
105
/**
 * Creates a region broker that additionally tracks JMX MBeans for its
 * destinations, subscriptions and producers.
 *
 * @param brokerService          forwarded to the {@code RegionBroker} constructor
 * @param context                management context used for all MBean (un)registration
 * @param brokerObjectName       JMX name of the broker; the names built by this class derive from it
 * @param taskRunnerFactory      forwarded to the {@code RegionBroker} constructor
 * @param memoryManager          forwarded to the {@code RegionBroker} constructor
 * @param destinationFactory     forwarded to the {@code RegionBroker} constructor
 * @param destinationInterceptor forwarded to the {@code RegionBroker} constructor
 * @param scheduler              forwarded to the {@code RegionBroker} constructor
 * @param executor               forwarded to the {@code RegionBroker} constructor
 * @throws IOException if the superclass constructor reports one
 */
public ManagedRegionBroker(BrokerService brokerService,
                           ManagementContext context,
                           ObjectName brokerObjectName,
                           TaskRunnerFactory taskRunnerFactory,
                           SystemUsage memoryManager,
                           DestinationFactory destinationFactory,
                           DestinationInterceptor destinationInterceptor,
                           Scheduler scheduler,
                           ThreadPoolExecutor executor) throws IOException {
    super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor, scheduler, executor);
    this.brokerObjectName = brokerObjectName;
    this.managementContext = context;
}
112
113 @Override
114 public void start() throws Exception {
115 super.start();
116 // build all existing durable subscriptions
117 buildExistingSubscriptions();
118 }
119
120 @Override
121 protected void doStop(ServiceStopper stopper) {
122 super.doStop(stopper);
123 // lets remove any mbeans not yet removed
124 for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) {
125 ObjectName name = iter.next();
126 try {
127 managementContext.unregisterMBean(name);
128 } catch (InstanceNotFoundException e) {
129 LOG.warn("The MBean: " + name + " is no longer registered with JMX");
130 } catch (Exception e) {
131 stopper.onException(this, e);
132 }
133 }
134 registeredMBeans.clear();
135 }
136
137 @Override
138 protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
139 return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
140 }
141
142 @Override
143 protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
144 return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
145 }
146
147 @Override
148 protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
149 return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
150 }
151
152 @Override
153 protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
154 return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
155 }
156
157 public void register(ActiveMQDestination destName, Destination destination) {
158 // TODO refactor to allow views for custom destinations
159 try {
160 ObjectName objectName = createObjectName(destName);
161 DestinationView view;
162 if (destination instanceof Queue) {
163 view = new QueueView(this, (Queue)destination);
164 } else if (destination instanceof Topic) {
165 view = new TopicView(this, (Topic)destination);
166 } else {
167 view = null;
168 LOG.warn("JMX View is not supported for custom destination: " + destination);
169 }
170 if (view != null) {
171 registerDestination(objectName, destName, view);
172 }
173 } catch (Exception e) {
174 LOG.error("Failed to register destination " + destName, e);
175 }
176 }
177
178 public void unregister(ActiveMQDestination destName) {
179 try {
180 ObjectName objectName = createObjectName(destName);
181 unregisterDestination(objectName);
182 } catch (Exception e) {
183 LOG.error("Failed to unregister " + destName, e);
184 }
185 }
186
187 public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
188 String connectionClientId = context.getClientId();
189 ObjectName brokerJmxObjectName = brokerObjectName;
190 String objectNameStr = getSubscriptionObjectName(sub.getConsumerInfo(), connectionClientId, brokerJmxObjectName);
191 SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
192 try {
193 ObjectName objectName = new ObjectName(objectNameStr);
194 SubscriptionView view;
195 if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) {
196 // add offline subscribers to inactive list
197 SubscriptionInfo info = new SubscriptionInfo();
198 info.setClientId(context.getClientId());
199 info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
200 info.setDestination(sub.getConsumerInfo().getDestination());
201 info.setSelector(sub.getSelector());
202 addInactiveSubscription(key, info, sub);
203 } else {
204 String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
205 if (sub.getConsumerInfo().isDurable()) {
206 view = new DurableSubscriptionView(this, context.getClientId(), userName, sub);
207 } else {
208 if (sub instanceof TopicSubscription) {
209 view = new TopicSubscriptionView(context.getClientId(), userName, (TopicSubscription) sub);
210 } else {
211 view = new SubscriptionView(context.getClientId(), userName, sub);
212 }
213 }
214 registerSubscription(objectName, sub.getConsumerInfo(), key, view);
215 }
216 subscriptionMap.put(sub, objectName);
217 return objectName;
218 } catch (Exception e) {
219 LOG.error("Failed to register subscription " + sub, e);
220 return null;
221 }
222 }
223
224 public static String getSubscriptionObjectName(ConsumerInfo info, String connectionClientId, ObjectName brokerJmxObjectName) {
225 Hashtable<String, String> map = brokerJmxObjectName.getKeyPropertyList();
226 String brokerDomain = brokerJmxObjectName.getDomain();
227 String objectNameStr = brokerDomain + ":" + "BrokerName=" + map.get("BrokerName") + ",Type=Subscription,";
228 String destinationType = "destinationType=" + info.getDestination().getDestinationTypeAsString();
229 String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(info.getDestination().getPhysicalName());
230 String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
231 String persistentMode = "persistentMode=";
232 String consumerId = "";
233 if (info.isDurable()) {
234 persistentMode += "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(info.getSubscriptionName());
235 } else {
236 persistentMode += "Non-Durable";
237 if (info.getConsumerId() != null) {
238 consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(info.getConsumerId().toString());
239 }
240 }
241 objectNameStr += persistentMode + ",";
242 objectNameStr += destinationType + ",";
243 objectNameStr += destinationName + ",";
244 objectNameStr += clientId;
245 objectNameStr += consumerId;
246 return objectNameStr;
247 }
248
249 @Override
250 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
251 Subscription sub = super.addConsumer(context, info);
252 SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
253 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
254 if (inactiveName != null) {
255 // if it was inactive, register it
256 registerSubscription(context, sub);
257 }
258 return sub;
259 }
260
261 @Override
262 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
263 for (Subscription sub : subscriptionMap.keySet()) {
264 if (sub.getConsumerInfo().equals(info)) {
265 // unregister all consumer subs
266 unregisterSubscription(subscriptionMap.get(sub), true);
267 }
268 }
269 super.removeConsumer(context, info);
270 }
271
272 @Override
273 public void addProducer(ConnectionContext context, ProducerInfo info)
274 throws Exception {
275 super.addProducer(context, info);
276 String connectionClientId = context.getClientId();
277 ObjectName objectName = createObjectName(info, connectionClientId);
278 String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
279 ProducerView view = new ProducerView(info, connectionClientId, userName, this);
280 registerProducer(objectName, info.getDestination(), view);
281 }
282
283 @Override
284 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
285 ObjectName objectName = createObjectName(info, context.getClientId());
286 unregisterProducer(objectName);
287 super.removeProducer(context, info);
288 }
289
290 @Override
291 public void send(ProducerBrokerExchange exchange, Message message) throws Exception {
292 if (exchange != null && exchange.getProducerState() != null && exchange.getProducerState().getInfo() != null) {
293 ProducerInfo info = exchange.getProducerState().getInfo();
294 if (info.getDestination() == null && info.getProducerId() != null) {
295 ObjectName objectName = createObjectName(info, exchange.getConnectionContext().getClientId());
296 ProducerView view = this.dynamicDestinationProducers.get(objectName);
297 if (view != null) {
298 ActiveMQDestination dest = message.getDestination();
299 if (dest != null) {
300 view.setLastUsedDestinationName(dest);
301 }
302 }
303 }
304 }
305 super.send(exchange, message);
306 }
307
308 public void unregisterSubscription(Subscription sub) {
309 ObjectName name = subscriptionMap.remove(sub);
310 if (name != null) {
311 try {
312 SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
313 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
314 if (inactiveName != null) {
315 inactiveDurableTopicSubscribers.remove(inactiveName);
316 managementContext.unregisterMBean(inactiveName);
317 }
318 } catch (Exception e) {
319 LOG.error("Failed to unregister subscription " + sub, e);
320 }
321 }
322 }
323
324 protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception {
325 if (dest.isQueue()) {
326 if (dest.isTemporary()) {
327 temporaryQueues.put(key, view);
328 } else {
329 queues.put(key, view);
330 }
331 } else {
332 if (dest.isTemporary()) {
333 temporaryTopics.put(key, view);
334 } else {
335 topics.put(key, view);
336 }
337 }
338 try {
339 AnnotatedMBean.registerMBean(managementContext, view, key);
340 registeredMBeans.add(key);
341 } catch (Throwable e) {
342 LOG.warn("Failed to register MBean: " + key);
343 LOG.debug("Failure reason: " + e, e);
344 }
345 }
346
347 protected void unregisterDestination(ObjectName key) throws Exception {
348
349 DestinationView view = removeAndRemember(topics, key, null);
350 view = removeAndRemember(queues, key, view);
351 view = removeAndRemember(temporaryQueues, key, view);
352 view = removeAndRemember(temporaryTopics, key, view);
353 if (registeredMBeans.remove(key)) {
354 try {
355 managementContext.unregisterMBean(key);
356 } catch (Throwable e) {
357 LOG.warn("Failed to unregister MBean: " + key);
358 LOG.debug("Failure reason: " + e, e);
359 }
360 }
361 if (view != null) {
362 key = view.getSlowConsumerStrategy();
363 if (key!= null && registeredMBeans.remove(key)) {
364 try {
365 managementContext.unregisterMBean(key);
366 } catch (Throwable e) {
367 LOG.warn("Failed to unregister slow consumer strategy MBean: " + key);
368 LOG.debug("Failure reason: " + e, e);
369 }
370 }
371 }
372 }
373
374 protected void registerProducer(ObjectName key, ActiveMQDestination dest, ProducerView view) throws Exception {
375
376 if (dest != null) {
377 if (dest.isQueue()) {
378 if (dest.isTemporary()) {
379 temporaryQueueProducers.put(key, view);
380 } else {
381 queueProducers.put(key, view);
382 }
383 } else {
384 if (dest.isTemporary()) {
385 temporaryTopicProducers.put(key, view);
386 } else {
387 topicProducers.put(key, view);
388 }
389 }
390 } else {
391 dynamicDestinationProducers.put(key, view);
392 }
393
394 try {
395 AnnotatedMBean.registerMBean(managementContext, view, key);
396 registeredMBeans.add(key);
397 } catch (Throwable e) {
398 LOG.warn("Failed to register MBean: " + key);
399 LOG.debug("Failure reason: " + e, e);
400 }
401 }
402
403 protected void unregisterProducer(ObjectName key) throws Exception {
404 queueProducers.remove(key);
405 topicProducers.remove(key);
406 temporaryQueueProducers.remove(key);
407 temporaryTopicProducers.remove(key);
408 dynamicDestinationProducers.remove(key);
409 if (registeredMBeans.remove(key)) {
410 try {
411 managementContext.unregisterMBean(key);
412 } catch (Throwable e) {
413 LOG.warn("Failed to unregister MBean: " + key);
414 LOG.debug("Failure reason: " + e, e);
415 }
416 }
417 }
418
419 private DestinationView removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) {
420 DestinationView candidate = map.remove(key);
421 if (candidate != null && view == null) {
422 view = candidate;
423 }
424 return candidate != null ? candidate : view;
425 }
426
427 protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
428 ActiveMQDestination dest = info.getDestination();
429 if (dest.isQueue()) {
430 if (dest.isTemporary()) {
431 temporaryQueueSubscribers.put(key, view);
432 } else {
433 queueSubscribers.put(key, view);
434 }
435 } else {
436 if (dest.isTemporary()) {
437 temporaryTopicSubscribers.put(key, view);
438 } else {
439 if (info.isDurable()) {
440 durableTopicSubscribers.put(key, view);
441 // unregister any inactive durable subs
442 try {
443 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
444 if (inactiveName != null) {
445 inactiveDurableTopicSubscribers.remove(inactiveName);
446 registeredMBeans.remove(inactiveName);
447 managementContext.unregisterMBean(inactiveName);
448 }
449 } catch (Throwable e) {
450 LOG.error("Unable to unregister inactive durable subscriber: " + subscriptionKey, e);
451 }
452 } else {
453 topicSubscribers.put(key, view);
454 }
455 }
456 }
457
458 try {
459 AnnotatedMBean.registerMBean(managementContext, view, key);
460 registeredMBeans.add(key);
461 } catch (Throwable e) {
462 LOG.warn("Failed to register MBean: " + key);
463 LOG.debug("Failure reason: " + e, e);
464 }
465
466 }
467
468 protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {
469 queueSubscribers.remove(key);
470 topicSubscribers.remove(key);
471 temporaryQueueSubscribers.remove(key);
472 temporaryTopicSubscribers.remove(key);
473 if (registeredMBeans.remove(key)) {
474 try {
475 managementContext.unregisterMBean(key);
476 } catch (Throwable e) {
477 LOG.warn("Failed to unregister MBean: " + key);
478 LOG.debug("Failure reason: " + e, e);
479 }
480 }
481 DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key);
482 if (view != null) {
483 // need to put this back in the inactive list
484 SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
485 if (addToInactive) {
486 SubscriptionInfo info = new SubscriptionInfo();
487 info.setClientId(subscriptionKey.getClientId());
488 info.setSubscriptionName(subscriptionKey.getSubscriptionName());
489 info.setDestination(new ActiveMQTopic(view.getDestinationName()));
490 info.setSelector(view.getSelector());
491 addInactiveSubscription(subscriptionKey, info, (brokerService.isKeepDurableSubsActive() ? view.subscription : null));
492 }
493 }
494 }
495
496 protected void buildExistingSubscriptions() throws Exception {
497 Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
498 Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
499 if (destinations != null) {
500 for (ActiveMQDestination dest : destinations) {
501 if (dest.isTopic()) {
502 SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest);
503 if (infos != null) {
504 for (int i = 0; i < infos.length; i++) {
505 SubscriptionInfo info = infos[i];
506 SubscriptionKey key = new SubscriptionKey(info);
507 if (!alreadyKnown(key)) {
508 LOG.debug("Restoring durable subscription mbean: " + info);
509 subscriptions.put(key, info);
510 }
511 }
512 }
513 }
514 }
515 }
516
517 for (Map.Entry<SubscriptionKey, SubscriptionInfo> entry : subscriptions.entrySet()) {
518 addInactiveSubscription(entry.getKey(), entry.getValue(), null);
519 }
520 }
521
522 private boolean alreadyKnown(SubscriptionKey key) {
523 boolean known = false;
524 known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key);
525 if (LOG.isTraceEnabled()) {
526 LOG.trace("Sub with key: " + key + ", " + (known ? "": "not") + " already registered");
527 }
528 return known;
529 }
530
531 protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info, Subscription subscription) {
532 try {
533 ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info);
534 ObjectName objectName = new ObjectName(getSubscriptionObjectName(offlineConsumerInfo, info.getClientId(), brokerObjectName));
535 SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info, subscription);
536
537 try {
538 AnnotatedMBean.registerMBean(managementContext, view, objectName);
539 registeredMBeans.add(objectName);
540 } catch (Throwable e) {
541 LOG.warn("Failed to register MBean: " + key);
542 LOG.debug("Failure reason: " + e, e);
543 }
544
545 inactiveDurableTopicSubscribers.put(objectName, view);
546 subscriptionKeys.put(key, objectName);
547 } catch (Exception e) {
548 LOG.error("Failed to register subscription " + info, e);
549 }
550 }
551
552 public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
553 List<Message> messages = getSubscriberMessages(view);
554 CompositeData c[] = new CompositeData[messages.size()];
555 for (int i = 0; i < c.length; i++) {
556 try {
557 c[i] = OpenTypeSupport.convert(messages.get(i));
558 } catch (Throwable e) {
559 LOG.error("failed to browse : " + view, e);
560 }
561 }
562 return c;
563 }
564
565 public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
566 OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
567 List<Message> messages = getSubscriberMessages(view);
568 CompositeType ct = factory.getCompositeType();
569 TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
570 TabularDataSupport rc = new TabularDataSupport(tt);
571 for (int i = 0; i < messages.size(); i++) {
572 rc.put(new CompositeDataSupport(ct, factory.getFields(messages.get(i))));
573 }
574 return rc;
575 }
576
577 protected List<Message> getSubscriberMessages(SubscriptionView view) {
578 // TODO It is very dangerous operation for big backlogs
579 if (!(destinationFactory instanceof DestinationFactoryImpl)) {
580 throw new RuntimeException("unsupported by " + destinationFactory);
581 }
582 PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter();
583 final List<Message> result = new ArrayList<Message>();
584 try {
585 ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
586 TopicMessageStore store = adapter.createTopicMessageStore(topic);
587 store.recover(new MessageRecoveryListener() {
588 public boolean recoverMessage(Message message) throws Exception {
589 result.add(message);
590 return true;
591 }
592
593 public boolean recoverMessageReference(MessageId messageReference) throws Exception {
594 throw new RuntimeException("Should not be called.");
595 }
596
597 public boolean hasSpace() {
598 return true;
599 }
600
601 public boolean isDuplicate(MessageId id) {
602 return false;
603 }
604 });
605 } catch (Throwable e) {
606 LOG.error("Failed to browse messages for Subscription " + view, e);
607 }
608 return result;
609
610 }
611
612 protected ObjectName[] getTopics() {
613 Set<ObjectName> set = topics.keySet();
614 return set.toArray(new ObjectName[set.size()]);
615 }
616
617 protected ObjectName[] getQueues() {
618 Set<ObjectName> set = queues.keySet();
619 return set.toArray(new ObjectName[set.size()]);
620 }
621
622 protected ObjectName[] getTemporaryTopics() {
623 Set<ObjectName> set = temporaryTopics.keySet();
624 return set.toArray(new ObjectName[set.size()]);
625 }
626
627 protected ObjectName[] getTemporaryQueues() {
628 Set<ObjectName> set = temporaryQueues.keySet();
629 return set.toArray(new ObjectName[set.size()]);
630 }
631
632 protected ObjectName[] getTopicSubscribers() {
633 Set<ObjectName> set = topicSubscribers.keySet();
634 return set.toArray(new ObjectName[set.size()]);
635 }
636
637 protected ObjectName[] getDurableTopicSubscribers() {
638 Set<ObjectName> set = durableTopicSubscribers.keySet();
639 return set.toArray(new ObjectName[set.size()]);
640 }
641
642 protected ObjectName[] getQueueSubscribers() {
643 Set<ObjectName> set = queueSubscribers.keySet();
644 return set.toArray(new ObjectName[set.size()]);
645 }
646
647 protected ObjectName[] getTemporaryTopicSubscribers() {
648 Set<ObjectName> set = temporaryTopicSubscribers.keySet();
649 return set.toArray(new ObjectName[set.size()]);
650 }
651
652 protected ObjectName[] getTemporaryQueueSubscribers() {
653 Set<ObjectName> set = temporaryQueueSubscribers.keySet();
654 return set.toArray(new ObjectName[set.size()]);
655 }
656
657 protected ObjectName[] getInactiveDurableTopicSubscribers() {
658 Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet();
659 return set.toArray(new ObjectName[set.size()]);
660 }
661
662 protected ObjectName[] getTopicProducers() {
663 Set<ObjectName> set = topicProducers.keySet();
664 return set.toArray(new ObjectName[set.size()]);
665 }
666
667 protected ObjectName[] getQueueProducers() {
668 Set<ObjectName> set = queueProducers.keySet();
669 return set.toArray(new ObjectName[set.size()]);
670 }
671
672 protected ObjectName[] getTemporaryTopicProducers() {
673 Set<ObjectName> set = temporaryTopicProducers.keySet();
674 return set.toArray(new ObjectName[set.size()]);
675 }
676
677 protected ObjectName[] getTemporaryQueueProducers() {
678 Set<ObjectName> set = temporaryQueueProducers.keySet();
679 return set.toArray(new ObjectName[set.size()]);
680 }
681
682 protected ObjectName[] getDynamicDestinationProducers() {
683 Set<ObjectName> set = dynamicDestinationProducers.keySet();
684 return set.toArray(new ObjectName[set.size()]);
685 }
686
687 public Broker getContextBroker() {
688 return contextBroker;
689 }
690
691 public void setContextBroker(Broker contextBroker) {
692 this.contextBroker = contextBroker;
693 }
694
695 protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException {
696 // Build the object name for the destination
697 Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
698 ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type="
699 + JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString()) + "," + "Destination="
700 + JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
701 return objectName;
702 }
703
704 protected ObjectName createObjectName(ProducerInfo producerInfo, String connectionClientId) throws MalformedObjectNameException {
705 // Build the object name for the producer info
706 Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
707
708 String destinationType = "destinationType=";
709 String destinationName = "destinationName=";
710
711 if (producerInfo.getDestination() == null) {
712 destinationType += "Dynamic";
713 destinationName = null;
714 } else {
715 destinationType += producerInfo.getDestination().getDestinationTypeAsString();
716 destinationName += JMXSupport.encodeObjectNamePart(producerInfo.getDestination().getPhysicalName());
717 }
718
719 String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
720 String producerId = "producerId=" + JMXSupport.encodeObjectNamePart(producerInfo.getProducerId().toString());
721
722 ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ","
723 + "Type=Producer" + ","
724 + destinationType + ","
725 + (destinationName != null ? destinationName + "," : "")
726 + clientId + "," + producerId);
727 return objectName;
728 }
729
730 public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
731 ObjectName objectName = null;
732 try {
733 objectName = createObjectName(strategy);
734 if (!registeredMBeans.contains(objectName)) {
735 AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this, strategy);
736 AnnotatedMBean.registerMBean(managementContext, view, objectName);
737 registeredMBeans.add(objectName);
738 }
739 } catch (Exception e) {
740 LOG.warn("Failed to register MBean: " + strategy);
741 LOG.debug("Failure reason: " + e, e);
742 }
743 return objectName;
744 }
745
746 protected ObjectName createObjectName(XATransaction transaction) throws MalformedObjectNameException {
747 Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
748 ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName")
749 + "," + "Type=RecoveredXaTransaction"
750 + "," + "Xid="
751 + JMXSupport.encodeObjectNamePart(transaction.getTransactionId().toString()));
752 return objectName;
753 }
754
755 public void registerRecoveredTransactionMBean(XATransaction transaction) {
756 try {
757 ObjectName objectName = createObjectName(transaction);
758 if (!registeredMBeans.contains(objectName)) {
759 RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction);
760 AnnotatedMBean.registerMBean(managementContext, view, objectName);
761 registeredMBeans.add(objectName);
762 }
763 } catch (Exception e) {
764 LOG.warn("Failed to register prepared transaction MBean: " + transaction);
765 LOG.debug("Failure reason: " + e, e);
766 }
767 }
768
769 public void unregister(XATransaction transaction) {
770 try {
771 ObjectName objectName = createObjectName(transaction);
772 if (registeredMBeans.remove(objectName)) {
773 try {
774 managementContext.unregisterMBean(objectName);
775 } catch (Throwable e) {
776 LOG.warn("Failed to unregister MBean: " + objectName);
777 LOG.debug("Failure reason: " + e, e);
778 }
779 }
780 } catch (Exception e) {
781 LOG.warn("Failed to create object name to unregister " + transaction, e);
782 }
783 }
784
785 private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{
786 Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
787 ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ","
788 + "Type=SlowConsumerStrategy," + "InstanceName=" + JMXSupport.encodeObjectNamePart(strategy.getName()));
789 return objectName;
790 }
791
792 public ObjectName getSubscriberObjectName(Subscription key) {
793 return subscriptionMap.get(key);
794 }
795
796 public Subscription getSubscriber(ObjectName key) {
797 Subscription sub = null;
798 for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) {
799 if (entry.getValue().equals(key)) {
800 sub = entry.getKey();
801 break;
802 }
803 }
804 return sub;
805 }
806 }