001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.advisory;
018
019 import java.util.Iterator;
020 import java.util.Map;
021 import java.util.Set;
022 import java.util.concurrent.ConcurrentHashMap;
023 import org.apache.activemq.broker.Broker;
024 import org.apache.activemq.broker.BrokerFilter;
025 import org.apache.activemq.broker.ConnectionContext;
026 import org.apache.activemq.broker.ProducerBrokerExchange;
027 import org.apache.activemq.broker.region.Destination;
028 import org.apache.activemq.broker.region.MessageReference;
029 import org.apache.activemq.broker.region.Subscription;
030 import org.apache.activemq.broker.region.TopicSubscription;
031 import org.apache.activemq.command.*;
032 import org.apache.activemq.security.SecurityContext;
033 import org.apache.activemq.state.ProducerState;
034 import org.apache.activemq.usage.Usage;
035 import org.apache.activemq.util.IdGenerator;
036 import org.apache.activemq.util.LongSequenceGenerator;
037 import org.slf4j.Logger;
038 import org.slf4j.LoggerFactory;
039
040 /**
041 * This broker filter handles tracking the state of the broker for purposes of
042 * publishing advisory messages to advisory consumers.
043 */
044 public class AdvisoryBroker extends BrokerFilter {
045
046 private static final Logger LOG = LoggerFactory.getLogger(AdvisoryBroker.class);
047 private static final IdGenerator ID_GENERATOR = new IdGenerator();
048
049 protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
050 protected final ConcurrentHashMap<ConsumerId, ConsumerInfo> consumers = new ConcurrentHashMap<ConsumerId, ConsumerInfo>();
051 protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
052 protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
053 protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
054 protected final ProducerId advisoryProducerId = new ProducerId();
055
056 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
057
058 public AdvisoryBroker(Broker next) {
059 super(next);
060 advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
061 }
062
063 @Override
064 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
065 super.addConnection(context, info);
066
067 ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
068 //do not distribute usernames or passwords in advisory
069 ConnectionInfo copy = info.copy();
070 copy.setUserName("");
071 copy.setPassword("");
072 fireAdvisory(context, topic, copy);
073 connections.put(copy.getConnectionId(), copy);
074 }
075
076 @Override
077 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
078 Subscription answer = super.addConsumer(context, info);
079
080 // Don't advise advisory topics.
081 if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
082 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
083 consumers.put(info.getConsumerId(), info);
084 fireConsumerAdvisory(context, info.getDestination(), topic, info);
085 } else {
086 // We need to replay all the previously collected state objects
087 // for this newly added consumer.
088 if (AdvisorySupport.isConnectionAdvisoryTopic(info.getDestination())) {
089 // Replay the connections.
090 for (Iterator<ConnectionInfo> iter = connections.values().iterator(); iter.hasNext();) {
091 ConnectionInfo value = iter.next();
092 ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
093 fireAdvisory(context, topic, value, info.getConsumerId());
094 }
095 }
096
097 // We check here whether the Destination is Temporary Destination specific or not since we
098 // can avoid sending advisory messages to the consumer if it only wants Temporary Destination
099 // notifications. If its not just temporary destination related destinations then we have
100 // to send them all, a composite destination could want both.
101 if (AdvisorySupport.isTempDestinationAdvisoryTopic(info.getDestination())) {
102 // Replay the temporary destinations.
103 for (DestinationInfo destination : destinations.values()) {
104 if (destination.getDestination().isTemporary()) {
105 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination());
106 fireAdvisory(context, topic, destination, info.getConsumerId());
107 }
108 }
109 } else if (AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination())) {
110 // Replay all the destinations.
111 for (DestinationInfo destination : destinations.values()) {
112 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination());
113 fireAdvisory(context, topic, destination, info.getConsumerId());
114 }
115 }
116
117 // Replay the producers.
118 if (AdvisorySupport.isProducerAdvisoryTopic(info.getDestination())) {
119 for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext();) {
120 ProducerInfo value = iter.next();
121 ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination());
122 fireProducerAdvisory(context, value.getDestination(),topic, value, info.getConsumerId());
123 }
124 }
125
126 // Replay the consumers.
127 if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
128 for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext();) {
129 ConsumerInfo value = iter.next();
130 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
131 fireConsumerAdvisory(context,value.getDestination(), topic, value, info.getConsumerId());
132 }
133 }
134
135 // Replay network bridges
136 if (AdvisorySupport.isNetworkBridgeAdvisoryTopic(info.getDestination())) {
137 for (Iterator<BrokerInfo> iter = networkBridges.keySet().iterator(); iter.hasNext();) {
138 BrokerInfo key = iter.next();
139 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
140 fireAdvisory(context, topic, key, null, networkBridges.get(key));
141 }
142 }
143 }
144 return answer;
145 }
146
147 @Override
148 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
149 super.addProducer(context, info);
150
151 // Don't advise advisory topics.
152 if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
153 ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
154 fireProducerAdvisory(context, info.getDestination(), topic, info);
155 producers.put(info.getProducerId(), info);
156 }
157 }
158
159 @Override
160 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
161 Destination answer = super.addDestination(context, destination,create);
162 if (!AdvisorySupport.isAdvisoryTopic(destination)) {
163 DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
164 DestinationInfo previous = destinations.putIfAbsent(destination, info);
165 if( previous==null ) {
166 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
167 fireAdvisory(context, topic, info);
168 }
169 }
170 return answer;
171 }
172
173 @Override
174 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
175 ActiveMQDestination destination = info.getDestination();
176 next.addDestinationInfo(context, info);
177
178 if (!AdvisorySupport.isAdvisoryTopic(destination)) {
179 DestinationInfo previous = destinations.putIfAbsent(destination, info);
180 if( previous==null ) {
181 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
182 fireAdvisory(context, topic, info);
183 }
184 }
185 }
186
187 @Override
188 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
189 super.removeDestination(context, destination, timeout);
190 DestinationInfo info = destinations.remove(destination);
191 if (info != null) {
192 // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate
193 info = info.copy();
194 info.setDestination(destination);
195 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
196 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
197 fireAdvisory(context, topic, info);
198 ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destination);
199 for(ActiveMQTopic advisoryDestination : advisoryDestinations) {
200 try {
201 next.removeDestination(context, advisoryDestination, -1);
202 } catch (Exception expectedIfDestinationDidNotExistYet) {
203 }
204 }
205 }
206 }
207
208 @Override
209 public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception {
210 super.removeDestinationInfo(context, destInfo);
211 DestinationInfo info = destinations.remove(destInfo.getDestination());
212 if (info != null) {
213 // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate
214 info = info.copy();
215 info.setDestination(destInfo.getDestination());
216 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
217 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destInfo.getDestination());
218 fireAdvisory(context, topic, info);
219 ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destInfo.getDestination());
220 for(ActiveMQTopic advisoryDestination : advisoryDestinations) {
221 try {
222 next.removeDestination(context, advisoryDestination, -1);
223 } catch (Exception expectedIfDestinationDidNotExistYet) {
224 }
225 }
226 }
227 }
228
229 @Override
230 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
231 super.removeConnection(context, info, error);
232
233 ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
234 fireAdvisory(context, topic, info.createRemoveCommand());
235 connections.remove(info.getConnectionId());
236 }
237
238 @Override
239 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
240 super.removeConsumer(context, info);
241
242 // Don't advise advisory topics.
243 ActiveMQDestination dest = info.getDestination();
244 if (!AdvisorySupport.isAdvisoryTopic(dest)) {
245 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
246 consumers.remove(info.getConsumerId());
247 if (!dest.isTemporary() || destinations.containsKey(dest)) {
248 fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand());
249 }
250 }
251 }
252
253 @Override
254 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
255 super.removeProducer(context, info);
256
257 // Don't advise advisory topics.
258 ActiveMQDestination dest = info.getDestination();
259 if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) {
260 ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest);
261 producers.remove(info.getProducerId());
262 if (!dest.isTemporary() || destinations.contains(dest)) {
263 fireProducerAdvisory(context, dest,topic, info.createRemoveCommand());
264 }
265 }
266 }
267
268 @Override
269 public void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
270 super.messageExpired(context, messageReference, subscription);
271 try {
272 if(!messageReference.isAdvisory()) {
273 ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
274 Message payload = messageReference.getMessage().copy();
275 payload.clearBody();
276 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
277 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
278 fireAdvisory(context, topic, payload, null, advisoryMessage);
279 }
280 } catch (Exception e) {
281 handleFireFailure("expired", e);
282 }
283 }
284
285 @Override
286 public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
287 super.messageConsumed(context, messageReference);
288 try {
289 if(!messageReference.isAdvisory()) {
290 ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination());
291 Message payload = messageReference.getMessage().copy();
292 payload.clearBody();
293 fireAdvisory(context, topic,payload);
294 }
295 } catch (Exception e) {
296 handleFireFailure("consumed", e);
297 }
298 }
299
300 @Override
301 public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
302 super.messageDelivered(context, messageReference);
303 try {
304 if (!messageReference.isAdvisory()) {
305 ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination());
306 Message payload = messageReference.getMessage().copy();
307 payload.clearBody();
308 fireAdvisory(context, topic,payload);
309 }
310 } catch (Exception e) {
311 handleFireFailure("delivered", e);
312 }
313 }
314
315 @Override
316 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
317 super.messageDiscarded(context, sub, messageReference);
318 try {
319 if (!messageReference.isAdvisory()) {
320 ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(messageReference.getMessage().getDestination());
321 Message payload = messageReference.getMessage().copy();
322 payload.clearBody();
323 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
324 if (sub instanceof TopicSubscription) {
325 advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription)sub).discarded());
326 }
327 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString());
328 fireAdvisory(context, topic, payload, null, advisoryMessage);
329 }
330 } catch (Exception e) {
331 handleFireFailure("discarded", e);
332 }
333 }
334
335 @Override
336 public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
337 super.slowConsumer(context, destination,subs);
338 try {
339 ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
340 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
341 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, subs.getConsumerInfo().getConsumerId().toString());
342 fireAdvisory(context, topic, subs.getConsumerInfo(), null, advisoryMessage);
343 } catch (Exception e) {
344 handleFireFailure("slow consumer", e);
345 }
346 }
347
348 @Override
349 public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
350 super.fastProducer(context, producerInfo);
351 try {
352 ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination());
353 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
354 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString());
355 fireAdvisory(context, topic, producerInfo, null, advisoryMessage);
356 } catch (Exception e) {
357 handleFireFailure("fast producer", e);
358 }
359 }
360
361 @Override
362 public void isFull(ConnectionContext context, Destination destination, Usage usage) {
363 super.isFull(context, destination, usage);
364 if (AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination()) == false) {
365 try {
366
367 ActiveMQTopic topic = AdvisorySupport.getFullAdvisoryTopic(destination.getActiveMQDestination());
368 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
369 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_USAGE_NAME, usage.getName());
370 fireAdvisory(context, topic, null, null, advisoryMessage);
371
372 } catch (Exception e) {
373 handleFireFailure("is full", e);
374 }
375 }
376 }
377
378 @Override
379 public void nowMasterBroker() {
380 super.nowMasterBroker();
381 try {
382 ActiveMQTopic topic = AdvisorySupport.getMasterBrokerAdvisoryTopic();
383 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
384 ConnectionContext context = new ConnectionContext();
385 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
386 context.setBroker(getBrokerService().getBroker());
387 fireAdvisory(context, topic,null,null,advisoryMessage);
388 } catch (Exception e) {
389 handleFireFailure("now master broker", e);
390 }
391 }
392
393 @Override
394 public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
395 Subscription subscription){
396 super.sendToDeadLetterQueue(context, messageReference, subscription);
397 try {
398 if(!messageReference.isAdvisory()) {
399 ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination());
400 Message payload = messageReference.getMessage().copy();
401 payload.clearBody();
402 fireAdvisory(context, topic,payload);
403 }
404 } catch (Exception e) {
405 handleFireFailure("add to DLQ", e);
406 }
407 }
408
409 @Override
410 public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) {
411 try {
412 if (brokerInfo != null) {
413 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
414 advisoryMessage.setBooleanProperty("started", true);
415 advisoryMessage.setBooleanProperty("createdByDuplex", createdByDuplex);
416 advisoryMessage.setStringProperty("remoteIp", remoteIp);
417 networkBridges.putIfAbsent(brokerInfo, advisoryMessage);
418
419 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
420
421 ConnectionContext context = new ConnectionContext();
422 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
423 context.setBroker(getBrokerService().getBroker());
424 fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
425 }
426 } catch (Exception e) {
427 handleFireFailure("network bridge started", e);
428 }
429 }
430
431 @Override
432 public void networkBridgeStopped(BrokerInfo brokerInfo) {
433 try {
434 if (brokerInfo != null) {
435 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
436 advisoryMessage.setBooleanProperty("started", false);
437 networkBridges.remove(brokerInfo);
438
439 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
440
441 ConnectionContext context = new ConnectionContext();
442 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
443 context.setBroker(getBrokerService().getBroker());
444 fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
445 }
446 } catch (Exception e) {
447 handleFireFailure("network bridge stopped", e);
448 }
449 }
450
451 private void handleFireFailure(String message, Throwable cause) {
452 LOG.warn("Failed to fire " + message + " advisory, reason: " + cause);
453 if (LOG.isDebugEnabled()) {
454 LOG.debug(message + " detail", cause);
455 }
456 }
457
458 protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
459 fireAdvisory(context, topic, command, null);
460 }
461
462 protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
463 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
464 fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
465 }
466
467 protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command) throws Exception {
468 fireConsumerAdvisory(context, consumerDestination,topic, command, null);
469 }
470
471 protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
472 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
473 int count = 0;
474 Set<Destination>set = getDestinations(consumerDestination);
475 if (set != null) {
476 for (Destination dest:set) {
477 count += dest.getDestinationStatistics().getConsumers().getCount();
478 }
479 }
480 advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_COUNT, count);
481
482 fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
483 }
484
485 protected void fireProducerAdvisory(ConnectionContext context,ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception {
486 fireProducerAdvisory(context,producerDestination, topic, command, null);
487 }
488
489 protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
490 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
491 int count = 0;
492 if (producerDestination != null) {
493 Set<Destination> set = getDestinations(producerDestination);
494 if (set != null) {
495 for (Destination dest : set) {
496 count += dest.getDestinationStatistics().getProducers().getCount();
497 }
498 }
499 }
500 advisoryMessage.setIntProperty("producerCount", count);
501 fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
502 }
503
504 protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception {
505 if (getBrokerService().isStarted()) {
506 //set properties
507 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
508 String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
509 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
510
511 String url = getBrokerService().getVmConnectorURI().toString();
512 if (getBrokerService().getDefaultSocketURIString() != null) {
513 url = getBrokerService().getDefaultSocketURIString();
514 }
515 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
516
517 //set the data structure
518 advisoryMessage.setDataStructure(command);
519 advisoryMessage.setPersistent(false);
520 advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
521 advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()));
522 advisoryMessage.setTargetConsumerId(targetConsumerId);
523 advisoryMessage.setDestination(topic);
524 advisoryMessage.setResponseRequired(false);
525 advisoryMessage.setProducerId(advisoryProducerId);
526 boolean originalFlowControl = context.isProducerFlowControl();
527 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
528 producerExchange.setConnectionContext(context);
529 producerExchange.setMutable(true);
530 producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
531 try {
532 context.setProducerFlowControl(false);
533 next.send(producerExchange, advisoryMessage);
534 } finally {
535 context.setProducerFlowControl(originalFlowControl);
536 }
537 }
538 }
539
540 public Map<ConnectionId, ConnectionInfo> getAdvisoryConnections() {
541 return connections;
542 }
543
544 public Map<ConsumerId, ConsumerInfo> getAdvisoryConsumers() {
545 return consumers;
546 }
547
548 public Map<ProducerId, ProducerInfo> getAdvisoryProducers() {
549 return producers;
550 }
551
552 public Map<ActiveMQDestination, DestinationInfo> getAdvisoryDestinations() {
553 return destinations;
554 }
555 }