001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.activemq;
019
020 import java.util.Collections;
021 import java.util.LinkedList;
022 import java.util.List;
023
024 import javax.jms.ConnectionConsumer;
025 import javax.jms.IllegalStateException;
026 import javax.jms.JMSException;
027 import javax.jms.ServerSession;
028 import javax.jms.ServerSessionPool;
029 import javax.jms.Session;
030
031 import org.apache.activemq.command.ConsumerInfo;
032 import org.apache.activemq.command.MessageDispatch;
033
034 /**
035 * For application servers, <CODE>Connection</CODE> objects provide a special
036 * facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The
037 * messages it is to consume are specified by a <CODE>Destination</CODE> and a
038 * message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be
039 * given a <CODE>ServerSessionPool</CODE> to use for processing its messages.
 * <p>
042 * Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a
043 * <CODE>ServerSession</CODE> from its pool, loads it with a single message,
044 * and starts it. As traffic picks up, messages can back up. If this happens, a
045 * <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE>
046 * with more than one message. This reduces the thread context switches and
047 * minimizes resource use at the expense of some serialization of message
048 * processing.
049 *
050 * @see javax.jms.Connection#createConnectionConsumer
051 * @see javax.jms.Connection#createDurableConnectionConsumer
052 * @see javax.jms.QueueConnection#createConnectionConsumer
053 * @see javax.jms.TopicConnection#createConnectionConsumer
054 * @see javax.jms.TopicConnection#createDurableConnectionConsumer
055 */
056
057 public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQDispatcher {
058
059 private ActiveMQConnection connection;
060 private ServerSessionPool sessionPool;
061 private ConsumerInfo consumerInfo;
062 private boolean closed;
063
064 /**
065 * Create a ConnectionConsumer
066 *
067 * @param theConnection
068 * @param theSessionPool
069 * @param theConsumerInfo
070 * @throws JMSException
071 */
072 protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection, ServerSessionPool theSessionPool, ConsumerInfo theConsumerInfo) throws JMSException {
073 this.connection = theConnection;
074 this.sessionPool = theSessionPool;
075 this.consumerInfo = theConsumerInfo;
076
077 this.connection.addConnectionConsumer(this);
078 this.connection.addDispatcher(consumerInfo.getConsumerId(), this);
079 this.connection.asyncSendPacket(this.consumerInfo);
080 }
081
082 /**
083 * Gets the server session pool associated with this connection consumer.
084 *
085 * @return the server session pool used by this connection consumer
086 * @throws JMSException if the JMS provider fails to get the server session
087 * pool associated with this consumer due to some internal
088 * error.
089 */
090
091 public ServerSessionPool getServerSessionPool() throws JMSException {
092 if (closed) {
093 throw new IllegalStateException("The Connection Consumer is closed");
094 }
095 return this.sessionPool;
096 }
097
098 /**
099 * Closes the connection consumer. <p/>
100 * <P>
101 * Since a provider may allocate some resources on behalf of a connection
102 * consumer outside the Java virtual machine, clients should close these
103 * resources when they are not needed. Relying on garbage collection to
104 * eventually reclaim these resources may not be timely enough.
105 *
106 * @throws JMSException
107 */
108
109 public void close() throws JMSException {
110 if (!closed) {
111 dispose();
112 this.connection.asyncSendPacket(this.consumerInfo.createRemoveCommand());
113 }
114
115 }
116
117 public void dispose() {
118 if (!closed) {
119 this.connection.removeDispatcher(consumerInfo.getConsumerId());
120 this.connection.removeConnectionConsumer(this);
121 closed = true;
122 }
123 }
124
125 public void dispatch(MessageDispatch messageDispatch) {
126 try {
127 messageDispatch.setConsumer(this);
128
129 ServerSession serverSession = sessionPool.getServerSession();
130 Session s = serverSession.getSession();
131 ActiveMQSession session = null;
132
133 if (s instanceof ActiveMQSession) {
134 session = (ActiveMQSession)s;
135 } else if (s instanceof ActiveMQTopicSession) {
136 ActiveMQTopicSession topicSession = (ActiveMQTopicSession)s;
137 session = (ActiveMQSession)topicSession.getNext();
138 } else if (s instanceof ActiveMQQueueSession) {
139 ActiveMQQueueSession queueSession = (ActiveMQQueueSession)s;
140 session = (ActiveMQSession)queueSession.getNext();
141 } else {
142 connection.onClientInternalException(new JMSException("Session pool provided an invalid session type: " + s.getClass()));
143 return;
144 }
145
146 session.dispatch(messageDispatch);
147 serverSession.start();
148 } catch (JMSException e) {
149 connection.onAsyncException(e);
150 }
151 }
152
153 public String toString() {
154 return "ActiveMQConnectionConsumer { value=" + consumerInfo.getConsumerId() + " }";
155 }
156
157 public void clearMessagesInProgress() {
158 // future: may want to deal with rollback of in progress messages to track re deliveries
159 // before indicating that all is complete.
160 // Till there is a need, lets immediately allow dispatch
161 this.connection.transportInterruptionProcessingComplete();
162 }
163 }