001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.jmx;
018
019 import javax.management.openmbean.CompositeData;
020 import javax.management.openmbean.OpenDataException;
021 import javax.jms.JMSException;
022
023 import org.apache.activemq.broker.ConnectionContext;
024 import org.apache.activemq.broker.region.Queue;
025 import org.apache.activemq.broker.region.QueueMessageReference;
026 import org.apache.activemq.command.ActiveMQDestination;
027 import org.apache.activemq.command.Message;
028 import org.apache.activemq.util.BrokerSupport;
029
030 /**
031 * Provides a JMX Management view of a Queue.
032 */
033 public class QueueView extends DestinationView implements QueueViewMBean {
034 public QueueView(ManagedRegionBroker broker, Queue destination) {
035 super(broker, destination);
036 }
037
038 public CompositeData getMessage(String messageId) throws OpenDataException {
039 QueueMessageReference ref = ((Queue)destination).getMessage(messageId);
040 Message rc = ref.getMessage();
041 if (rc == null) {
042 return null;
043 }
044 return OpenTypeSupport.convert(rc);
045 }
046
047 public void purge() throws Exception {
048 ((Queue)destination).purge();
049 }
050
051 public boolean removeMessage(String messageId) throws Exception {
052 return ((Queue)destination).removeMessage(messageId);
053 }
054
055 public int removeMatchingMessages(String selector) throws Exception {
056 return ((Queue)destination).removeMatchingMessages(selector);
057 }
058
059 public int removeMatchingMessages(String selector, int maximumMessages) throws Exception {
060 return ((Queue)destination).removeMatchingMessages(selector, maximumMessages);
061 }
062
063 public boolean copyMessageTo(String messageId, String destinationName) throws Exception {
064 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
065 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
066 return ((Queue)destination).copyMessageTo(context, messageId, toDestination);
067 }
068
069 public int copyMatchingMessagesTo(String selector, String destinationName) throws Exception {
070 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
071 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
072 return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination);
073 }
074
075 public int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception {
076 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
077 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
078 return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination, maximumMessages);
079 }
080
081 public boolean moveMessageTo(String messageId, String destinationName) throws Exception {
082 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
083 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
084 return ((Queue)destination).moveMessageTo(context, messageId, toDestination);
085 }
086
087 public int moveMatchingMessagesTo(String selector, String destinationName) throws Exception {
088 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
089 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
090 return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination);
091 }
092
093 public int moveMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception {
094 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
095 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
096 return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination, maximumMessages);
097 }
098
099 /**
100 * Moves a message back to its original destination
101 */
102 public boolean retryMessage(String messageId) throws Exception {
103 Queue queue = (Queue) destination;
104 QueueMessageReference ref = queue.getMessage(messageId);
105 Message rc = ref.getMessage();
106 if (rc != null) {
107 ActiveMQDestination originalDestination = rc.getOriginalDestination();
108 if (originalDestination != null) {
109 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
110 return queue.moveMessageTo(context, ref, originalDestination);
111 }
112 else {
113 throw new JMSException("No original destination for message: "+ messageId);
114 }
115 }
116 else {
117 throw new JMSException("Could not find message: "+ messageId);
118 }
119 }
120
121 public int cursorSize() {
122 Queue queue = (Queue) destination;
123 if (queue.getMessages() != null){
124 return queue.getMessages().size();
125 }
126 return 0;
127 }
128
129
130 public boolean doesCursorHaveMessagesBuffered() {
131 Queue queue = (Queue) destination;
132 if (queue.getMessages() != null){
133 return queue.getMessages().hasMessagesBufferedToDeliver();
134 }
135 return false;
136
137 }
138
139
140 public boolean doesCursorHaveSpace() {
141 Queue queue = (Queue) destination;
142 if (queue.getMessages() != null){
143 return queue.getMessages().hasSpace();
144 }
145 return false;
146 }
147
148
149 public long getCursorMemoryUsage() {
150 Queue queue = (Queue) destination;
151 if (queue.getMessages() != null && queue.getMessages().getSystemUsage() != null){
152 return queue.getMessages().getSystemUsage().getMemoryUsage().getUsage();
153 }
154 return 0;
155 }
156
157 public int getCursorPercentUsage() {
158 Queue queue = (Queue) destination;
159 if (queue.getMessages() != null && queue.getMessages().getSystemUsage() != null){
160 return queue.getMessages().getSystemUsage().getMemoryUsage().getPercentUsage();
161 }
162 return 0;
163 }
164
165 public boolean isCursorFull() {
166 Queue queue = (Queue) destination;
167 if (queue.getMessages() != null){
168 return queue.getMessages().isFull();
169 }
170 return false;
171 }
172
173 public boolean isCacheEnabled() {
174 Queue queue = (Queue) destination;
175 if (queue.getMessages() != null){
176 return queue.getMessages().isCacheEnabled();
177 }
178 return false;
179 }
180 }