001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.pool;
018
019 import java.util.concurrent.CopyOnWriteArrayList;
020
021 import javax.jms.Connection;
022 import javax.jms.ConnectionConsumer;
023 import javax.jms.ConnectionMetaData;
024 import javax.jms.Destination;
025 import javax.jms.ExceptionListener;
026 import javax.jms.JMSException;
027 import javax.jms.Queue;
028 import javax.jms.QueueConnection;
029 import javax.jms.QueueSession;
030 import javax.jms.ServerSessionPool;
031 import javax.jms.Session;
032 import javax.jms.TemporaryQueue;
033 import javax.jms.TemporaryTopic;
034 import javax.jms.Topic;
035 import javax.jms.TopicConnection;
036 import javax.jms.TopicSession;
037
038 import org.apache.activemq.ActiveMQConnection;
039 import org.apache.activemq.ActiveMQSession;
040 import org.apache.activemq.AlreadyClosedException;
041 import org.apache.activemq.EnhancedConnection;
042 import org.apache.activemq.advisory.DestinationSource;
043 import org.slf4j.Logger;
044 import org.slf4j.LoggerFactory;
045
046 /**
047 * Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
048 * {@link QueueConnection} which is pooled and on {@link #close()} will return
049 * itself to the sessionPool.
050 *
051 * <b>NOTE</b> this implementation is only intended for use when sending
052 * messages. It does not deal with pooling of consumers; for that look at a
053 * library like <a href="http://jencks.org/">Jencks</a> such as in <a
054 * href="http://jencks.org/Message+Driven+POJOs">this example</a>
055 *
056 */
057 public class PooledConnection implements TopicConnection, QueueConnection, EnhancedConnection {
058 private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnection.class);
059
060 private ConnectionPool pool;
061 private boolean stopped;
062 private final CopyOnWriteArrayList<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>();
063 private final CopyOnWriteArrayList<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>();
064
065 public PooledConnection(ConnectionPool pool) {
066 this.pool = pool;
067 this.pool.incrementReferenceCount();
068 }
069
070 /**
071 * Factory method to create a new instance.
072 */
073 public PooledConnection newInstance() {
074 return new PooledConnection(pool);
075 }
076
077 public void close() throws JMSException {
078 this.cleanupConnectionTemporaryDestinations();
079 if (this.pool != null) {
080 this.pool.decrementReferenceCount();
081 this.pool = null;
082 }
083 }
084
085 public void start() throws JMSException {
086 assertNotClosed();
087 pool.start();
088 }
089
090 public void stop() throws JMSException {
091 stopped = true;
092 }
093
094 public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages)
095 throws JMSException {
096 return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
097 }
098
099 public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
100 return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
101 }
102
103 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i)
104 throws JMSException {
105 return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
106 }
107
108 public String getClientID() throws JMSException {
109 return getConnection().getClientID();
110 }
111
112 public ExceptionListener getExceptionListener() throws JMSException {
113 return getConnection().getExceptionListener();
114 }
115
116 public ConnectionMetaData getMetaData() throws JMSException {
117 return getConnection().getMetaData();
118 }
119
120 public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
121 getConnection().setExceptionListener(exceptionListener);
122 }
123
124 public void setClientID(String clientID) throws JMSException {
125
126 // ignore repeated calls to setClientID() with the same client id
127 // this could happen when a JMS component such as Spring that uses a
128 // PooledConnectionFactory shuts down and reinitializes.
129 if (this.getConnection().getClientID() == null || !this.getClientID().equals(clientID)) {
130 getConnection().setClientID(clientID);
131 }
132 }
133
134 public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
135 return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
136 }
137
138 // Session factory methods
139 // -------------------------------------------------------------------------
140 public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
141 return (QueueSession) createSession(transacted, ackMode);
142 }
143
144 public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
145 return (TopicSession) createSession(transacted, ackMode);
146 }
147
148 public Session createSession(boolean transacted, int ackMode) throws JMSException {
149 PooledSession result;
150 result = (PooledSession) pool.createSession(transacted, ackMode);
151
152 // Add a temporary destination event listener to the session that notifies us when
153 // the session creates temporary destinations.
154 result.addTempDestEventListener(new PooledSessionEventListener() {
155
156 @Override
157 public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
158 connTempQueues.add(tempQueue);
159 }
160
161 @Override
162 public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
163 connTempTopics.add(tempTopic);
164 }
165 });
166
167 return (Session) result;
168 }
169
170 // EnhancedCollection API
171 // -------------------------------------------------------------------------
172
173 public DestinationSource getDestinationSource() throws JMSException {
174 return getConnection().getDestinationSource();
175 }
176
177 // Implementation methods
178 // -------------------------------------------------------------------------
179
180 public ActiveMQConnection getConnection() throws JMSException {
181 assertNotClosed();
182 return pool.getConnection();
183 }
184
185 protected void assertNotClosed() throws AlreadyClosedException {
186 if (stopped || pool == null) {
187 throw new AlreadyClosedException();
188 }
189 }
190
191 protected ActiveMQSession createSession(SessionKey key) throws JMSException {
192 return (ActiveMQSession) getConnection().createSession(key.isTransacted(), key.getAckMode());
193 }
194
195 public String toString() {
196 return "PooledConnection { " + pool + " }";
197 }
198
199 /**
200 * Remove all of the temporary destinations created for this connection.
201 * This is important since the underlying connection may be reused over a
202 * long period of time, accumulating all of the temporary destinations from
203 * each use. However, from the perspective of the lifecycle from the
204 * client's view, close() closes the connection and, therefore, deletes all
205 * of the temporary destinations created.
206 */
207 protected void cleanupConnectionTemporaryDestinations() {
208
209 for (TemporaryQueue tempQueue : connTempQueues) {
210 try {
211 tempQueue.delete();
212 } catch (JMSException ex) {
213 LOG.info("failed to delete Temporary Queue \"" + tempQueue.toString() + "\" on closing pooled connection: " + ex.getMessage());
214 }
215 }
216 connTempQueues.clear();
217
218 for (TemporaryTopic tempTopic : connTempTopics) {
219 try {
220 tempTopic.delete();
221 } catch (JMSException ex) {
222 LOG.info("failed to delete Temporary Topic \"" + tempTopic.toString() + "\" on closing pooled connection: " + ex.getMessage());
223 }
224 }
225 connTempTopics.clear();
226 }
227 }