001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.pool;
018
019 import java.util.HashMap;
020 import java.util.Iterator;
021 import java.util.LinkedList;
022 import java.util.Map;
023 import java.util.concurrent.atomic.AtomicBoolean;
024 import javax.jms.Connection;
025 import javax.jms.ConnectionFactory;
026 import javax.jms.JMSException;
027 import org.apache.activemq.ActiveMQConnection;
028 import org.apache.activemq.ActiveMQConnectionFactory;
029 import org.apache.activemq.Service;
030 import org.apache.activemq.util.IOExceptionSupport;
031 import org.slf4j.Logger;
032 import org.slf4j.LoggerFactory;
033 import org.apache.commons.pool.ObjectPoolFactory;
034 import org.apache.commons.pool.impl.GenericObjectPool;
035 import org.apache.commons.pool.impl.GenericObjectPoolFactory;
036
037 /**
038 * A JMS provider which pools Connection, Session and MessageProducer instances
039 * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's <a
040 * href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>.
041 * Connections, sessions and producers are returned to a pool after use so that they can be reused later
042 * without having to undergo the cost of creating them again.
043 *
044 * b>NOTE:</b> while this implementation does allow the creation of a collection of active consumers,
045 * it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which
046 * are expensive to create and can remain idle a minimal cost. Consumers, on the other hand, are usually
047 * just created at startup and left active, handling incoming messages as they come. When a consumer is
048 * complete, it is best to close it rather than return it to a pool for later reuse: this is because,
049 * even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer,
050 * where they'll get held until the consumer is active again.
051 *
052 * If you are creating a collection of consumers (for example, for multi-threaded message consumption), you
053 * might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that
054 * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail:
055 * http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
056 *
057 * @org.apache.xbean.XBean element="pooledConnectionFactory"
058 *
059 *
060 */
061 public class PooledConnectionFactory implements ConnectionFactory, Service {
062 private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
063 private ConnectionFactory connectionFactory;
064 private Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>();
065 private ObjectPoolFactory poolFactory;
066 private int maximumActive = 500;
067 private int maxConnections = 1;
068 private int idleTimeout = 30 * 1000;
069 private boolean blockIfSessionPoolIsFull = true;
070 private AtomicBoolean stopped = new AtomicBoolean(false);
071 private long expiryTimeout = 0l;
072
073 public PooledConnectionFactory() {
074 this(new ActiveMQConnectionFactory());
075 }
076
077 public PooledConnectionFactory(String brokerURL) {
078 this(new ActiveMQConnectionFactory(brokerURL));
079 }
080
081 public PooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
082 this.connectionFactory = connectionFactory;
083 }
084
085 public ConnectionFactory getConnectionFactory() {
086 return connectionFactory;
087 }
088
089 public void setConnectionFactory(ConnectionFactory connectionFactory) {
090 this.connectionFactory = connectionFactory;
091 }
092
093 public Connection createConnection() throws JMSException {
094 return createConnection(null, null);
095 }
096
097 public synchronized Connection createConnection(String userName, String password) throws JMSException {
098 if (stopped.get()) {
099 LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
100 return null;
101 }
102
103 ConnectionKey key = new ConnectionKey(userName, password);
104 LinkedList<ConnectionPool> pools = cache.get(key);
105
106 if (pools == null) {
107 pools = new LinkedList<ConnectionPool>();
108 cache.put(key, pools);
109 }
110
111 ConnectionPool connection = null;
112 if (pools.size() == maxConnections) {
113 connection = pools.removeFirst();
114 }
115
116 // Now.. we might get a connection, but it might be that we need to
117 // dump it..
118 if (connection != null && connection.expiredCheck()) {
119 connection = null;
120 }
121
122 if (connection == null) {
123 ActiveMQConnection delegate = createConnection(key);
124 connection = createConnectionPool(delegate);
125 }
126 pools.add(connection);
127 return new PooledConnection(connection);
128 }
129
130 protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
131 ConnectionPool result = new ConnectionPool(connection, getPoolFactory());
132 result.setIdleTimeout(getIdleTimeout());
133 result.setExpiryTimeout(getExpiryTimeout());
134 return result;
135 }
136
137 protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException {
138 if (key.getUserName() == null && key.getPassword() == null) {
139 return (ActiveMQConnection)connectionFactory.createConnection();
140 } else {
141 return (ActiveMQConnection)connectionFactory.createConnection(key.getUserName(), key.getPassword());
142 }
143 }
144
145 /**
146 * @see org.apache.activemq.service.Service#start()
147 */
148 public void start() {
149 try {
150 stopped.set(false);
151 createConnection();
152 } catch (JMSException e) {
153 LOG.warn("Create pooled connection during start failed.", e);
154 IOExceptionSupport.create(e);
155 }
156 }
157
158 public void stop() {
159 LOG.debug("Stop the PooledConnectionFactory, number of connections in cache: "+cache.size());
160 stopped.set(true);
161 for (Iterator<LinkedList<ConnectionPool>> iter = cache.values().iterator(); iter.hasNext();) {
162 for (ConnectionPool connection : iter.next()) {
163 try {
164 connection.close();
165 }catch(Exception e) {
166 LOG.warn("Close connection failed",e);
167 }
168 }
169 }
170 cache.clear();
171 }
172
173 public ObjectPoolFactory getPoolFactory() {
174 if (poolFactory == null) {
175 poolFactory = createPoolFactory();
176 }
177 return poolFactory;
178 }
179
180 /**
181 * Sets the object pool factory used to create individual session pools for
182 * each connection
183 */
184 public void setPoolFactory(ObjectPoolFactory poolFactory) {
185 this.poolFactory = poolFactory;
186 }
187
188 public int getMaximumActive() {
189 return maximumActive;
190 }
191
192 /**
193 * Sets the maximum number of active sessions per connection
194 */
195 public void setMaximumActive(int maximumActive) {
196 this.maximumActive = maximumActive;
197 }
198
199 /**
200 * Controls the behavior of the internal session pool. By default the call to
201 * Connection.getSession() will block if the session pool is full. If the
202 * argument false is given, it will change the default behavior and instead the
203 * call to getSession() will throw a JMSException.
204 *
205 * The size of the session pool is controlled by the @see #maximumActive
206 * property.
207 *
208 * @param block - if true, the call to getSession() blocks if the pool is full
209 * until a session object is available. defaults to true.
210 */
211 public void setBlockIfSessionPoolIsFull(boolean block) {
212 this.blockIfSessionPoolIsFull = block;
213 }
214
215 /**
216 * @return the maxConnections
217 */
218 public int getMaxConnections() {
219 return maxConnections;
220 }
221
222 /**
223 * @param maxConnections the maxConnections to set
224 */
225 public void setMaxConnections(int maxConnections) {
226 this.maxConnections = maxConnections;
227 }
228
229 /**
230 * Creates an ObjectPoolFactory. Its behavior is controlled by the two
231 * properties @see #maximumActive and @see #blockIfSessionPoolIsFull.
232 *
233 * @return the newly created but empty ObjectPoolFactory
234 */
235 protected ObjectPoolFactory createPoolFactory() {
236 if (blockIfSessionPoolIsFull) {
237 return new GenericObjectPoolFactory(null, maximumActive);
238 } else {
239 return new GenericObjectPoolFactory(null,
240 maximumActive,
241 GenericObjectPool.WHEN_EXHAUSTED_FAIL,
242 GenericObjectPool.DEFAULT_MAX_WAIT);
243 }
244 }
245
246 public int getIdleTimeout() {
247 return idleTimeout;
248 }
249
250 public void setIdleTimeout(int idleTimeout) {
251 this.idleTimeout = idleTimeout;
252 }
253
254 /**
255 * allow connections to expire, irrespective of load or idle time. This is useful with failover
256 * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery
257 *
258 * @param expiryTimeout non zero in milliseconds
259 */
260 public void setExpiryTimeout(long expiryTimeout) {
261 this.expiryTimeout = expiryTimeout;
262 }
263
264 public long getExpiryTimeout() {
265 return expiryTimeout;
266 }
267 }