001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker;
018
019 import java.io.IOException;
020 import java.net.URI;
021 import java.net.URISyntaxException;
022 import java.util.Iterator;
023 import java.util.LinkedList;
024 import java.util.StringTokenizer;
025 import java.util.concurrent.CopyOnWriteArrayList;
026 import java.util.regex.Pattern;
027
028 import javax.management.ObjectName;
029 import org.apache.activemq.broker.jmx.ManagedTransportConnector;
030 import org.apache.activemq.broker.jmx.ManagementContext;
031 import org.apache.activemq.broker.region.ConnectorStatistics;
032 import org.apache.activemq.command.BrokerInfo;
033 import org.apache.activemq.command.ConnectionControl;
034 import org.apache.activemq.security.MessageAuthorizationPolicy;
035 import org.apache.activemq.thread.DefaultThreadPools;
036 import org.apache.activemq.thread.TaskRunnerFactory;
037 import org.apache.activemq.transport.Transport;
038 import org.apache.activemq.transport.TransportAcceptListener;
039 import org.apache.activemq.transport.TransportFactory;
040 import org.apache.activemq.transport.TransportServer;
041 import org.apache.activemq.transport.discovery.DiscoveryAgent;
042 import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
043 import org.apache.activemq.util.ServiceStopper;
044 import org.apache.activemq.util.ServiceSupport;
045 import org.slf4j.Logger;
046 import org.slf4j.LoggerFactory;
047
048 /**
049 * @org.apache.xbean.XBean
050 *
051 */
052 public class TransportConnector implements Connector, BrokerServiceAware {
053
054 final Logger LOG = LoggerFactory.getLogger(TransportConnector.class);
055
056 protected CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
057 protected TransportStatusDetector statusDector;
058 private BrokerService brokerService;
059 private TransportServer server;
060 private URI uri;
061 private BrokerInfo brokerInfo = new BrokerInfo();
062 private TaskRunnerFactory taskRunnerFactory;
063 private MessageAuthorizationPolicy messageAuthorizationPolicy;
064 private DiscoveryAgent discoveryAgent;
065 private final ConnectorStatistics statistics = new ConnectorStatistics();
066 private URI discoveryUri;
067 private URI connectUri;
068 private String name;
069 private boolean disableAsyncDispatch;
070 private boolean enableStatusMonitor = false;
071 private Broker broker;
072 private boolean updateClusterClients = false;
073 private boolean rebalanceClusterClients;
074 private boolean updateClusterClientsOnRemove = false;
075 private String updateClusterFilter;
076 private boolean auditNetworkProducers = false;
077 private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE;
078 private int maximumConsumersAllowedPerConnection = Integer.MAX_VALUE;
079
080 LinkedList<String> peerBrokers = new LinkedList<String>();
081
082 public TransportConnector() {
083 }
084
085 public TransportConnector(TransportServer server) {
086 this();
087 setServer(server);
088 if (server != null && server.getConnectURI() != null) {
089 URI uri = server.getConnectURI();
090 if (uri != null && uri.getScheme().equals("vm")) {
091 setEnableStatusMonitor(false);
092 }
093 }
094
095 }
096
097 /**
098 * @return Returns the connections.
099 */
100 public CopyOnWriteArrayList<TransportConnection> getConnections() {
101 return connections;
102 }
103
104 /**
105 * Factory method to create a JMX managed version of this transport
106 * connector
107 */
108 public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName)
109 throws IOException, URISyntaxException {
110 ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer());
111 rc.setBrokerInfo(getBrokerInfo());
112 rc.setConnectUri(getConnectUri());
113 rc.setDisableAsyncDispatch(isDisableAsyncDispatch());
114 rc.setDiscoveryAgent(getDiscoveryAgent());
115 rc.setDiscoveryUri(getDiscoveryUri());
116 rc.setEnableStatusMonitor(isEnableStatusMonitor());
117 rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
118 rc.setName(getName());
119 rc.setTaskRunnerFactory(getTaskRunnerFactory());
120 rc.setUri(getUri());
121 rc.setBrokerService(brokerService);
122 rc.setUpdateClusterClients(isUpdateClusterClients());
123 rc.setRebalanceClusterClients(isRebalanceClusterClients());
124 rc.setUpdateClusterFilter(getUpdateClusterFilter());
125 rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove());
126 rc.setAuditNetworkProducers(isAuditNetworkProducers());
127 rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection());
128 rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection());
129 return rc;
130 }
131
132 public BrokerInfo getBrokerInfo() {
133 return brokerInfo;
134 }
135
136 public void setBrokerInfo(BrokerInfo brokerInfo) {
137 this.brokerInfo = brokerInfo;
138 }
139
140 /**
141 *
142 * @deprecated use the {@link #setBrokerService(BrokerService)} method
143 * instead.
144 */
145 @Deprecated
146 public void setBrokerName(String name) {
147 if (this.brokerInfo == null) {
148 this.brokerInfo = new BrokerInfo();
149 }
150 this.brokerInfo.setBrokerName(name);
151 }
152
153 public TransportServer getServer() throws IOException, URISyntaxException {
154 if (server == null) {
155 setServer(createTransportServer());
156 }
157 return server;
158 }
159
160 public void setServer(TransportServer server) {
161 this.server = server;
162 }
163
164 public URI getUri() {
165 if (uri == null) {
166 try {
167 uri = getConnectUri();
168 } catch (Throwable e) {
169 }
170 }
171 return uri;
172 }
173
174 /**
175 * Sets the server transport URI to use if there is not a
176 * {@link TransportServer} configured via the
177 * {@link #setServer(TransportServer)} method. This value is used to lazy
178 * create a {@link TransportServer} instance
179 *
180 * @param uri
181 */
182 public void setUri(URI uri) {
183 this.uri = uri;
184 }
185
186 public TaskRunnerFactory getTaskRunnerFactory() {
187 return taskRunnerFactory;
188 }
189
190 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
191 this.taskRunnerFactory = taskRunnerFactory;
192 }
193
194 /**
195 * @return the statistics for this connector
196 */
197 public ConnectorStatistics getStatistics() {
198 return statistics;
199 }
200
201 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
202 return messageAuthorizationPolicy;
203 }
204
205 /**
206 * Sets the policy used to decide if the current connection is authorized to
207 * consume a given message
208 */
209 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
210 this.messageAuthorizationPolicy = messageAuthorizationPolicy;
211 }
212
213 public void start() throws Exception {
214 broker = brokerService.getBroker();
215 brokerInfo.setBrokerName(broker.getBrokerName());
216 brokerInfo.setBrokerId(broker.getBrokerId());
217 brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
218 brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
219 brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString());
220 getServer().setAcceptListener(new TransportAcceptListener() {
221 public void onAccept(final Transport transport) {
222 try {
223 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
224 public void run() {
225 try {
226 Connection connection = createConnection(transport);
227 connection.start();
228 } catch (Exception e) {
229 String remoteHost = transport.getRemoteAddress();
230 ServiceSupport.dispose(transport);
231 onAcceptError(e, remoteHost);
232 }
233 }
234 });
235 } catch (Exception e) {
236 String remoteHost = transport.getRemoteAddress();
237 ServiceSupport.dispose(transport);
238 onAcceptError(e, remoteHost);
239 }
240 }
241
242 public void onAcceptError(Exception error) {
243 onAcceptError(error, null);
244 }
245
246 private void onAcceptError(Exception error, String remoteHost) {
247 LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": "
248 + error);
249 LOG.debug("Reason: " + error, error);
250 }
251 });
252 getServer().setBrokerInfo(brokerInfo);
253 getServer().start();
254
255 DiscoveryAgent da = getDiscoveryAgent();
256 if (da != null) {
257 da.registerService(getPublishableConnectString());
258 da.start();
259 }
260 if (enableStatusMonitor) {
261 this.statusDector = new TransportStatusDetector(this);
262 this.statusDector.start();
263 }
264
265 LOG.info("Connector " + getName() + " Started");
266 }
267
268 public String getPublishableConnectString() throws Exception {
269 return getPublishableConnectString(getConnectUri());
270 }
271
272 public String getPublishableConnectString(URI theConnectURI) throws Exception {
273 String publishableConnectString = null;
274 if (theConnectURI != null) {
275 publishableConnectString = theConnectURI.toString();
276 // strip off server side query parameters which may not be compatible to
277 // clients
278 if (theConnectURI.getRawQuery() != null) {
279 publishableConnectString = publishableConnectString.substring(0, publishableConnectString
280 .indexOf(theConnectURI.getRawQuery()) - 1);
281 }
282 }
283 if (LOG.isDebugEnabled()) {
284 LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + theConnectURI);
285 }
286 return publishableConnectString;
287 }
288
289 public void stop() throws Exception {
290 ServiceStopper ss = new ServiceStopper();
291 if (discoveryAgent != null) {
292 ss.stop(discoveryAgent);
293 }
294 if (server != null) {
295 ss.stop(server);
296 }
297 if (this.statusDector != null) {
298 this.statusDector.stop();
299 }
300
301 for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
302 TransportConnection c = iter.next();
303 ss.stop(c);
304 }
305 server = null;
306 ss.throwFirstException();
307 LOG.info("Connector " + getName() + " Stopped");
308 }
309
310 // Implementation methods
311 // -------------------------------------------------------------------------
312 protected Connection createConnection(Transport transport) throws IOException {
313 TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null
314 : taskRunnerFactory);
315 boolean statEnabled = this.getStatistics().isEnabled();
316 answer.getStatistics().setEnabled(statEnabled);
317 answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
318 return answer;
319 }
320
321 protected TransportServer createTransportServer() throws IOException, URISyntaxException {
322 if (uri == null) {
323 throw new IllegalArgumentException("You must specify either a server or uri property");
324 }
325 if (brokerService == null) {
326 throw new IllegalArgumentException(
327 "You must specify the brokerService property. Maybe this connector should be added to a broker?");
328 }
329 return TransportFactory.bind(brokerService, uri);
330 }
331
332 public DiscoveryAgent getDiscoveryAgent() throws IOException {
333 if (discoveryAgent == null) {
334 discoveryAgent = createDiscoveryAgent();
335 }
336 return discoveryAgent;
337 }
338
339 protected DiscoveryAgent createDiscoveryAgent() throws IOException {
340 if (discoveryUri != null) {
341 DiscoveryAgent agent = DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri);
342
343 if( agent!=null && agent instanceof BrokerServiceAware ) {
344 ((BrokerServiceAware)agent).setBrokerService(brokerService);
345 }
346
347 return agent;
348 }
349 return null;
350 }
351
352 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
353 this.discoveryAgent = discoveryAgent;
354 }
355
356 public URI getDiscoveryUri() {
357 return discoveryUri;
358 }
359
360 public void setDiscoveryUri(URI discoveryUri) {
361 this.discoveryUri = discoveryUri;
362 }
363
364 public URI getConnectUri() throws IOException, URISyntaxException {
365 if (connectUri == null) {
366 if (server != null) {
367 connectUri = server.getConnectURI();
368 }
369 }
370 return connectUri;
371 }
372
373 public void setConnectUri(URI transportUri) {
374 this.connectUri = transportUri;
375 }
376
377 public void onStarted(TransportConnection connection) {
378 connections.add(connection);
379 }
380
381 public void onStopped(TransportConnection connection) {
382 connections.remove(connection);
383 }
384
385 public String getName() {
386 if (name == null) {
387 uri = getUri();
388 if (uri != null) {
389 name = uri.toString();
390 }
391 }
392 return name;
393 }
394
395 public void setName(String name) {
396 this.name = name;
397 }
398
399 @Override
400 public String toString() {
401 String rc = getName();
402 if (rc == null) {
403 rc = super.toString();
404 }
405 return rc;
406 }
407
408 protected ConnectionControl getConnectionControl() {
409 boolean rebalance = isRebalanceClusterClients();
410 String connectedBrokers = "";
411 String separator = "";
412
413 if (isUpdateClusterClients()) {
414 synchronized (peerBrokers) {
415 for (String uri : getPeerBrokers()) {
416 connectedBrokers += separator + uri;
417 separator = ",";
418 }
419
420 if (rebalance) {
421 String shuffle = getPeerBrokers().removeFirst();
422 getPeerBrokers().addLast(shuffle);
423 }
424 }
425 }
426 ConnectionControl control = new ConnectionControl();
427 control.setConnectedBrokers(connectedBrokers);
428 control.setRebalanceConnection(rebalance);
429 return control;
430
431 }
432
433 public void addPeerBroker(BrokerInfo info) {
434 if (isMatchesClusterFilter(info.getBrokerName())) {
435 synchronized (peerBrokers) {
436 getPeerBrokers().addLast(info.getBrokerURL());
437 }
438 }
439 }
440
441 public void removePeerBroker(BrokerInfo info) {
442 synchronized (peerBrokers) {
443 getPeerBrokers().remove(info.getBrokerURL());
444 }
445 }
446
447 public LinkedList<String> getPeerBrokers() {
448 synchronized (peerBrokers) {
449 if (peerBrokers.isEmpty()) {
450 peerBrokers.add(brokerService.getDefaultSocketURIString());
451 }
452 return peerBrokers;
453 }
454 }
455
456 public void updateClientClusterInfo() {
457
458 if (isRebalanceClusterClients() || isUpdateClusterClients()) {
459 ConnectionControl control = getConnectionControl();
460 for (Connection c : this.connections) {
461 c.updateClient(control);
462 if (isRebalanceClusterClients()) {
463 control = getConnectionControl();
464 }
465 }
466 }
467 }
468
469 private boolean isMatchesClusterFilter(String brokerName) {
470 boolean result = true;
471 String filter = getUpdateClusterFilter();
472 if (filter != null) {
473 filter = filter.trim();
474 if (filter.length() > 0) {
475 StringTokenizer tokenizer = new StringTokenizer(filter, ",");
476 while (result && tokenizer.hasMoreTokens()) {
477 String token = tokenizer.nextToken();
478 result = isMatchesClusterFilter(brokerName, token);
479 }
480 }
481 }
482 return result;
483 }
484
485 private boolean isMatchesClusterFilter(String brokerName, String match) {
486 boolean result = true;
487 if (brokerName != null && match != null && brokerName.length() > 0 && match.length() > 0) {
488 result = Pattern.matches(match, brokerName);
489 }
490 return result;
491 }
492
493 public boolean isDisableAsyncDispatch() {
494 return disableAsyncDispatch;
495 }
496
497 public void setDisableAsyncDispatch(boolean disableAsyncDispatch) {
498 this.disableAsyncDispatch = disableAsyncDispatch;
499 }
500
501 /**
502 * @return the enableStatusMonitor
503 */
504 public boolean isEnableStatusMonitor() {
505 return enableStatusMonitor;
506 }
507
508 /**
509 * @param enableStatusMonitor
510 * the enableStatusMonitor to set
511 */
512 public void setEnableStatusMonitor(boolean enableStatusMonitor) {
513 this.enableStatusMonitor = enableStatusMonitor;
514 }
515
516 /**
517 * This is called by the BrokerService right before it starts the transport.
518 */
519 public void setBrokerService(BrokerService brokerService) {
520 this.brokerService = brokerService;
521 }
522
523 public Broker getBroker() {
524 return broker;
525 }
526
527 public BrokerService getBrokerService() {
528 return brokerService;
529 }
530
531 /**
532 * @return the updateClusterClients
533 */
534 public boolean isUpdateClusterClients() {
535 return this.updateClusterClients;
536 }
537
538 /**
539 * @param updateClusterClients
540 * the updateClusterClients to set
541 */
542 public void setUpdateClusterClients(boolean updateClusterClients) {
543 this.updateClusterClients = updateClusterClients;
544 }
545
546 /**
547 * @return the rebalanceClusterClients
548 */
549 public boolean isRebalanceClusterClients() {
550 return this.rebalanceClusterClients;
551 }
552
553 /**
554 * @param rebalanceClusterClients
555 * the rebalanceClusterClients to set
556 */
557 public void setRebalanceClusterClients(boolean rebalanceClusterClients) {
558 this.rebalanceClusterClients = rebalanceClusterClients;
559 }
560
561 /**
562 * @return the updateClusterClientsOnRemove
563 */
564 public boolean isUpdateClusterClientsOnRemove() {
565 return this.updateClusterClientsOnRemove;
566 }
567
568 /**
569 * @param updateClusterClientsOnRemove the updateClusterClientsOnRemove to set
570 */
571 public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
572 this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
573 }
574
575 /**
576 * @return the updateClusterFilter
577 */
578 public String getUpdateClusterFilter() {
579 return this.updateClusterFilter;
580 }
581
582 /**
583 * @param updateClusterFilter
584 * the updateClusterFilter to set
585 */
586 public void setUpdateClusterFilter(String updateClusterFilter) {
587 this.updateClusterFilter = updateClusterFilter;
588 }
589
590 public int connectionCount() {
591 return connections.size();
592 }
593
594 public boolean isAuditNetworkProducers() {
595 return auditNetworkProducers;
596 }
597
598 /**
599 * Enable a producer audit on network connections, Traps the case of a missing send reply and resend.
600 * Note: does not work with conduit=false, networked composite destinations or networked virtual topics
601 * @param auditNetworkProducers
602 */
603 public void setAuditNetworkProducers(boolean auditNetworkProducers) {
604 this.auditNetworkProducers = auditNetworkProducers;
605 }
606
607 public int getMaximumProducersAllowedPerConnection() {
608 return maximumProducersAllowedPerConnection;
609 }
610
611 public void setMaximumProducersAllowedPerConnection(int maximumProducersAllowedPerConnection) {
612 this.maximumProducersAllowedPerConnection = maximumProducersAllowedPerConnection;
613 }
614
615 public int getMaximumConsumersAllowedPerConnection() {
616 return maximumConsumersAllowedPerConnection;
617 }
618
619 public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection) {
620 this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection;
621 }
622
623 }