001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.ft;
018
019 import java.io.IOException;
020 import java.net.URI;
021 import java.net.URISyntaxException;
022 import java.util.List;
023 import java.util.concurrent.atomic.AtomicBoolean;
024
025 import org.apache.activemq.Service;
026 import org.apache.activemq.broker.BrokerService;
027 import org.apache.activemq.broker.BrokerServiceAware;
028 import org.apache.activemq.broker.TransportConnector;
029 import org.apache.activemq.command.BrokerInfo;
030 import org.apache.activemq.command.Command;
031 import org.apache.activemq.command.CommandTypes;
032 import org.apache.activemq.command.ConnectionId;
033 import org.apache.activemq.command.ConnectionInfo;
034 import org.apache.activemq.command.MessageDispatch;
035 import org.apache.activemq.command.ProducerInfo;
036 import org.apache.activemq.command.Response;
037 import org.apache.activemq.command.SessionInfo;
038 import org.apache.activemq.command.ShutdownInfo;
039 import org.apache.activemq.transport.DefaultTransportListener;
040 import org.apache.activemq.transport.Transport;
041 import org.apache.activemq.transport.TransportDisposedIOException;
042 import org.apache.activemq.transport.TransportFactory;
043 import org.apache.activemq.util.IdGenerator;
044 import org.apache.activemq.util.ServiceStopper;
045 import org.apache.activemq.util.ServiceSupport;
046 import org.slf4j.Logger;
047 import org.slf4j.LoggerFactory;
048
049 /**
050 * Connects a Slave Broker to a Master when using <a
051 * href="http://activemq.apache.org/masterslave.html">Master Slave</a> for High
052 * Availability of messages.
053 *
054 * @org.apache.xbean.XBean
055 *
056 */
057 public class MasterConnector implements Service, BrokerServiceAware {
058
059 private static final Logger LOG = LoggerFactory.getLogger(MasterConnector.class);
060 private BrokerService broker;
061 private URI remoteURI;
062 private URI localURI;
063 private Transport localBroker;
064 private Transport remoteBroker;
065 private TransportConnector connector;
066 private AtomicBoolean started = new AtomicBoolean(false);
067 private AtomicBoolean stoppedBeforeStart = new AtomicBoolean(false);
068 private final IdGenerator idGenerator = new IdGenerator();
069 private String userName;
070 private String password;
071 private ConnectionInfo connectionInfo;
072 private SessionInfo sessionInfo;
073 private ProducerInfo producerInfo;
074 private final AtomicBoolean masterActive = new AtomicBoolean();
075 private BrokerInfo brokerInfo;
076 private boolean firstConnection=true;
077 private boolean failedToStart;
078
079 public MasterConnector() {
080 }
081
082 public MasterConnector(String remoteUri) throws URISyntaxException {
083 remoteURI = new URI(remoteUri);
084 }
085
086 public void setBrokerService(BrokerService broker) {
087 this.broker = broker;
088 if (localURI == null) {
089 localURI = broker.getVmConnectorURI();
090 }
091 if (connector == null) {
092 List transportConnectors = broker.getTransportConnectors();
093 if (!transportConnectors.isEmpty()) {
094 connector = (TransportConnector)transportConnectors.get(0);
095 }
096 }
097 }
098
099 public boolean isSlave() {
100 return masterActive.get();
101 }
102
103 protected void restartBridge() throws Exception {
104 localBroker.oneway(connectionInfo);
105 remoteBroker.oneway(connectionInfo);
106 localBroker.oneway(sessionInfo);
107 remoteBroker.oneway(sessionInfo);
108 remoteBroker.oneway(producerInfo);
109 remoteBroker.oneway(brokerInfo);
110 }
111
112 public void start() throws Exception {
113 if (!started.compareAndSet(false, true)) {
114 return;
115 }
116 if (remoteURI == null) {
117 throw new IllegalArgumentException("You must specify a remoteURI");
118 }
119 localBroker = TransportFactory.connect(localURI);
120 remoteBroker = TransportFactory.connect(remoteURI);
121 LOG.info("Starting a slave connection between " + localBroker + " and " + remoteBroker);
122 localBroker.setTransportListener(new DefaultTransportListener() {
123
124 public void onCommand(Object command) {
125 }
126
127 public void onException(IOException error) {
128 if (started.get()) {
129 serviceLocalException(error);
130 }
131 }
132 });
133 remoteBroker.setTransportListener(new DefaultTransportListener() {
134
135 public void onCommand(Object o) {
136 Command command = (Command)o;
137 if (started.get()) {
138 serviceRemoteCommand(command);
139 }
140 }
141
142 public void onException(IOException error) {
143 if (started.get()) {
144 serviceRemoteException(error);
145 }
146 }
147
148 public void transportResumed() {
149 try{
150 if(!firstConnection){
151 localBroker = TransportFactory.connect(localURI);
152 localBroker.setTransportListener(new DefaultTransportListener() {
153
154 public void onCommand(Object command) {
155 }
156
157 public void onException(IOException error) {
158 if (started.get()) {
159 serviceLocalException(error);
160 }
161 }
162 });
163 localBroker.start();
164 restartBridge();
165 LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been reestablished.");
166 }else{
167 firstConnection=false;
168 }
169 }catch(IOException e){
170 LOG.error("MasterConnector failed to send BrokerInfo in transportResumed:", e);
171 }catch(Exception e){
172 LOG.error("MasterConnector failed to restart localBroker in transportResumed:", e);
173 }
174
175 }
176 });
177 try {
178 localBroker.start();
179 remoteBroker.start();
180 startBridge();
181 masterActive.set(true);
182 } catch (Exception e) {
183 masterActive.set(false);
184 if(!stoppedBeforeStart.get()){
185 LOG.error("Failed to start network bridge: " + e, e);
186 }else{
187 LOG.info("Slave stopped before connected to the master.");
188 }
189 setFailedToStart(true);
190 }
191 }
192
193 protected void startBridge() throws Exception {
194 connectionInfo = new ConnectionInfo();
195 connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
196 connectionInfo.setClientId(idGenerator.generateId());
197 connectionInfo.setUserName(userName);
198 connectionInfo.setPassword(password);
199 connectionInfo.setBrokerMasterConnector(true);
200 sessionInfo = new SessionInfo(connectionInfo, 1);
201 producerInfo = new ProducerInfo(sessionInfo, 1);
202 producerInfo.setResponseRequired(false);
203 if (connector != null) {
204 brokerInfo = connector.getBrokerInfo();
205 } else {
206 brokerInfo = new BrokerInfo();
207 }
208 brokerInfo.setBrokerName(broker.getBrokerName());
209 brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos());
210 brokerInfo.setSlaveBroker(true);
211 brokerInfo.setPassiveSlave(broker.isPassiveSlave());
212 restartBridge();
213 LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been established.");
214 }
215
216 public void stop() throws Exception {
217 if (!started.compareAndSet(true, false)||!masterActive.get()) {
218 return;
219 }
220 masterActive.set(false);
221 try {
222 // if (connectionInfo!=null){
223 // localBroker.request(connectionInfo.createRemoveCommand());
224 // }
225 // localBroker.setTransportListener(null);
226 // remoteBroker.setTransportListener(null);
227 remoteBroker.oneway(new ShutdownInfo());
228 localBroker.oneway(new ShutdownInfo());
229 } catch (IOException e) {
230 LOG.debug("Caught exception stopping", e);
231 } finally {
232 ServiceStopper ss = new ServiceStopper();
233 ss.stop(localBroker);
234 ss.stop(remoteBroker);
235 ss.throwFirstException();
236 }
237 }
238
239 public void stopBeforeConnected()throws Exception{
240 masterActive.set(false);
241 started.set(false);
242 stoppedBeforeStart.set(true);
243 ServiceStopper ss = new ServiceStopper();
244 ss.stop(localBroker);
245 ss.stop(remoteBroker);
246 }
247
248 protected void serviceRemoteException(IOException error) {
249 LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
250 shutDown();
251 }
252
253 protected void serviceRemoteCommand(Command command) {
254 try {
255 if (command.isMessageDispatch()) {
256 MessageDispatch md = (MessageDispatch)command;
257 command = md.getMessage();
258 }
259 if (command.getDataStructureType() == CommandTypes.SHUTDOWN_INFO) {
260 LOG.warn("The Master has shutdown");
261 shutDown();
262 } else {
263 boolean responseRequired = command.isResponseRequired();
264 int commandId = command.getCommandId();
265 if (responseRequired) {
266 Response response = (Response)localBroker.request(command);
267 response.setCorrelationId(commandId);
268 remoteBroker.oneway(response);
269 } else {
270 localBroker.oneway(command);
271 }
272 }
273 } catch (IOException e) {
274 serviceRemoteException(e);
275 }
276 }
277
278 protected void serviceLocalException(Throwable error) {
279 if (!(error instanceof TransportDisposedIOException) || localBroker.isDisposed()){
280 LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
281 ServiceSupport.dispose(this);
282 }else{
283 LOG.info(error.getMessage());
284 }
285 }
286
287 /**
288 * @return Returns the localURI.
289 */
290 public URI getLocalURI() {
291 return localURI;
292 }
293
294 /**
295 * @param localURI The localURI to set.
296 */
297 public void setLocalURI(URI localURI) {
298 this.localURI = localURI;
299 }
300
301 /**
302 * @return Returns the remoteURI.
303 */
304 public URI getRemoteURI() {
305 return remoteURI;
306 }
307
308 /**
309 * @param remoteURI The remoteURI to set.
310 */
311 public void setRemoteURI(URI remoteURI) {
312 this.remoteURI = remoteURI;
313 }
314
315 /**
316 * @return Returns the password.
317 */
318 public String getPassword() {
319 return password;
320 }
321
322 /**
323 * @param password The password to set.
324 */
325 public void setPassword(String password) {
326 this.password = password;
327 }
328
329 /**
330 * @return Returns the userName.
331 */
332 public String getUserName() {
333 return userName;
334 }
335
336 /**
337 * @param userName The userName to set.
338 */
339 public void setUserName(String userName) {
340 this.userName = userName;
341 }
342
343 private void shutDown() {
344 masterActive.set(false);
345 broker.masterFailed();
346 ServiceSupport.dispose(this);
347 }
348
349 public boolean isStoppedBeforeStart() {
350 return stoppedBeforeStart.get();
351 }
352
353 /**
354 * Get the failedToStart
355 * @return the failedToStart
356 */
357 public boolean isFailedToStart() {
358 return this.failedToStart;
359 }
360
361 /**
362 * Set the failedToStart
363 * @param failedToStart the failedToStart to set
364 */
365 public void setFailedToStart(boolean failedToStart) {
366 this.failedToStart = failedToStart;
367 }
368
369 }