001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network;
018
019 import java.net.URI;
020 import java.util.Hashtable;
021 import java.util.Map;
022 import java.util.Random;
023 import java.util.concurrent.ConcurrentHashMap;
024
025 import javax.naming.CommunicationException;
026 import javax.naming.Context;
027 import javax.naming.NamingEnumeration;
028 import javax.naming.directory.Attributes;
029 import javax.naming.directory.DirContext;
030 import javax.naming.directory.InitialDirContext;
031 import javax.naming.directory.SearchControls;
032 import javax.naming.directory.SearchResult;
033 import javax.naming.event.EventDirContext;
034 import javax.naming.event.NamespaceChangeListener;
035 import javax.naming.event.NamingEvent;
036 import javax.naming.event.NamingExceptionEvent;
037 import javax.naming.event.ObjectChangeListener;
038
039 import org.apache.activemq.util.URISupport;
040 import org.apache.activemq.util.URISupport.CompositeData;
041 import org.slf4j.Logger;
042 import org.slf4j.LoggerFactory;
043
044 /**
045 * class to create dynamic network connectors listed in an directory
046 * server using the LDAP v3 protocol as defined in RFC 2251, the
047 * entries listed in the directory server must implement the ipHost
048 * and ipService objectClasses as defined in RFC 2307.
049 *
050 * @author Trevor Pounds
051 * @see <a href="http://www.faqs.org/rfcs/rfc2251.html">RFC 2251</a>
052 * @see <a href="http://www.faqs.org/rfcs/rfc2307.html">RFC 2307</a>
053 *
054 * @org.apache.xbean.XBean element="ldapNetworkConnector"
055 */
056 public class LdapNetworkConnector
057 extends NetworkConnector
058 implements NamespaceChangeListener,
059 ObjectChangeListener
060 {
061 private static final Logger LOG = LoggerFactory.getLogger(LdapNetworkConnector.class);
062
063 // force returned entries to implement the ipHost and ipService object classes (RFC 2307)
064 private static final String REQUIRED_OBJECT_CLASS_FILTER = "(&(objectClass=ipHost)(objectClass=ipService))";
065
066 // connection
067 private URI[] availableURIs = null;
068 private int availableURIsIndex = 0;
069 private String base = null;
070 private boolean failover = false;
071 private long curReconnectDelay = 1000; /* 1 sec */
072 private long maxReconnectDelay = 30000; /* 30 sec */
073
074 // authentication
075 private String user = null;
076 private String password = null;
077 private boolean anonymousAuthentication = false;
078
079 // search
080 private SearchControls searchControls = new SearchControls(/* ONELEVEL_SCOPE */);
081 private String searchFilter = REQUIRED_OBJECT_CLASS_FILTER;
082 private boolean searchEventListener = false;
083
084 // connector management
085 private Map<URI, NetworkConnector> connectorMap = new ConcurrentHashMap();
086 private Map<URI, Integer> referenceMap = new ConcurrentHashMap();
087 private Map<String, URI> uuidMap = new ConcurrentHashMap();
088
089 // local context
090 private DirContext context = null;
091 //currently in use URI
092 private URI ldapURI = null;
093
094 /**
095 * returns the next URI from the configured list
096 *
097 * @return random URI from the configured list
098 */
099 public URI getUri()
100 { return availableURIs[++availableURIsIndex % availableURIs.length]; }
101
102 /**
103 * sets the LDAP server URI
104 *
105 * @param _uri LDAP server URI
106 */
107 public void setUri(URI _uri)
108 throws Exception
109 {
110 CompositeData data = URISupport.parseComposite(_uri);
111 if(data.getScheme().equals("failover"))
112 {
113 availableURIs = data.getComponents();
114 failover = true;
115 }
116 else
117 { availableURIs = new URI[]{ _uri }; }
118 }
119
120 /**
121 * sets the base LDAP dn used for lookup operations
122 *
123 * @param _base LDAP base dn
124 */
125 public void setBase(String _base)
126 { base = _base; }
127
128 /**
129 * sets the LDAP user for access credentials
130 *
131 * @param _user LDAP dn of user
132 */
133 public void setUser(String _user)
134 { user = _user; }
135
136 /**
137 * sets the LDAP password for access credentials
138 *
139 * @param _password user password
140 */
141 public void setPassword(String _password)
142 { password = _password; }
143
144 /**
145 * sets LDAP anonymous authentication access credentials
146 *
147 * @param _anonymousAuthentication set to true to use anonymous authentication
148 */
149 public void setAnonymousAuthentication(boolean _anonymousAuthentication)
150 { anonymousAuthentication = _anonymousAuthentication; }
151
152 /**
153 * sets the LDAP search scope
154 *
155 * @param _searchScope LDAP JNDI search scope
156 */
157 public void setSearchScope(String _searchScope)
158 throws Exception
159 {
160 int scope;
161 if(_searchScope.equals("OBJECT_SCOPE"))
162 { scope = SearchControls.OBJECT_SCOPE; }
163 else if(_searchScope.equals("ONELEVEL_SCOPE"))
164 { scope = SearchControls.ONELEVEL_SCOPE; }
165 else if(_searchScope.equals("SUBTREE_SCOPE"))
166 { scope = SearchControls.SUBTREE_SCOPE; }
167 else
168 { throw new Exception("ERR: unknown LDAP search scope specified: " + _searchScope); }
169 searchControls.setSearchScope(scope);
170 }
171
172 /**
173 * sets the LDAP search filter as defined in RFC 2254
174 *
175 * @param _searchFilter LDAP search filter
176 * @see <a href="http://www.faqs.org/rfcs/rfc2254.html">RFC 2254</a>
177 */
178 public void setSearchFilter(String _searchFilter)
179 { searchFilter = "(&" + REQUIRED_OBJECT_CLASS_FILTER + "(" + _searchFilter + "))"; }
180
181 /**
182 * enables/disable a persistent search to the LDAP server as defined
183 * in draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3)
184 *
185 * @param _searchEventListener enable = true, disable = false (default)
186 * @see <a href="http://www.ietf.org/proceedings/01mar/I-D/draft-ietf-ldapext-psearch-03.txt">draft-ietf-ldapext-psearch-03.txt</a>
187 */
188 public void setSearchEventListener(boolean _searchEventListener)
189 { searchEventListener = _searchEventListener; }
190
191 /**
192 * start the connector
193 */
194 public void start()
195 throws Exception
196 {
197 LOG.info("connecting...");
198 Hashtable<String, String> env = new Hashtable();
199 env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
200 this.ldapURI = getUri();
201 LOG.debug(" URI [" + this.ldapURI + "]");
202 env.put(Context.PROVIDER_URL, this.ldapURI.toString());
203 if(anonymousAuthentication)
204 {
205 LOG.debug(" login credentials [anonymous]");
206 env.put(Context.SECURITY_AUTHENTICATION, "none");
207 }
208 else
209 {
210 LOG.debug(" login credentials [" + user + ":******]");
211 env.put(Context.SECURITY_PRINCIPAL, user);
212 env.put(Context.SECURITY_CREDENTIALS, password);
213 }
214 boolean isConnected = false;
215 while(!isConnected)
216 {
217 try
218 {
219 context = new InitialDirContext(env);
220 isConnected = true;
221 }
222 catch(CommunicationException err)
223 {
224 if(failover)
225 {
226 this.ldapURI = getUri();
227 LOG.error("connection error [" + env.get(Context.PROVIDER_URL) + "], failover connection to [" + this.ldapURI.toString() + "]");
228 env.put(Context.PROVIDER_URL, this.ldapURI.toString());
229 Thread.sleep(curReconnectDelay);
230 curReconnectDelay = Math.min(curReconnectDelay * 2, maxReconnectDelay);
231 }
232 else
233 { throw err; }
234 }
235 }
236
237 // add connectors from search results
238 LOG.info("searching for network connectors...");
239 LOG.debug(" base [" + base + "]");
240 LOG.debug(" filter [" + searchFilter + "]");
241 LOG.debug(" scope [" + searchControls.getSearchScope() + "]");
242 NamingEnumeration<SearchResult> results = context.search(base, searchFilter, searchControls);
243 while(results.hasMore())
244 { addConnector(results.next()); }
245
246 // register persistent search event listener
247 if(searchEventListener)
248 {
249 LOG.info("registering persistent search listener...");
250 EventDirContext eventContext = (EventDirContext)context.lookup("");
251 eventContext.addNamingListener(base, searchFilter, searchControls, this);
252 }
253 else // otherwise close context (i.e. connection as it is no longer needed)
254 { context.close(); }
255 }
256
257 /**
258 * stop the connector
259 */
260 public void stop()
261 throws Exception
262 {
263 LOG.info("stopping context...");
264 for(NetworkConnector connector : connectorMap.values())
265 { connector.stop(); }
266 connectorMap.clear();
267 referenceMap.clear();
268 uuidMap.clear();
269 context.close();
270 }
271
272 public String toString() {
273 return this.getClass().getName() + getName() + "[" + ldapURI.toString() + "]";
274 }
275
276 /**
277 * add connector of the given URI
278 *
279 * @param result
280 * search result of connector to add
281 */
282 protected synchronized void addConnector(SearchResult result)
283 throws Exception
284 {
285 String uuid = toUUID(result);
286 if(uuidMap.containsKey(uuid))
287 {
288 LOG.warn("connector already regsitered for UUID [" + uuid + "]");
289 return;
290 }
291
292 URI connectorURI = toURI(result);
293 if(connectorMap.containsKey(connectorURI))
294 {
295 int referenceCount = referenceMap.get(connectorURI) + 1;
296 LOG.warn("connector reference added for URI [" + connectorURI + "], UUID [" + uuid + "], total reference(s) [" + referenceCount + "]");
297 referenceMap.put(connectorURI, referenceCount);
298 uuidMap.put(uuid, connectorURI);
299 return;
300 }
301
302 // FIXME: disable JMX listing of LDAP managed connectors, we will
303 // want to map/manage these differently in the future
304 // boolean useJMX = getBrokerService().isUseJmx();
305 // getBrokerService().setUseJmx(false);
306 NetworkConnector connector = getBrokerService().addNetworkConnector(connectorURI);
307 // getBrokerService().setUseJmx(useJMX);
308
309 // propogate std connector properties that may have been set via XML
310 connector.setDynamicOnly(isDynamicOnly());
311 connector.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
312 connector.setNetworkTTL(getNetworkTTL());
313 connector.setConduitSubscriptions(isConduitSubscriptions());
314 connector.setExcludedDestinations(getExcludedDestinations());
315 connector.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations());
316 connector.setDuplex(isDuplex());
317
318 // XXX: set in the BrokerService.startAllConnectors method and is
319 // required to prevent remote broker exceptions upon connection
320 connector.setLocalUri(getBrokerService().getVmConnectorURI());
321 connector.setBrokerName(getBrokerService().getBrokerName());
322 connector.setDurableDestinations(getBrokerService().getBroker().getDurableDestinations());
323
324 // start network connector
325 connectorMap.put(connectorURI, connector);
326 referenceMap.put(connectorURI, 1);
327 uuidMap.put(uuid, connectorURI);
328 connector.start();
329 LOG.info("connector added with URI [" + connectorURI + "]");
330 }
331
332 /**
333 * remove connector of the given URI
334 *
335 * @param result search result of connector to remove
336 */
337 protected synchronized void removeConnector(SearchResult result)
338 throws Exception
339 {
340 String uuid = toUUID(result);
341 if(!uuidMap.containsKey(uuid))
342 {
343 LOG.warn("connector not regsitered for UUID [" + uuid + "]");
344 return;
345 }
346
347 URI connectorURI = uuidMap.get(uuid);
348 if(!connectorMap.containsKey(connectorURI))
349 {
350 LOG.warn("connector not regisitered for URI [" + connectorURI + "]");
351 return;
352 }
353
354 int referenceCount = referenceMap.get(connectorURI) - 1;
355 referenceMap.put(connectorURI, referenceCount);
356 uuidMap.remove(uuid);
357 LOG.debug("connector referenced removed for URI [" + connectorURI + "], UUID [" + uuid + "], remaining reference(s) [" + referenceCount + "]");
358
359 if(referenceCount > 0)
360 { return; }
361
362 NetworkConnector connector = connectorMap.remove(connectorURI);
363 connector.stop();
364 LOG.info("connector removed with URI [" + connectorURI + "]");
365 }
366
367 /**
368 * convert search result into URI
369 *
370 * @param result search result to convert to URI
371 */
372 protected URI toURI(SearchResult result)
373 throws Exception
374 {
375 Attributes attributes = result.getAttributes();
376 String address = (String)attributes.get("iphostnumber").get();
377 String port = (String)attributes.get("ipserviceport").get();
378 String protocol = (String)attributes.get("ipserviceprotocol").get();
379 URI connectorURI = new URI("static:(" + protocol + "://" + address + ":" + port + ")");
380 LOG.debug("retrieved URI from SearchResult [" + connectorURI + "]");
381 return connectorURI;
382 }
383
384 /**
385 * convert search result into URI
386 *
387 * @param result search result to convert to URI
388 */
389 protected String toUUID(SearchResult result)
390 {
391 String uuid = result.getNameInNamespace();
392 LOG.debug("retrieved UUID from SearchResult [" + uuid + "]");
393 return uuid;
394 }
395
396 /**
397 * invoked when an entry has been added during a persistent search
398 */
399 public void objectAdded(NamingEvent event)
400 {
401 LOG.debug("entry added");
402 try
403 { addConnector((SearchResult)event.getNewBinding()); }
404 catch(Exception err)
405 { LOG.error("ERR: caught unexpected exception", err); }
406 }
407
408 /**
409 * invoked when an entry has been removed during a persistent search
410 */
411 public void objectRemoved(NamingEvent event)
412 {
413 LOG.debug("entry removed");
414 try
415 { removeConnector((SearchResult)event.getOldBinding()); }
416 catch(Exception err)
417 { LOG.error("ERR: caught unexpected exception", err); }
418 }
419
420 /**
421 * invoked when an entry has been renamed during a persistent search
422 */
423 public void objectRenamed(NamingEvent event)
424 {
425 LOG.debug("entry renamed");
426 // XXX: getNameInNamespace method does not seem to work properly,
427 // but getName seems to provide the result we want
428 String uuidOld = event.getOldBinding().getName();
429 String uuidNew = event.getNewBinding().getName();
430 URI connectorURI = uuidMap.remove(uuidOld);
431 uuidMap.put(uuidNew, connectorURI);
432 LOG.debug("connector reference renamed for URI [" + connectorURI + "], Old UUID [" + uuidOld + "], New UUID [" + uuidNew + "]");
433 }
434
435 /**
436 * invoked when an entry has been changed during a persistent search
437 */
438 public void objectChanged(NamingEvent event)
439 {
440 LOG.debug("entry changed");
441 try
442 {
443 SearchResult result = (SearchResult)event.getNewBinding();
444 removeConnector(result);
445 addConnector(result);
446 }
447 catch(Exception err)
448 { LOG.error("ERR: caught unexpected exception", err); }
449 }
450
451 /**
452 * invoked when an exception has occurred during a persistent search
453 */
454 public void namingExceptionThrown(NamingExceptionEvent event)
455 { LOG.error("ERR: caught unexpected exception", event.getException()); }
456 }