001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.pool;
018
019 import java.io.Serializable;
020 import java.util.Iterator;
021 import java.util.concurrent.CopyOnWriteArrayList;
022
023 import javax.jms.BytesMessage;
024 import javax.jms.Destination;
025 import javax.jms.JMSException;
026 import javax.jms.MapMessage;
027 import javax.jms.Message;
028 import javax.jms.MessageConsumer;
029 import javax.jms.MessageListener;
030 import javax.jms.MessageProducer;
031 import javax.jms.ObjectMessage;
032 import javax.jms.Queue;
033 import javax.jms.QueueBrowser;
034 import javax.jms.QueueReceiver;
035 import javax.jms.QueueSender;
036 import javax.jms.QueueSession;
037 import javax.jms.Session;
038 import javax.jms.StreamMessage;
039 import javax.jms.TemporaryQueue;
040 import javax.jms.TemporaryTopic;
041 import javax.jms.TextMessage;
042 import javax.jms.Topic;
043 import javax.jms.TopicPublisher;
044 import javax.jms.TopicSession;
045 import javax.jms.TopicSubscriber;
046 import javax.jms.XASession;
047 import javax.transaction.xa.XAResource;
048
049 import org.apache.activemq.ActiveMQMessageProducer;
050 import org.apache.activemq.ActiveMQQueueSender;
051 import org.apache.activemq.ActiveMQSession;
052 import org.apache.activemq.ActiveMQTopicPublisher;
053 import org.apache.activemq.AlreadyClosedException;
054 import org.slf4j.Logger;
055 import org.slf4j.LoggerFactory;
056
057 public class PooledSession implements Session, TopicSession, QueueSession, XASession {
058 private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
059
060 private ActiveMQSession session;
061 private SessionPool sessionPool;
062 private ActiveMQMessageProducer messageProducer;
063 private ActiveMQQueueSender queueSender;
064 private ActiveMQTopicPublisher topicPublisher;
065 private boolean transactional = true;
066 private boolean ignoreClose;
067
068 private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
069 private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
070 private final CopyOnWriteArrayList<PooledSessionEventListener> tempDestEventListeners =
071 new CopyOnWriteArrayList<PooledSessionEventListener>();
072 private boolean isXa;
073
074 public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) {
075 this.session = aSession;
076 this.sessionPool = sessionPool;
077 this.transactional = session.isTransacted();
078 }
079
080 public void addTempDestEventListener(PooledSessionEventListener listener) {
081 this.tempDestEventListeners.add(listener);
082 }
083
084 protected boolean isIgnoreClose() {
085 return ignoreClose;
086 }
087
088 protected void setIgnoreClose(boolean ignoreClose) {
089 this.ignoreClose = ignoreClose;
090 }
091
092 public void close() throws JMSException {
093 if (!ignoreClose) {
094 // TODO a cleaner way to reset??
095
096 boolean invalidate = false;
097 try {
098 // lets reset the session
099 getInternalSession().setMessageListener(null);
100
101 // Close any consumers and browsers that may have been created.
102 for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
103 MessageConsumer consumer = iter.next();
104 consumer.close();
105 }
106
107 for (Iterator<QueueBrowser> iter = browsers.iterator(); iter.hasNext();) {
108 QueueBrowser browser = iter.next();
109 browser.close();
110 }
111
112 if (transactional && !isXa) {
113 try {
114 getInternalSession().rollback();
115 } catch (JMSException e) {
116 invalidate = true;
117 LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e);
118 }
119 }
120 } catch (JMSException ex) {
121 invalidate = true;
122 LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + ex, ex);
123 } finally {
124 consumers.clear();
125 browsers.clear();
126 }
127
128 if (invalidate) {
129 // lets close the session and not put the session back into
130 // the pool
131 if (session != null) {
132 try {
133 session.close();
134 } catch (JMSException e1) {
135 LOG.trace("Ignoring exception on close as discarding session: " + e1, e1);
136 }
137 session = null;
138 }
139 sessionPool.invalidateSession(this);
140 } else {
141 sessionPool.returnSession(this);
142 }
143 }
144 }
145
146 public void commit() throws JMSException {
147 getInternalSession().commit();
148 }
149
150 public BytesMessage createBytesMessage() throws JMSException {
151 return getInternalSession().createBytesMessage();
152 }
153
154 public MapMessage createMapMessage() throws JMSException {
155 return getInternalSession().createMapMessage();
156 }
157
158 public Message createMessage() throws JMSException {
159 return getInternalSession().createMessage();
160 }
161
162 public ObjectMessage createObjectMessage() throws JMSException {
163 return getInternalSession().createObjectMessage();
164 }
165
166 public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
167 return getInternalSession().createObjectMessage(serializable);
168 }
169
170 public Queue createQueue(String s) throws JMSException {
171 return getInternalSession().createQueue(s);
172 }
173
174 public StreamMessage createStreamMessage() throws JMSException {
175 return getInternalSession().createStreamMessage();
176 }
177
178 public TemporaryQueue createTemporaryQueue() throws JMSException {
179 TemporaryQueue result;
180
181 result = getInternalSession().createTemporaryQueue();
182
183 // Notify all of the listeners of the created temporary Queue.
184 for (PooledSessionEventListener listener : this.tempDestEventListeners) {
185 listener.onTemporaryQueueCreate(result);
186 }
187
188 return result;
189 }
190
191 public TemporaryTopic createTemporaryTopic() throws JMSException {
192 TemporaryTopic result;
193
194 result = getInternalSession().createTemporaryTopic();
195
196 // Notify all of the listeners of the created temporary Topic.
197 for (PooledSessionEventListener listener : this.tempDestEventListeners) {
198 listener.onTemporaryTopicCreate(result);
199 }
200
201 return result;
202 }
203
204 public void unsubscribe(String s) throws JMSException {
205 getInternalSession().unsubscribe(s);
206 }
207
208 public TextMessage createTextMessage() throws JMSException {
209 return getInternalSession().createTextMessage();
210 }
211
212 public TextMessage createTextMessage(String s) throws JMSException {
213 return getInternalSession().createTextMessage(s);
214 }
215
216 public Topic createTopic(String s) throws JMSException {
217 return getInternalSession().createTopic(s);
218 }
219
220 public int getAcknowledgeMode() throws JMSException {
221 return getInternalSession().getAcknowledgeMode();
222 }
223
224 public boolean getTransacted() throws JMSException {
225 return getInternalSession().getTransacted();
226 }
227
228 public void recover() throws JMSException {
229 getInternalSession().recover();
230 }
231
232 public void rollback() throws JMSException {
233 getInternalSession().rollback();
234 }
235
236 public XAResource getXAResource() {
237 if (session == null) {
238 throw new IllegalStateException("Session is closed");
239 }
240 return session.getTransactionContext();
241 }
242
243 public Session getSession() {
244 return this;
245 }
246
247 public void run() {
248 if (session != null) {
249 session.run();
250 }
251 }
252
253 // Consumer related methods
254 // -------------------------------------------------------------------------
255 public QueueBrowser createBrowser(Queue queue) throws JMSException {
256 return addQueueBrowser(getInternalSession().createBrowser(queue));
257 }
258
259 public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
260 return addQueueBrowser(getInternalSession().createBrowser(queue, selector));
261 }
262
263 public MessageConsumer createConsumer(Destination destination) throws JMSException {
264 return addConsumer(getInternalSession().createConsumer(destination));
265 }
266
267 public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
268 return addConsumer(getInternalSession().createConsumer(destination, selector));
269 }
270
271 public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
272 return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
273 }
274
275 public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
276 return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector));
277 }
278
279 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
280 return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal));
281 }
282
283 public MessageListener getMessageListener() throws JMSException {
284 return getInternalSession().getMessageListener();
285 }
286
287 public void setMessageListener(MessageListener messageListener) throws JMSException {
288 getInternalSession().setMessageListener(messageListener);
289 }
290
291 public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
292 return addTopicSubscriber(getInternalSession().createSubscriber(topic));
293 }
294
295 public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
296 return addTopicSubscriber(getInternalSession().createSubscriber(topic, selector, local));
297 }
298
299 public QueueReceiver createReceiver(Queue queue) throws JMSException {
300 return addQueueReceiver(getInternalSession().createReceiver(queue));
301 }
302
303 public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
304 return addQueueReceiver(getInternalSession().createReceiver(queue, selector));
305 }
306
307 // Producer related methods
308 // -------------------------------------------------------------------------
309 public MessageProducer createProducer(Destination destination) throws JMSException {
310 return new PooledProducer(getMessageProducer(), destination);
311 }
312
313 public QueueSender createSender(Queue queue) throws JMSException {
314 return new PooledQueueSender(getQueueSender(), queue);
315 }
316
317 public TopicPublisher createPublisher(Topic topic) throws JMSException {
318 return new PooledTopicPublisher(getTopicPublisher(), topic);
319 }
320
321 /**
322 * Callback invoked when the consumer is closed.
323 * <p/>
324 * This is used to keep track of an explicit closed consumer created by this
325 * session, by which we know do not need to keep track of the consumer, as
326 * its already closed.
327 *
328 * @param consumer
329 * the consumer which is being closed
330 */
331 protected void onConsumerClose(MessageConsumer consumer) {
332 consumers.remove(consumer);
333 }
334
335 public ActiveMQSession getInternalSession() throws AlreadyClosedException {
336 if (session == null) {
337 throw new AlreadyClosedException("The session has already been closed");
338 }
339 return session;
340 }
341
342 public ActiveMQMessageProducer getMessageProducer() throws JMSException {
343 if (messageProducer == null) {
344 messageProducer = (ActiveMQMessageProducer) getInternalSession().createProducer(null);
345 }
346 return messageProducer;
347 }
348
349 public ActiveMQQueueSender getQueueSender() throws JMSException {
350 if (queueSender == null) {
351 queueSender = (ActiveMQQueueSender) getInternalSession().createSender(null);
352 }
353 return queueSender;
354 }
355
356 public ActiveMQTopicPublisher getTopicPublisher() throws JMSException {
357 if (topicPublisher == null) {
358 topicPublisher = (ActiveMQTopicPublisher) getInternalSession().createPublisher(null);
359 }
360 return topicPublisher;
361 }
362
363 private QueueBrowser addQueueBrowser(QueueBrowser browser) {
364 browsers.add(browser);
365 return browser;
366 }
367
368 private MessageConsumer addConsumer(MessageConsumer consumer) {
369 consumers.add(consumer);
370 // must wrap in PooledMessageConsumer to ensure the onConsumerClose
371 // method is invoked
372 // when the returned consumer is closed, to avoid memory leak in this
373 // session class
374 // in case many consumers is created
375 return new PooledMessageConsumer(this, consumer);
376 }
377
378 private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) {
379 consumers.add(subscriber);
380 return subscriber;
381 }
382
383 private QueueReceiver addQueueReceiver(QueueReceiver receiver) {
384 consumers.add(receiver);
385 return receiver;
386 }
387
388 public void setIsXa(boolean isXa) {
389 this.isXa = isXa;
390 }
391
392 public String toString() {
393 return "PooledSession { " + session + " }";
394 }
395 }