001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.activemq.pool;
019
020 import java.io.IOException;
021 import java.util.Iterator;
022 import java.util.concurrent.ConcurrentHashMap;
023 import java.util.concurrent.ConcurrentLinkedQueue;
024 import java.util.concurrent.atomic.AtomicBoolean;
025
026 import javax.jms.JMSException;
027 import javax.jms.Session;
028
029 import org.apache.activemq.ActiveMQConnection;
030 import org.apache.activemq.transport.TransportListener;
031 import org.apache.commons.pool.ObjectPoolFactory;
032
033 /**
034 * Holds a real JMS connection along with the session pools associated with it.
035 *
036 *
037 */
038 public class ConnectionPool {
039
040 private ActiveMQConnection connection;
041 private ConcurrentHashMap<SessionKey, SessionPool> cache;
042 private ConcurrentLinkedQueue<PooledSession> loanedSessions = new ConcurrentLinkedQueue<PooledSession>();
043 private AtomicBoolean started = new AtomicBoolean(false);
044 private int referenceCount;
045 private ObjectPoolFactory poolFactory;
046 private long lastUsed = System.currentTimeMillis();
047 private long firstUsed = lastUsed;
048 private boolean hasFailed;
049 private boolean hasExpired;
050 private int idleTimeout = 30 * 1000;
051 private long expiryTimeout = 0l;
052
053 public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) {
054 this(connection, new ConcurrentHashMap<SessionKey, SessionPool>(), poolFactory);
055 // Add a transport Listener so that we can notice if this connection
056 // should be expired due to a connection failure.
057 connection.addTransportListener(new TransportListener() {
058 public void onCommand(Object command) {
059 }
060
061 public void onException(IOException error) {
062 synchronized (ConnectionPool.this) {
063 hasFailed = true;
064 }
065 }
066
067 public void transportInterupted() {
068 }
069
070 public void transportResumed() {
071 }
072 });
073
074 // make sure that we set the hasFailed flag, in case the transport already failed
075 // prior to the addition of our new TransportListener
076 if(connection.isTransportFailed()) {
077 hasFailed = true;
078 }
079 }
080
081 public ConnectionPool(ActiveMQConnection connection, ConcurrentHashMap<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory) {
082 this.connection = connection;
083 this.cache = cache;
084 this.poolFactory = poolFactory;
085 }
086
087 public void start() throws JMSException {
088 if (started.compareAndSet(false, true)) {
089 try {
090 connection.start();
091 } catch (JMSException e) {
092 started.set(false);
093 throw(e);
094 }
095 }
096 }
097
098 public synchronized ActiveMQConnection getConnection() {
099 return connection;
100 }
101
102 public Session createSession(boolean transacted, int ackMode) throws JMSException {
103 SessionKey key = new SessionKey(transacted, ackMode);
104 SessionPool pool = null;
105 pool = cache.get(key);
106 if (pool == null) {
107 SessionPool newPool = createSessionPool(key);
108 SessionPool prevPool = cache.putIfAbsent(key, newPool);
109 if (prevPool != null && prevPool != newPool) {
110 // newPool was not the first one to be associated with this
111 // key... close created session pool
112 try {
113 newPool.close();
114 } catch (Exception e) {
115 throw new JMSException(e.getMessage());
116 }
117 }
118 pool = cache.get(key); // this will return a non-null value...
119 }
120 PooledSession session = pool.borrowSession();
121 this.loanedSessions.add(session);
122 return session;
123 }
124
125 public synchronized void close() {
126 if (connection != null) {
127 try {
128 Iterator<SessionPool> i = cache.values().iterator();
129 while (i.hasNext()) {
130 SessionPool pool = i.next();
131 i.remove();
132 try {
133 pool.close();
134 } catch (Exception e) {
135 }
136 }
137 } finally {
138 try {
139 connection.close();
140 } catch (Exception e) {
141 } finally {
142 connection = null;
143 }
144 }
145 }
146 }
147
148 public synchronized void incrementReferenceCount() {
149 referenceCount++;
150 lastUsed = System.currentTimeMillis();
151 }
152
153 public synchronized void decrementReferenceCount() {
154 referenceCount--;
155 lastUsed = System.currentTimeMillis();
156 if (referenceCount == 0) {
157 expiredCheck();
158
159 for (PooledSession session : this.loanedSessions) {
160 try {
161 session.close();
162 } catch (Exception e) {
163 }
164 }
165 this.loanedSessions.clear();
166
167 // only clean up temp destinations when all users
168 // of this connection have called close
169 if (getConnection() != null) {
170 getConnection().cleanUpTempDestinations();
171 }
172 }
173 }
174
175 /**
176 * @return true if this connection has expired.
177 */
178 public synchronized boolean expiredCheck() {
179 if (connection == null) {
180 return true;
181 }
182 if (hasExpired) {
183 if (referenceCount == 0) {
184 close();
185 }
186 return true;
187 }
188 if (hasFailed
189 || (idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout)
190 || expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
191 hasExpired = true;
192 if (referenceCount == 0) {
193 close();
194 }
195 return true;
196 }
197 return false;
198 }
199
200 public int getIdleTimeout() {
201 return idleTimeout;
202 }
203
204 public void setIdleTimeout(int idleTimeout) {
205 this.idleTimeout = idleTimeout;
206 }
207
208 protected SessionPool createSessionPool(SessionKey key) {
209 return new SessionPool(this, key, poolFactory.createPool());
210 }
211
212 public void setExpiryTimeout(long expiryTimeout) {
213 this.expiryTimeout = expiryTimeout;
214 }
215
216 public long getExpiryTimeout() {
217 return expiryTimeout;
218 }
219
220 void onSessionReturned(PooledSession session) {
221 this.loanedSessions.remove(session);
222 }
223
224 void onSessionInvalidated(PooledSession session) {
225 this.loanedSessions.remove(session);
226 }
227 }