001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.jmx;
018
019 import java.io.IOException;
020 import java.util.Set;
021
022 import javax.jms.InvalidSelectorException;
023 import javax.management.ObjectName;
024
025 import org.apache.activemq.broker.BrokerService;
026 import org.apache.activemq.broker.ConnectionContext;
027 import org.apache.activemq.broker.region.Subscription;
028 import org.apache.activemq.command.ActiveMQDestination;
029 import org.apache.activemq.command.ActiveMQQueue;
030 import org.apache.activemq.command.ActiveMQTopic;
031 import org.apache.activemq.command.ConsumerInfo;
032 import org.apache.activemq.filter.DestinationFilter;
033 import org.apache.activemq.util.IOExceptionSupport;
034 import org.apache.activemq.util.JMXSupport;
035
036 /**
037 *
038 */
039 public class SubscriptionView implements SubscriptionViewMBean {
040
041 protected final Subscription subscription;
042 protected final String clientId;
043 protected final String userName;
044
045 /**
046 * Constructor
047 *
048 * @param subs
049 */
050 public SubscriptionView(String clientId, String userName, Subscription subs) {
051 this.clientId = clientId;
052 this.subscription = subs;
053 this.userName = userName;
054 }
055
056 /**
057 * @return the clientId
058 */
059 public String getClientId() {
060 return clientId;
061 }
062
063 /**
064 * @returns the ObjectName of the Connection that created this subscription
065 */
066 public ObjectName getConnection() {
067 ObjectName result = null;
068
069 if (clientId != null && subscription != null) {
070 ConnectionContext ctx = subscription.getContext();
071 if (ctx != null && ctx.getBroker() != null && ctx.getBroker().getBrokerService() != null) {
072 BrokerService service = ctx.getBroker().getBrokerService();
073 ManagementContext managementCtx = service.getManagementContext();
074 if (managementCtx != null) {
075
076 try {
077 ObjectName query = createConnectionQueury(managementCtx, service.getBrokerName());
078 Set<ObjectName> names = managementCtx.queryNames(query, null);
079 if (names.size() == 1) {
080 result = names.iterator().next();
081 }
082 } catch (Exception e) {
083 }
084 }
085 }
086 }
087 return result;
088 }
089
090 private ObjectName createConnectionQueury(ManagementContext ctx, String brokerName) throws IOException {
091 try {
092 return new ObjectName(ctx.getJmxDomainName() + ":" + "BrokerName="
093 + JMXSupport.encodeObjectNamePart(brokerName) + ","
094 + "Type=Connection," + "ConnectorName=*,"
095 + "Connection=" + JMXSupport.encodeObjectNamePart(clientId));
096 } catch (Throwable e) {
097 throw IOExceptionSupport.create(e);
098 }
099 }
100
101 /**
102 * @return the id of the Connection the Subscription is on
103 */
104 public String getConnectionId() {
105 ConsumerInfo info = getConsumerInfo();
106 if (info != null) {
107 return info.getConsumerId().getConnectionId();
108 }
109 return "NOTSET";
110 }
111
112 /**
113 * @return the id of the Session the subscription is on
114 */
115 public long getSessionId() {
116 ConsumerInfo info = getConsumerInfo();
117 if (info != null) {
118 return info.getConsumerId().getSessionId();
119 }
120 return 0;
121 }
122
123 /**
124 * @return the id of the Subscription
125 */
126 public long getSubcriptionId() {
127 ConsumerInfo info = getConsumerInfo();
128 if (info != null) {
129 return info.getConsumerId().getValue();
130 }
131 return 0;
132 }
133
134 /**
135 * @return the destination name
136 */
137 public String getDestinationName() {
138 ConsumerInfo info = getConsumerInfo();
139 if (info != null) {
140 ActiveMQDestination dest = info.getDestination();
141 return dest.getPhysicalName();
142 }
143 return "NOTSET";
144 }
145
146 public String getSelector() {
147 if (subscription != null) {
148 return subscription.getSelector();
149 }
150 return null;
151 }
152
153 public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException {
154 if (subscription != null) {
155 subscription.setSelector(selector);
156 } else {
157 throw new UnsupportedOperationException("No subscription object");
158 }
159 }
160
161 /**
162 * @return true if the destination is a Queue
163 */
164 public boolean isDestinationQueue() {
165 ConsumerInfo info = getConsumerInfo();
166 if (info != null) {
167 ActiveMQDestination dest = info.getDestination();
168 return dest.isQueue();
169 }
170 return false;
171 }
172
173 /**
174 * @return true of the destination is a Topic
175 */
176 public boolean isDestinationTopic() {
177 ConsumerInfo info = getConsumerInfo();
178 if (info != null) {
179 ActiveMQDestination dest = info.getDestination();
180 return dest.isTopic();
181 }
182 return false;
183 }
184
185 /**
186 * @return true if the destination is temporary
187 */
188 public boolean isDestinationTemporary() {
189 ConsumerInfo info = getConsumerInfo();
190 if (info != null) {
191 ActiveMQDestination dest = info.getDestination();
192 return dest.isTemporary();
193 }
194 return false;
195 }
196
197 /**
198 * @return true if the subscriber is active
199 */
200 public boolean isActive() {
201 return true;
202 }
203
204 /**
205 * The subscription should release as may references as it can to help the
206 * garbage collector reclaim memory.
207 */
208 public void gc() {
209 if (subscription != null) {
210 subscription.gc();
211 }
212 }
213
214 /**
215 * @return whether or not the subscriber is retroactive or not
216 */
217 public boolean isRetroactive() {
218 ConsumerInfo info = getConsumerInfo();
219 return info != null ? info.isRetroactive() : false;
220 }
221
222 /**
223 * @return whether or not the subscriber is an exclusive consumer
224 */
225 public boolean isExclusive() {
226 ConsumerInfo info = getConsumerInfo();
227 return info != null ? info.isExclusive() : false;
228 }
229
230 /**
231 * @return whether or not the subscriber is durable (persistent)
232 */
233 public boolean isDurable() {
234 ConsumerInfo info = getConsumerInfo();
235 return info != null ? info.isDurable() : false;
236 }
237
238 /**
239 * @return whether or not the subscriber ignores local messages
240 */
241 public boolean isNoLocal() {
242 ConsumerInfo info = getConsumerInfo();
243 return info != null ? info.isNoLocal() : false;
244 }
245
246 /**
247 * @return the maximum number of pending messages allowed in addition to the
248 * prefetch size. If enabled to a non-zero value then this will
249 * perform eviction of messages for slow consumers on non-durable
250 * topics.
251 */
252 public int getMaximumPendingMessageLimit() {
253 ConsumerInfo info = getConsumerInfo();
254 return info != null ? info.getMaximumPendingMessageLimit() : 0;
255 }
256
257 /**
258 * @return the consumer priority
259 */
260 public byte getPriority() {
261 ConsumerInfo info = getConsumerInfo();
262 return info != null ? info.getPriority() : 0;
263 }
264
265 /**
266 * @return the name of the consumer which is only used for durable
267 * consumers.
268 */
269 public String getSubcriptionName() {
270 ConsumerInfo info = getConsumerInfo();
271 return info != null ? info.getSubscriptionName() : null;
272 }
273
274 /**
275 * @return number of messages pending delivery
276 */
277 public int getPendingQueueSize() {
278 return subscription != null ? subscription.getPendingQueueSize() : 0;
279 }
280
281 /**
282 * @return number of messages dispatched
283 */
284 public int getDispatchedQueueSize() {
285 return subscription != null ? subscription.getDispatchedQueueSize() : 0;
286 }
287
288 public int getMessageCountAwaitingAcknowledge() {
289 return getDispatchedQueueSize();
290 }
291
292 /**
293 * @return number of messages that matched the subscription
294 */
295 public long getDispatchedCounter() {
296 return subscription != null ? subscription.getDispatchedCounter() : 0;
297 }
298
299 /**
300 * @return number of messages that matched the subscription
301 */
302 public long getEnqueueCounter() {
303 return subscription != null ? subscription.getEnqueueCounter() : 0;
304 }
305
306 /**
307 * @return number of messages queued by the client
308 */
309 public long getDequeueCounter() {
310 return subscription != null ? subscription.getDequeueCounter() : 0;
311 }
312
313 protected ConsumerInfo getConsumerInfo() {
314 return subscription != null ? subscription.getConsumerInfo() : null;
315 }
316
317 /**
318 * @return pretty print
319 */
320 public String toString() {
321 return "SubscriptionView: " + getClientId() + ":" + getConnectionId();
322 }
323
324 /**
325 */
326 public int getPrefetchSize() {
327 return subscription != null ? subscription.getPrefetchSize() : 0;
328 }
329
330 public boolean isMatchingQueue(String queueName) {
331 if (isDestinationQueue()) {
332 return matchesDestination(new ActiveMQQueue(queueName));
333 }
334 return false;
335 }
336
337 public boolean isMatchingTopic(String topicName) {
338 if (isDestinationTopic()) {
339 return matchesDestination(new ActiveMQTopic(topicName));
340 }
341 return false;
342 }
343
344 /**
345 * Return true if this subscription matches the given destination
346 *
347 * @param destination the destination to compare against
348 * @return true if this subscription matches the given destination
349 */
350 public boolean matchesDestination(ActiveMQDestination destination) {
351 ActiveMQDestination subscriptionDestination = subscription.getActiveMQDestination();
352 DestinationFilter filter = DestinationFilter.parseFilter(subscriptionDestination);
353 return filter.matches(destination);
354 }
355
356 @Override
357 public boolean isSlowConsumer() {
358 return subscription.isSlowConsumer();
359 }
360
361 @Override
362 public String getUserName() {
363 return userName;
364 }
365 }