001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.jdbc;
018
019 import java.io.IOException;
020 import java.sql.SQLException;
021 import java.util.Arrays;
022 import java.util.Iterator;
023 import java.util.LinkedHashMap;
024 import java.util.Map;
025 import java.util.concurrent.ConcurrentHashMap;
026 import java.util.concurrent.locks.ReentrantReadWriteLock;
027
028 import org.apache.activemq.ActiveMQMessageAudit;
029 import org.apache.activemq.broker.ConnectionContext;
030 import org.apache.activemq.command.ActiveMQDestination;
031 import org.apache.activemq.command.ActiveMQTopic;
032 import org.apache.activemq.command.Message;
033 import org.apache.activemq.command.MessageAck;
034 import org.apache.activemq.command.MessageId;
035 import org.apache.activemq.command.SubscriptionInfo;
036 import org.apache.activemq.store.MessageRecoveryListener;
037 import org.apache.activemq.store.TopicMessageStore;
038 import org.apache.activemq.util.ByteSequence;
039 import org.apache.activemq.util.IOExceptionSupport;
040 import org.apache.activemq.wireformat.WireFormat;
041 import org.slf4j.Logger;
042 import org.slf4j.LoggerFactory;
043
044 /**
045 *
046 */
047 public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
048
049 private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class);
050 private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String, LastRecovered>();
051
052 public static final String PROPERTY_SEQUENCE_ID_CACHE_SIZE = "org.apache.activemq.store.jdbc.SEQUENCE_ID_CACHE_SIZE";
053 private static final int SEQUENCE_ID_CACHE_SIZE = Integer.parseInt(System.getProperty(
054 PROPERTY_SEQUENCE_ID_CACHE_SIZE, "1000"), 10);
055 private final ReentrantReadWriteLock sequenceIdCacheSizeLock = new ReentrantReadWriteLock();
056 private Map<MessageId, long[]> sequenceIdCache = new LinkedHashMap<MessageId, long[]>() {
057 protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> eldest) {
058 return size() > SEQUENCE_ID_CACHE_SIZE;
059 }
060 };
061
062
063 public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) throws IOException {
064 super(persistenceAdapter, adapter, wireFormat, topic, audit);
065 }
066
067 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
068 if (ack != null && ack.isUnmatchedAck()) {
069 if (LOG.isTraceEnabled()) {
070 LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks.");
071 }
072 return;
073 }
074 TransactionContext c = persistenceAdapter.getTransactionContext(context);
075 try {
076 long[] res = getCachedStoreSequenceId(c, destination, messageId);
077 if (this.isPrioritizedMessages()) {
078 adapter.doSetLastAckWithPriority(c, destination, clientId, subscriptionName, res[0], res[1]);
079 } else {
080 adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]);
081 }
082 if (LOG.isTraceEnabled()) {
083 LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1] + " mid:" + messageId);
084 }
085 } catch (SQLException e) {
086 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
087 throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e);
088 } finally {
089 c.close();
090 }
091 }
092
093 private long[] getCachedStoreSequenceId(TransactionContext transactionContext, ActiveMQDestination destination, MessageId messageId) throws SQLException, IOException {
094 long[] val = null;
095 sequenceIdCacheSizeLock.readLock().lock();
096 try {
097 val = sequenceIdCache.get(messageId);
098 } finally {
099 sequenceIdCacheSizeLock.readLock().unlock();
100 }
101 if (val == null) {
102 val = adapter.getStoreSequenceId(transactionContext, destination, messageId);
103 }
104 return val;
105 }
106
107 /**
108 * @throws Exception
109 */
110 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
111 TransactionContext c = persistenceAdapter.getTransactionContext();
112 try {
113 adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() {
114 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
115 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
116 msg.getMessageId().setBrokerSequenceId(sequenceId);
117 return listener.recoverMessage(msg);
118 }
119
120 public boolean recoverMessageReference(String reference) throws Exception {
121 return listener.recoverMessageReference(new MessageId(reference));
122 }
123
124 });
125 } catch (SQLException e) {
126 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
127 throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
128 } finally {
129 c.close();
130 }
131 }
132
133 private class LastRecovered implements Iterable<LastRecoveredEntry> {
134 LastRecoveredEntry[] perPriority = new LastRecoveredEntry[10];
135 LastRecovered() {
136 for (int i=0; i<perPriority.length; i++) {
137 perPriority[i] = new LastRecoveredEntry(i);
138 }
139 }
140
141 public void updateStored(long sequence, int priority) {
142 perPriority[priority].stored = sequence;
143 }
144
145 public LastRecoveredEntry defaultPriority() {
146 return perPriority[javax.jms.Message.DEFAULT_PRIORITY];
147 }
148
149 public String toString() {
150 return Arrays.deepToString(perPriority);
151 }
152
153 public Iterator<LastRecoveredEntry> iterator() {
154 return new PriorityIterator();
155 }
156
157 class PriorityIterator implements Iterator<LastRecoveredEntry> {
158 int current = 9;
159 public boolean hasNext() {
160 for (int i=current; i>=0; i--) {
161 if (perPriority[i].hasMessages()) {
162 current = i;
163 return true;
164 }
165 }
166 return false;
167 }
168
169 public LastRecoveredEntry next() {
170 return perPriority[current];
171 }
172
173 public void remove() {
174 throw new RuntimeException("not implemented");
175 }
176 }
177 }
178
179 private class LastRecoveredEntry {
180 final int priority;
181 long recovered = 0;
182 long stored = Integer.MAX_VALUE;
183
184 public LastRecoveredEntry(int priority) {
185 this.priority = priority;
186 }
187
188 public String toString() {
189 return priority + "-" + stored + ":" + recovered;
190 }
191
192 public void exhausted() {
193 stored = recovered;
194 }
195
196 public boolean hasMessages() {
197 return stored > recovered;
198 }
199 }
200
201 class LastRecoveredAwareListener implements JDBCMessageRecoveryListener {
202 final MessageRecoveryListener delegate;
203 final int maxMessages;
204 LastRecoveredEntry lastRecovered;
205 int recoveredCount;
206 int recoveredMarker;
207
208 public LastRecoveredAwareListener(MessageRecoveryListener delegate, int maxMessages) {
209 this.delegate = delegate;
210 this.maxMessages = maxMessages;
211 }
212
213 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
214 if (delegate.hasSpace() && recoveredCount < maxMessages) {
215 Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
216 msg.getMessageId().setBrokerSequenceId(sequenceId);
217 lastRecovered.recovered = sequenceId;
218 if (delegate.recoverMessage(msg)) {
219 recoveredCount++;
220 return true;
221 }
222 }
223 return false;
224 }
225
226 public boolean recoverMessageReference(String reference) throws Exception {
227 return delegate.recoverMessageReference(new MessageId(reference));
228 }
229
230 public void setLastRecovered(LastRecoveredEntry lastRecovered) {
231 this.lastRecovered = lastRecovered;
232 recoveredMarker = recoveredCount;
233 }
234
235 public boolean complete() {
236 return !delegate.hasSpace() || recoveredCount == maxMessages;
237 }
238
239 public boolean stalled() {
240 return recoveredMarker == recoveredCount;
241 }
242 }
243
244 public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
245 throws Exception {
246 //Duration duration = new Duration("recoverNextMessages");
247 TransactionContext c = persistenceAdapter.getTransactionContext();
248
249 String key = getSubscriptionKey(clientId, subscriptionName);
250 if (!subscriberLastRecoveredMap.containsKey(key)) {
251 subscriberLastRecoveredMap.put(key, new LastRecovered());
252 }
253 final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);
254 LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned);
255 try {
256 if (LOG.isTraceEnabled()) {
257 LOG.trace(key + " existing last recovered: " + lastRecovered);
258 }
259 if (isPrioritizedMessages()) {
260 Iterator<LastRecoveredEntry> it = lastRecovered.iterator();
261 for ( ; it.hasNext() && !recoveredAwareListener.complete(); ) {
262 LastRecoveredEntry entry = it.next();
263 recoveredAwareListener.setLastRecovered(entry);
264 //Duration microDuration = new Duration("recoverNextMessages:loop");
265 adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
266 entry.recovered, entry.priority, maxReturned, recoveredAwareListener);
267 //microDuration.end(new String(entry + " recoveredCount:" + recoveredAwareListener.recoveredCount));
268 if (recoveredAwareListener.stalled()) {
269 if (recoveredAwareListener.complete()) {
270 break;
271 } else {
272 entry.exhausted();
273 }
274 }
275 }
276 } else {
277 LastRecoveredEntry last = lastRecovered.defaultPriority();
278 recoveredAwareListener.setLastRecovered(last);
279 adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
280 last.recovered, 0, maxReturned, recoveredAwareListener);
281 }
282 if (LOG.isTraceEnabled()) {
283 LOG.trace(key + " last recovered: " + lastRecovered);
284 }
285 //duration.end();
286 } catch (SQLException e) {
287 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
288 } finally {
289 c.close();
290 }
291 }
292
293 public void resetBatching(String clientId, String subscriptionName) {
294 subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName));
295 }
296
297 protected void onAdd(MessageId messageId, long sequenceId, byte priority) {
298 // update last recovered state
299 for (LastRecovered last : subscriberLastRecoveredMap.values()) {
300 last.updateStored(sequenceId, priority);
301 }
302 sequenceIdCacheSizeLock.writeLock().lock();
303 try {
304 sequenceIdCache.put(messageId, new long[]{sequenceId, priority});
305 } finally {
306 sequenceIdCacheSizeLock.writeLock().unlock();
307 }
308 }
309
310
311 public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
312 TransactionContext c = persistenceAdapter.getTransactionContext();
313 try {
314 c = persistenceAdapter.getTransactionContext();
315 adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive, isPrioritizedMessages());
316 } catch (SQLException e) {
317 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
318 throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
319 } finally {
320 c.close();
321 }
322 }
323
324 /**
325 * @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String,
326 * String)
327 */
328 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
329 TransactionContext c = persistenceAdapter.getTransactionContext();
330 try {
331 return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName);
332 } catch (SQLException e) {
333 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
334 throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e);
335 } finally {
336 c.close();
337 }
338 }
339
340 public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
341 TransactionContext c = persistenceAdapter.getTransactionContext();
342 try {
343 adapter.doDeleteSubscription(c, destination, clientId, subscriptionName);
344 } catch (SQLException e) {
345 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
346 throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e);
347 } finally {
348 c.close();
349 resetBatching(clientId, subscriptionName);
350 }
351 }
352
353 public SubscriptionInfo[] getAllSubscriptions() throws IOException {
354 TransactionContext c = persistenceAdapter.getTransactionContext();
355 try {
356 return adapter.doGetAllSubscriptions(c, destination);
357 } catch (SQLException e) {
358 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
359 throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e);
360 } finally {
361 c.close();
362 }
363 }
364
365 public int getMessageCount(String clientId, String subscriberName) throws IOException {
366 //Duration duration = new Duration("getMessageCount");
367 int result = 0;
368 TransactionContext c = persistenceAdapter.getTransactionContext();
369 try {
370 result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages());
371 } catch (SQLException e) {
372 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
373 throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);
374 } finally {
375 c.close();
376 }
377 if (LOG.isTraceEnabled()) {
378 LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result);
379 }
380 //duration.end();
381 return result;
382 }
383
384 protected String getSubscriptionKey(String clientId, String subscriberName) {
385 String result = clientId + ":";
386 result += subscriberName != null ? subscriberName : "NOT_SET";
387 return result;
388 }
389
390 }