001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.util;
018
019 import javax.annotation.PostConstruct;
020 import javax.annotation.PreDestroy;
021 import javax.jms.Connection;
022 import javax.jms.ConnectionFactory;
023 import javax.jms.Destination;
024 import javax.jms.ExceptionListener;
025 import javax.jms.JMSException;
026 import javax.jms.MessageConsumer;
027 import javax.jms.Session;
028 import org.apache.activemq.ActiveMQConnectionFactory;
029 import org.apache.activemq.Service;
030 import org.apache.activemq.advisory.AdvisorySupport;
031 import org.apache.activemq.util.ServiceStopper;
032 import org.slf4j.Logger;
033 import org.slf4j.LoggerFactory;
034
035 /**
036 * An agent which listens to commands on a JMS destination
037 *
038 *
039 * @org.apache.xbean.XBean
040 */
041 public class CommandAgent implements Service, ExceptionListener {
042 private static final Logger LOG = LoggerFactory.getLogger(CommandAgent.class);
043
044 private String brokerUrl = "vm://localhost";
045 private String username;
046 private String password;
047 private ConnectionFactory connectionFactory;
048 private Connection connection;
049 private Destination commandDestination;
050 private CommandMessageListener listener;
051 private Session session;
052 private MessageConsumer consumer;
053
054 /**
055 *
056 * @throws Exception
057 * @org.apache.xbean.InitMethod
058 */
059 @PostConstruct
060 public void start() throws Exception {
061 session = getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
062 listener = new CommandMessageListener(session);
063 Destination destination = getCommandDestination();
064 if (LOG.isDebugEnabled()) {
065 LOG.debug("Agent subscribing to control destination: " + destination);
066 }
067 consumer = session.createConsumer(destination);
068 consumer.setMessageListener(listener);
069 }
070
071 /**
072 *
073 * @throws Exception
074 * @org.apache.xbean.DestroyMethod
075 */
076 @PreDestroy
077 public void stop() throws Exception {
078 ServiceStopper stopper = new ServiceStopper();
079 if (consumer != null) {
080 try {
081 consumer.close();
082 consumer = null;
083 } catch (JMSException e) {
084 stopper.onException(this, e);
085 }
086 }
087 if (session != null) {
088 try {
089 session.close();
090 session = null;
091 } catch (JMSException e) {
092 stopper.onException(this, e);
093 }
094 }
095 if (connection != null) {
096 try {
097 connection.close();
098 connection = null;
099 } catch (JMSException e) {
100 stopper.onException(this, e);
101 }
102 }
103 stopper.throwFirstException();
104 }
105
106 // Properties
107 // -------------------------------------------------------------------------
108 public String getBrokerUrl() {
109 return brokerUrl;
110 }
111
112 public void setBrokerUrl(String brokerUrl) {
113 this.brokerUrl = brokerUrl;
114 }
115
116 public String getUsername() {
117 return username;
118 }
119
120 public void setUsername(String username) {
121 this.username = username;
122 }
123
124 public String getPassword() {
125 return password;
126 }
127
128 public void setPassword(String password) {
129 this.password = password;
130 }
131
132 public ConnectionFactory getConnectionFactory() {
133 if (connectionFactory == null) {
134 connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
135 }
136 return connectionFactory;
137 }
138
139 public void setConnectionFactory(ConnectionFactory connectionFactory) {
140 this.connectionFactory = connectionFactory;
141 }
142
143 public Connection getConnection() throws JMSException {
144 if (connection == null) {
145 connection = createConnection();
146 connection.setExceptionListener(this);
147 connection.start();
148 }
149 return connection;
150 }
151
152 public void setConnection(Connection connection) {
153 this.connection = connection;
154 }
155
156 public Destination getCommandDestination() {
157 if (commandDestination == null) {
158 commandDestination = createCommandDestination();
159 }
160 return commandDestination;
161 }
162
163 public void setCommandDestination(Destination commandDestination) {
164 this.commandDestination = commandDestination;
165 }
166
167 protected Connection createConnection() throws JMSException {
168 return getConnectionFactory().createConnection(username, password);
169 }
170
171 protected Destination createCommandDestination() {
172 return AdvisorySupport.getAgentDestination();
173 }
174
175 public void onException(JMSException exception) {
176 try {
177 stop();
178 } catch (Exception e) {
179 }
180 }
181 }