001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.io.IOException;
020 import java.util.List;
021
022 import javax.jms.InvalidSelectorException;
023 import javax.management.ObjectName;
024
025 import org.apache.activemq.broker.ConnectionContext;
026 import org.apache.activemq.command.ActiveMQDestination;
027 import org.apache.activemq.command.ConsumerInfo;
028 import org.apache.activemq.command.MessageAck;
029 import org.apache.activemq.command.MessageDispatchNotification;
030 import org.apache.activemq.command.MessagePull;
031 import org.apache.activemq.command.Response;
032 import org.apache.activemq.filter.MessageEvaluationContext;
033
034 /**
035 *
036 */
037 public interface Subscription extends SubscriptionRecovery {
038
039 /**
040 * Used to add messages that match the subscription.
041 * @param node
042 * @throws Exception
043 * @throws InterruptedException
044 * @throws IOException
045 */
046 void add(MessageReference node) throws Exception;
047
048 /**
049 * Used when client acknowledge receipt of dispatched message.
050 * @param node
051 * @throws IOException
052 * @throws Exception
053 */
054 void acknowledge(ConnectionContext context, final MessageAck ack) throws Exception;
055
056
057 /**
058 * Allows a consumer to pull a message on demand
059 */
060 Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception;
061
062 /**
063 * Is the subscription interested in the message?
064 * @param node
065 * @param context
066 * @return
067 * @throws IOException
068 */
069 boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException;
070
071 /**
072 * Is the subscription interested in messages in the destination?
073 * @param context
074 * @return
075 */
076 boolean matches(ActiveMQDestination destination);
077
078 /**
079 * The subscription will be receiving messages from the destination.
080 * @param context
081 * @param destination
082 * @throws Exception
083 */
084 void add(ConnectionContext context, Destination destination) throws Exception;
085
086 /**
087 * The subscription will be no longer be receiving messages from the destination.
088 * @param context
089 * @param destination
090 * @return a list of un-acked messages that were added to the subscription.
091 */
092 List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception;
093
094 /**
095 * The ConsumerInfo object that created the subscription.
096 * @param destination
097 */
098 ConsumerInfo getConsumerInfo();
099
100 /**
101 * The subscription should release as may references as it can to help the garbage collector
102 * reclaim memory.
103 */
104 void gc();
105
106 /**
107 * Used by a Slave Broker to update dispatch infomation
108 * @param mdn
109 * @throws Exception
110 */
111 void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception;
112
113 /**
114 * @return true if the broker is currently in slave mode
115 */
116 boolean isSlave();
117
118 /**
119 * @return number of messages pending delivery
120 */
121 int getPendingQueueSize();
122
123 /**
124 * @return number of messages dispatched to the client
125 */
126 int getDispatchedQueueSize();
127
128 /**
129 * @return number of messages dispatched to the client
130 */
131 long getDispatchedCounter();
132
133 /**
134 * @return number of messages that matched the subscription
135 */
136 long getEnqueueCounter();
137
138 /**
139 * @return number of messages queued by the client
140 */
141 long getDequeueCounter();
142
143 /**
144 * @return the JMS selector on the current subscription
145 */
146 String getSelector();
147
148 /**
149 * Attempts to change the current active selector on the subscription.
150 * This operation is not supported for persistent topics.
151 */
152 void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException;
153
154 /**
155 * @return the JMX object name that this subscription was registered as if applicable
156 */
157 ObjectName getObjectName();
158
159 /**
160 * Set when the subscription is registered in JMX
161 */
162 void setObjectName(ObjectName objectName);
163
164 /**
165 * @return true when 60% or more room is left for dispatching messages
166 */
167 boolean isLowWaterMark();
168
169 /**
170 * @return true when 10% or less room is left for dispatching messages
171 */
172 boolean isHighWaterMark();
173
174 /**
175 * @return true if there is no space to dispatch messages
176 */
177 boolean isFull();
178
179 /**
180 * inform the MessageConsumer on the client to change it's prefetch
181 * @param newPrefetch
182 */
183 void updateConsumerPrefetch(int newPrefetch);
184
185
186 /**
187 * Called when the subscription is destroyed.
188 */
189 void destroy();
190
191 /**
192 * @return the prefetch size that is configured for the subscription
193 */
194 int getPrefetchSize();
195
196 /**
197 * @return the number of messages awaiting acknowledgement
198 */
199 int getInFlightSize();
200
201 /**
202 * @return the in flight messages as a percentage of the prefetch size
203 */
204 int getInFlightUsage();
205
206 /**
207 * Informs the Broker if the subscription needs to intervention to recover it's state
208 * e.g. DurableTopicSubscriber may do
209 * @see org.apache.activemq.region.cursors.PendingMessageCursor
210 * @return true if recovery required
211 */
212 boolean isRecoveryRequired();
213
214
215 /**
216 * @return true if a browser
217 */
218 boolean isBrowser();
219
220 /**
221 * @return the number of messages this subscription can accept before its full
222 */
223 int countBeforeFull();
224
225 ConnectionContext getContext();
226
227 public int getCursorMemoryHighWaterMark();
228
229 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
230
231 boolean isSlowConsumer();
232
233 void unmatched(MessageReference node) throws IOException;
234 }