001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.amq;
018
019 import java.io.IOException;
020
021 import org.apache.activemq.broker.ConnectionContext;
022 import org.apache.activemq.command.ActiveMQTopic;
023 import org.apache.activemq.command.JournalTopicAck;
024 import org.apache.activemq.command.Message;
025 import org.apache.activemq.command.MessageAck;
026 import org.apache.activemq.command.MessageId;
027 import org.apache.activemq.command.SubscriptionInfo;
028 import org.apache.activemq.filter.BooleanExpression;
029 import org.apache.activemq.filter.MessageEvaluationContext;
030 import org.apache.activemq.kaha.impl.async.Location;
031 import org.apache.activemq.selector.SelectorParser;
032 import org.apache.activemq.store.MessageRecoveryListener;
033 import org.apache.activemq.store.TopicMessageStore;
034 import org.apache.activemq.store.TopicReferenceStore;
035 import org.apache.activemq.transaction.Synchronization;
036 import org.apache.activemq.util.IOExceptionSupport;
037 import org.apache.activemq.util.SubscriptionKey;
038 import org.slf4j.Logger;
039 import org.slf4j.LoggerFactory;
040
041 /**
042 * A MessageStore that uses a Journal to store it's messages.
043 *
044 *
045 */
046 public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessageStore {
047
048 private static final Logger LOG = LoggerFactory.getLogger(AMQTopicMessageStore.class);
049 private TopicReferenceStore topicReferenceStore;
050 public AMQTopicMessageStore(AMQPersistenceAdapter adapter,TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) {
051 super(adapter, topicReferenceStore, destinationName);
052 this.topicReferenceStore = topicReferenceStore;
053 }
054
055 public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
056 flush();
057 topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener));
058 }
059
060 public void recoverNextMessages(String clientId, String subscriptionName,
061 int maxReturned, final MessageRecoveryListener listener)
062 throws Exception {
063 RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
064 topicReferenceStore.recoverNextMessages(clientId, subscriptionName,maxReturned, recoveryListener);
065 if (recoveryListener.size() == 0) {
066 flush();
067 topicReferenceStore.recoverNextMessages(clientId,subscriptionName, maxReturned, recoveryListener);
068 }
069 }
070
071 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
072 return topicReferenceStore.lookupSubscription(clientId, subscriptionName);
073 }
074
075 public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
076 peristenceAdapter.writeCommand(subscriptionInfo, false);
077 topicReferenceStore.addSubsciption(subscriptionInfo, retroactive);
078 }
079
080 /**
081 */
082 public void acknowledge(final ConnectionContext context, final String clientId, final String subscriptionName,
083 final MessageId messageId, final MessageAck originalAck) throws IOException {
084 final boolean debug = LOG.isDebugEnabled();
085 JournalTopicAck ack = new JournalTopicAck();
086 ack.setDestination(destination);
087 ack.setMessageId(messageId);
088 ack.setMessageSequenceId(messageId.getBrokerSequenceId());
089 ack.setSubscritionName(subscriptionName);
090 ack.setClientId(clientId);
091 ack.setTransactionId(context.getTransaction() != null ? context.getTransaction().getTransactionId() : null);
092 final Location location = peristenceAdapter.writeCommand(ack, false);
093 final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
094 if (!context.isInTransaction()) {
095 if (debug) {
096 LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location);
097 }
098 acknowledge(context,messageId, location, clientId,subscriptionName);
099 } else {
100 if (debug) {
101 LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
102 }
103 lock.lock();
104 try {
105 inFlightTxLocations.add(location);
106 }finally {
107 lock.unlock();
108 }
109 transactionStore.acknowledge(this, ack, location);
110 context.getTransaction().addSynchronization(new Synchronization() {
111
112 public void afterCommit() throws Exception {
113 if (debug) {
114 LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
115 }
116 lock.lock();
117 try {
118 inFlightTxLocations.remove(location);
119 acknowledge(context,messageId, location, clientId,subscriptionName);
120 }finally {
121 lock.unlock();
122 }
123 }
124
125 public void afterRollback() throws Exception {
126 if (debug) {
127 LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
128 }
129 lock.lock();
130 try{
131 inFlightTxLocations.remove(location);
132 }finally {
133 lock.unlock();
134 }
135 }
136 });
137 }
138 }
139
140 public boolean replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
141 try {
142 SubscriptionInfo sub = topicReferenceStore.lookupSubscription(clientId, subscritionName);
143 if (sub != null) {
144 topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId, null);
145 return true;
146 }
147 } catch (Throwable e) {
148 LOG.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e);
149 }
150 return false;
151 }
152
153 /**
154 * @param messageId
155 * @param location
156 * @param key
157 * @throws IOException
158 */
159 protected void acknowledge(final ConnectionContext context, MessageId messageId,
160 Location location, String clientId, String subscriptionName)
161 throws IOException {
162 MessageAck ack = null;
163 lock.lock();
164 try {
165 lastLocation = location;
166 }finally {
167 lock.unlock();
168 }
169
170 if (topicReferenceStore.acknowledgeReference(context, clientId,
171 subscriptionName, messageId)) {
172 ack = new MessageAck();
173 ack.setLastMessageId(messageId);
174
175 }
176
177 if (ack != null) {
178 removeMessage(context, ack);
179 }
180 }
181
182 /**
183 * @return Returns the longTermStore.
184 */
185 public TopicReferenceStore getTopicReferenceStore() {
186 return topicReferenceStore;
187 }
188
189 public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
190 topicReferenceStore.deleteSubscription(clientId, subscriptionName);
191 }
192
193 public SubscriptionInfo[] getAllSubscriptions() throws IOException {
194 return topicReferenceStore.getAllSubscriptions();
195 }
196
197 public int getMessageCount(String clientId, String subscriberName) throws IOException {
198 flush();
199 SubscriptionInfo info = lookupSubscription(clientId, subscriberName);
200 try {
201 MessageCounter counter = new MessageCounter(info, this);
202 topicReferenceStore.recoverSubscription(clientId, subscriberName, counter);
203 return counter.count;
204 } catch (Exception e) {
205 throw IOExceptionSupport.create(e);
206 }
207 }
208
209 private class MessageCounter implements MessageRecoveryListener {
210
211 int count = 0;
212 SubscriptionInfo info;
213 BooleanExpression selectorExpression;
214 TopicMessageStore store;
215
216 public MessageCounter(SubscriptionInfo info, TopicMessageStore store) throws Exception {
217 this.info = info;
218 if (info != null) {
219 String selector = info.getSelector();
220 if (selector != null) {
221 this.selectorExpression = SelectorParser.parse(selector);
222 }
223 }
224 this.store = store;
225 }
226
227 public boolean recoverMessageReference(MessageId ref) throws Exception {
228 if (selectorExpression != null) {
229 MessageEvaluationContext ctx = new MessageEvaluationContext();
230 ctx.setMessageReference(store.getMessage(ref));
231 if (selectorExpression.matches(ctx)) {
232 count++;
233 }
234 } else {
235 count ++;
236 }
237 return true;
238 }
239
240 public boolean recoverMessage(Message message) throws Exception {
241 if (selectorExpression != null) {
242 MessageEvaluationContext ctx = new MessageEvaluationContext();
243 ctx.setMessageReference(store.getMessage(message.getMessageId()));
244 if (selectorExpression.matches(ctx)) {
245 count++;
246 }
247 } else {
248 count++;
249 }
250 return true;
251 }
252
253 public boolean isDuplicate(MessageId ref) {
254 return false;
255 }
256
257 public boolean hasSpace() {
258 return true;
259 }
260 }
261
262 public void resetBatching(String clientId, String subscriptionName) {
263 topicReferenceStore.resetBatching(clientId, subscriptionName);
264 }
265 }