001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.util;
018
019 import org.apache.activemq.broker.Broker;
020 import org.apache.activemq.broker.ConnectionContext;
021 import org.apache.activemq.broker.ProducerBrokerExchange;
022 import org.apache.activemq.command.ActiveMQDestination;
023 import org.apache.activemq.command.Message;
024 import org.apache.activemq.command.ProducerInfo;
025 import org.apache.activemq.security.SecurityContext;
026 import org.apache.activemq.state.ProducerState;
027
028 /**
029 * Utility class for broker operations
030 *
031 */
032 public final class BrokerSupport {
033
034 private BrokerSupport() {
035 }
036
037 public static void resendNoCopy(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination) throws Exception {
038 doResend(context, originalMessage, deadLetterDestination, false);
039 }
040
041 /**
042 * @param context
043 * @param originalMessage
044 * @param deadLetterDestination
045 * @throws Exception
046 */
047 public static void resend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination) throws Exception {
048 doResend(context, originalMessage, deadLetterDestination, true);
049 }
050
051 public static void doResend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination, boolean copy) throws Exception {
052 Message message = copy ? originalMessage.copy() : originalMessage;
053 message.setOriginalDestination(message.getDestination());
054 message.setOriginalTransactionId(message.getTransactionId());
055 message.setDestination(deadLetterDestination);
056 message.setTransactionId(null);
057 message.setMemoryUsage(null);
058 message.setRedeliveryCounter(0);
059 boolean originalFlowControl = context.isProducerFlowControl();
060 try {
061 context.setProducerFlowControl(false);
062 ProducerInfo info = new ProducerInfo();
063 ProducerState state = new ProducerState(info);
064 ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
065 producerExchange.setProducerState(state);
066 producerExchange.setMutable(true);
067 producerExchange.setConnectionContext(context);
068 context.getBroker().send(producerExchange, message);
069 } finally {
070 context.setProducerFlowControl(originalFlowControl);
071 }
072 }
073
074 /**
075 * Returns the broker's administration connection context used for
076 * configuring the broker at startup
077 */
078 public static ConnectionContext getConnectionContext(Broker broker) {
079 ConnectionContext adminConnectionContext = broker.getAdminConnectionContext();
080 if (adminConnectionContext == null) {
081 adminConnectionContext = createAdminConnectionContext(broker);
082 broker.setAdminConnectionContext(adminConnectionContext);
083 }
084 return adminConnectionContext;
085 }
086
087 /**
088 * Factory method to create the new administration connection context
089 * object. Note this method is here rather than inside a default broker
090 * implementation to ensure that the broker reference inside it is the outer
091 * most interceptor
092 */
093 protected static ConnectionContext createAdminConnectionContext(Broker broker) {
094 ConnectionContext context = new ConnectionContext();
095 context.setBroker(broker);
096 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
097 return context;
098 }
099 }