001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.jdbc.adapter;
018
019 import java.io.IOException;
020 import java.io.PrintStream;
021 import java.sql.Connection;
022 import java.sql.PreparedStatement;
023 import java.sql.ResultSet;
024 import java.sql.ResultSetMetaData;
025 import java.sql.SQLException;
026 import java.sql.Statement;
027 import java.util.ArrayList;
028 import java.util.HashSet;
029 import java.util.LinkedList;
030 import java.util.Set;
031 import java.util.concurrent.locks.ReadWriteLock;
032 import java.util.concurrent.locks.ReentrantReadWriteLock;
033
034 import org.apache.activemq.command.ActiveMQDestination;
035 import org.apache.activemq.command.MessageId;
036 import org.apache.activemq.command.ProducerId;
037 import org.apache.activemq.command.SubscriptionInfo;
038 import org.apache.activemq.store.jdbc.JDBCAdapter;
039 import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
040 import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
041 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
042 import org.apache.activemq.store.jdbc.Statements;
043 import org.apache.activemq.store.jdbc.TransactionContext;
044 import org.slf4j.Logger;
045 import org.slf4j.LoggerFactory;
046
047 /**
048 * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
049 * encouraged to override the default implementation of methods to account for differences in JDBC Driver
050 * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/>
051 * The databases/JDBC drivers that use this adapter are:
052 * <ul>
053 * <li></li>
054 * </ul>
055 *
056 * @org.apache.xbean.XBean element="defaultJDBCAdapter"
057 *
058 *
059 */
060 public class DefaultJDBCAdapter implements JDBCAdapter {
061 private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class);
062 public static final int MAX_ROWS = 10000;
063 protected Statements statements;
064 protected boolean batchStatments = true;
065 protected boolean prioritizedMessages;
066 protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
067 // needs to be min twice the prefetch for a durable sub and large enough for selector range
068 protected int maxRows = MAX_ROWS;
069
070 protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
071 s.setBytes(index, data);
072 }
073
074 protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
075 return rs.getBytes(index);
076 }
077
078 public void doCreateTables(TransactionContext c) throws SQLException, IOException {
079 Statement s = null;
080 cleanupExclusiveLock.writeLock().lock();
081 try {
082 // Check to see if the table already exists. If it does, then don't
083 // log warnings during startup.
084 // Need to run the scripts anyways since they may contain ALTER
085 // statements that upgrade a previous version
086 // of the table
087 boolean alreadyExists = false;
088 ResultSet rs = null;
089 try {
090 rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(),
091 new String[] { "TABLE" });
092 alreadyExists = rs.next();
093 } catch (Throwable ignore) {
094 } finally {
095 close(rs);
096 }
097 s = c.getConnection().createStatement();
098 String[] createStatments = this.statements.getCreateSchemaStatements();
099 for (int i = 0; i < createStatments.length; i++) {
100 // This will fail usually since the tables will be
101 // created already.
102 try {
103 LOG.debug("Executing SQL: " + createStatments[i]);
104 s.execute(createStatments[i]);
105 } catch (SQLException e) {
106 if (alreadyExists) {
107 LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: "
108 + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
109 + " Vendor code: " + e.getErrorCode());
110 } else {
111 LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: "
112 + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
113 + " Vendor code: " + e.getErrorCode());
114 JDBCPersistenceAdapter.log("Failure details: ", e);
115 }
116 }
117 }
118 c.getConnection().commit();
119 } finally {
120 cleanupExclusiveLock.writeLock().unlock();
121 try {
122 s.close();
123 } catch (Throwable e) {
124 }
125 }
126 }
127
128 public void doDropTables(TransactionContext c) throws SQLException, IOException {
129 Statement s = null;
130 cleanupExclusiveLock.writeLock().lock();
131 try {
132 s = c.getConnection().createStatement();
133 String[] dropStatments = this.statements.getDropSchemaStatements();
134 for (int i = 0; i < dropStatments.length; i++) {
135 // This will fail usually since the tables will be
136 // created already.
137 try {
138 LOG.debug("Executing SQL: " + dropStatments[i]);
139 s.execute(dropStatments[i]);
140 } catch (SQLException e) {
141 LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i]
142 + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: "
143 + e.getErrorCode());
144 JDBCPersistenceAdapter.log("Failure details: ", e);
145 }
146 }
147 c.getConnection().commit();
148 } finally {
149 cleanupExclusiveLock.writeLock().unlock();
150 try {
151 s.close();
152 } catch (Throwable e) {
153 }
154 }
155 }
156
157 public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
158 PreparedStatement s = null;
159 ResultSet rs = null;
160 cleanupExclusiveLock.readLock().lock();
161 try {
162 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
163 rs = s.executeQuery();
164 long seq1 = 0;
165 if (rs.next()) {
166 seq1 = rs.getLong(1);
167 }
168 rs.close();
169 s.close();
170 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement());
171 rs = s.executeQuery();
172 long seq2 = 0;
173 if (rs.next()) {
174 seq2 = rs.getLong(1);
175 }
176 long seq = Math.max(seq1, seq2);
177 return seq;
178 } finally {
179 cleanupExclusiveLock.readLock().unlock();
180 close(rs);
181 close(s);
182 }
183 }
184
185 public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
186 PreparedStatement s = null;
187 ResultSet rs = null;
188 cleanupExclusiveLock.readLock().lock();
189 try {
190 s = c.getConnection().prepareStatement(
191 this.statements.getFindMessageByIdStatement());
192 s.setLong(1, storeSequenceId);
193 rs = s.executeQuery();
194 if (!rs.next()) {
195 return null;
196 }
197 return getBinaryData(rs, 1);
198 } finally {
199 cleanupExclusiveLock.readLock().unlock();
200 close(rs);
201 close(s);
202 }
203 }
204
205
206 public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
207 long expiration, byte priority) throws SQLException, IOException {
208 PreparedStatement s = c.getAddMessageStatement();
209 cleanupExclusiveLock.readLock().lock();
210 try {
211 if (s == null) {
212 s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
213 if (this.batchStatments) {
214 c.setAddMessageStatement(s);
215 }
216 }
217 s.setLong(1, sequence);
218 s.setString(2, messageID.getProducerId().toString());
219 s.setLong(3, messageID.getProducerSequenceId());
220 s.setString(4, destination.getQualifiedName());
221 s.setLong(5, expiration);
222 s.setLong(6, priority);
223 setBinaryData(s, 7, data);
224 if (this.batchStatments) {
225 s.addBatch();
226 } else if (s.executeUpdate() != 1) {
227 throw new SQLException("Failed add a message");
228 }
229 } finally {
230 cleanupExclusiveLock.readLock().unlock();
231 if (!this.batchStatments) {
232 if (s != null) {
233 s.close();
234 }
235 }
236 }
237 }
238
239 public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
240 long expirationTime, String messageRef) throws SQLException, IOException {
241 PreparedStatement s = c.getAddMessageStatement();
242 cleanupExclusiveLock.readLock().lock();
243 try {
244 if (s == null) {
245 s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
246 if (this.batchStatments) {
247 c.setAddMessageStatement(s);
248 }
249 }
250 s.setLong(1, messageID.getBrokerSequenceId());
251 s.setString(2, messageID.getProducerId().toString());
252 s.setLong(3, messageID.getProducerSequenceId());
253 s.setString(4, destination.getQualifiedName());
254 s.setLong(5, expirationTime);
255 s.setString(6, messageRef);
256 if (this.batchStatments) {
257 s.addBatch();
258 } else if (s.executeUpdate() != 1) {
259 throw new SQLException("Failed add a message");
260 }
261 } finally {
262 cleanupExclusiveLock.readLock().unlock();
263 if (!this.batchStatments) {
264 s.close();
265 }
266 }
267 }
268
269 public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
270 PreparedStatement s = null;
271 ResultSet rs = null;
272 cleanupExclusiveLock.readLock().lock();
273 try {
274 s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
275 s.setString(1, messageID.getProducerId().toString());
276 s.setLong(2, messageID.getProducerSequenceId());
277 s.setString(3, destination.getQualifiedName());
278 rs = s.executeQuery();
279 if (!rs.next()) {
280 return new long[]{0,0};
281 }
282 return new long[]{rs.getLong(1), rs.getLong(2)};
283 } finally {
284 cleanupExclusiveLock.readLock().unlock();
285 close(rs);
286 close(s);
287 }
288 }
289
290 public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
291 PreparedStatement s = null;
292 ResultSet rs = null;
293 cleanupExclusiveLock.readLock().lock();
294 try {
295 s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
296 s.setString(1, id.getProducerId().toString());
297 s.setLong(2, id.getProducerSequenceId());
298 rs = s.executeQuery();
299 if (!rs.next()) {
300 return null;
301 }
302 return getBinaryData(rs, 1);
303 } finally {
304 cleanupExclusiveLock.readLock().unlock();
305 close(rs);
306 close(s);
307 }
308 }
309
310 public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
311 PreparedStatement s = null;
312 ResultSet rs = null;
313 cleanupExclusiveLock.readLock().lock();
314 try {
315 s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
316 s.setLong(1, seq);
317 rs = s.executeQuery();
318 if (!rs.next()) {
319 return null;
320 }
321 return rs.getString(1);
322 } finally {
323 cleanupExclusiveLock.readLock().unlock();
324 close(rs);
325 close(s);
326 }
327 }
328
329 public void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException {
330 PreparedStatement s = c.getRemovedMessageStatement();
331 cleanupExclusiveLock.readLock().lock();
332 try {
333 if (s == null) {
334 s = c.getConnection().prepareStatement(this.statements.getRemoveMessageStatement());
335 if (this.batchStatments) {
336 c.setRemovedMessageStatement(s);
337 }
338 }
339 s.setLong(1, seq);
340 if (this.batchStatments) {
341 s.addBatch();
342 } else if (s.executeUpdate() != 1) {
343 throw new SQLException("Failed to remove message");
344 }
345 } finally {
346 cleanupExclusiveLock.readLock().unlock();
347 if (!this.batchStatments && s != null) {
348 s.close();
349 }
350 }
351 }
352
353 public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
354 throws Exception {
355 PreparedStatement s = null;
356 ResultSet rs = null;
357 cleanupExclusiveLock.readLock().lock();
358 try {
359 s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
360 s.setString(1, destination.getQualifiedName());
361 rs = s.executeQuery();
362 if (this.statements.isUseExternalMessageReferences()) {
363 while (rs.next()) {
364 if (!listener.recoverMessageReference(rs.getString(2))) {
365 break;
366 }
367 }
368 } else {
369 while (rs.next()) {
370 if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
371 break;
372 }
373 }
374 }
375 } finally {
376 cleanupExclusiveLock.readLock().unlock();
377 close(rs);
378 close(s);
379 }
380 }
381
382 public void doMessageIdScan(TransactionContext c, int limit,
383 JDBCMessageIdScanListener listener) throws SQLException, IOException {
384 PreparedStatement s = null;
385 ResultSet rs = null;
386 cleanupExclusiveLock.readLock().lock();
387 try {
388 s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
389 s.setMaxRows(limit);
390 rs = s.executeQuery();
391 // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid
392 LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>();
393 while (rs.next()) {
394 reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3)));
395 }
396 if (LOG.isDebugEnabled()) {
397 LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids");
398 }
399 for (MessageId id : reverseOrderIds) {
400 listener.messageId(id);
401 }
402 } finally {
403 cleanupExclusiveLock.readLock().unlock();
404 close(rs);
405 close(s);
406 }
407 }
408
409 public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
410 String subscriptionName, long seq, long prio) throws SQLException, IOException {
411 PreparedStatement s = c.getUpdateLastAckStatement();
412 cleanupExclusiveLock.readLock().lock();
413 try {
414 if (s == null) {
415 s = c.getConnection().prepareStatement(this.statements.getUpdateLastPriorityAckRowOfDurableSubStatement());
416 if (this.batchStatments) {
417 c.setUpdateLastAckStatement(s);
418 }
419 }
420 s.setLong(1, seq);
421 s.setString(2, destination.getQualifiedName());
422 s.setString(3, clientId);
423 s.setString(4, subscriptionName);
424 s.setLong(5, prio);
425 if (this.batchStatments) {
426 s.addBatch();
427 } else if (s.executeUpdate() != 1) {
428 throw new SQLException("Failed update last ack with priority: " + prio + ", for sub: " + subscriptionName);
429 }
430 } finally {
431 cleanupExclusiveLock.readLock().unlock();
432 if (!this.batchStatments) {
433 close(s);
434 }
435 }
436 }
437
438
439 public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
440 String subscriptionName, long seq, long priority) throws SQLException, IOException {
441 PreparedStatement s = c.getUpdateLastAckStatement();
442 cleanupExclusiveLock.readLock().lock();
443 try {
444 if (s == null) {
445 s = c.getConnection().prepareStatement(this.statements.getUpdateDurableLastAckStatement());
446 if (this.batchStatments) {
447 c.setUpdateLastAckStatement(s);
448 }
449 }
450 s.setLong(1, seq);
451 s.setString(2, destination.getQualifiedName());
452 s.setString(3, clientId);
453 s.setString(4, subscriptionName);
454
455 if (this.batchStatments) {
456 s.addBatch();
457 } else if (s.executeUpdate() != 1) {
458 throw new IOException("Could not update last ack seq : "
459 + seq + ", for sub: " + subscriptionName);
460 }
461 } finally {
462 cleanupExclusiveLock.readLock().unlock();
463 if (!this.batchStatments) {
464 close(s);
465 }
466 }
467 }
468
469 public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
470 String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
471 // dumpTables(c,
472 // destination.getQualifiedName(),clientId,subscriptionName);
473 PreparedStatement s = null;
474 ResultSet rs = null;
475 cleanupExclusiveLock.readLock().lock();
476 try {
477 s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
478 s.setString(1, destination.getQualifiedName());
479 s.setString(2, clientId);
480 s.setString(3, subscriptionName);
481 rs = s.executeQuery();
482 if (this.statements.isUseExternalMessageReferences()) {
483 while (rs.next()) {
484 if (!listener.recoverMessageReference(rs.getString(2))) {
485 break;
486 }
487 }
488 } else {
489 while (rs.next()) {
490 if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
491 break;
492 }
493 }
494 }
495 } finally {
496 cleanupExclusiveLock.readLock().unlock();
497 close(rs);
498 close(s);
499 }
500 }
501
502 public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
503 String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
504
505 PreparedStatement s = null;
506 ResultSet rs = null;
507 cleanupExclusiveLock.readLock().lock();
508 try {
509 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
510 s.setMaxRows(Math.max(maxReturned * 2, maxRows));
511 s.setString(1, destination.getQualifiedName());
512 s.setString(2, clientId);
513 s.setString(3, subscriptionName);
514 s.setLong(4, seq);
515 rs = s.executeQuery();
516 int count = 0;
517 if (this.statements.isUseExternalMessageReferences()) {
518 while (rs.next() && count < maxReturned) {
519 if (listener.recoverMessageReference(rs.getString(1))) {
520 count++;
521 }
522 }
523 } else {
524 while (rs.next() && count < maxReturned) {
525 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
526 count++;
527 }
528 }
529 }
530 } finally {
531 cleanupExclusiveLock.readLock().unlock();
532 close(rs);
533 close(s);
534 }
535 }
536
537 public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
538 String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
539
540 PreparedStatement s = null;
541 ResultSet rs = null;
542 cleanupExclusiveLock.readLock().lock();
543 try {
544 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
545 s.setMaxRows(Math.max(maxReturned * 2, maxRows));
546 s.setString(1, destination.getQualifiedName());
547 s.setString(2, clientId);
548 s.setString(3, subscriptionName);
549 s.setLong(4, seq);
550 s.setLong(5, priority);
551 rs = s.executeQuery();
552 int count = 0;
553 if (this.statements.isUseExternalMessageReferences()) {
554 while (rs.next() && count < maxReturned) {
555 if (listener.recoverMessageReference(rs.getString(1))) {
556 count++;
557 }
558 }
559 } else {
560 while (rs.next() && count < maxReturned) {
561 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
562 count++;
563 }
564 }
565 }
566 } finally {
567 cleanupExclusiveLock.readLock().unlock();
568 close(rs);
569 close(s);
570 }
571 }
572
573 public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
574 String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException {
575 PreparedStatement s = null;
576 ResultSet rs = null;
577 int result = 0;
578 cleanupExclusiveLock.readLock().lock();
579 try {
580 if (isPrioritizedMessages) {
581 s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority());
582 } else {
583 s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());
584 }
585 s.setString(1, destination.getQualifiedName());
586 s.setString(2, clientId);
587 s.setString(3, subscriptionName);
588 rs = s.executeQuery();
589 if (rs.next()) {
590 result = rs.getInt(1);
591 }
592 } finally {
593 cleanupExclusiveLock.readLock().unlock();
594 close(rs);
595 close(s);
596 }
597 return result;
598 }
599
600 /**
601 * @param c
602 * @param info
603 * @param retroactive
604 * @throws SQLException
605 * @throws IOException
606 */
607 public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages)
608 throws SQLException, IOException {
609 // dumpTables(c, destination.getQualifiedName(), clientId,
610 // subscriptionName);
611 PreparedStatement s = null;
612 cleanupExclusiveLock.readLock().lock();
613 try {
614 long lastMessageId = -1;
615 if (!retroactive) {
616 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
617 ResultSet rs = null;
618 try {
619 rs = s.executeQuery();
620 if (rs.next()) {
621 lastMessageId = rs.getLong(1);
622 }
623 } finally {
624 close(rs);
625 close(s);
626 }
627 }
628 s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
629 int maxPriority = 1;
630 if (isPrioritizedMessages) {
631 maxPriority = 10;
632 }
633
634 for (int priority = 0; priority < maxPriority; priority++) {
635 s.setString(1, info.getDestination().getQualifiedName());
636 s.setString(2, info.getClientId());
637 s.setString(3, info.getSubscriptionName());
638 s.setString(4, info.getSelector());
639 s.setLong(5, lastMessageId);
640 s.setString(6, info.getSubscribedDestination().getQualifiedName());
641 s.setLong(7, priority);
642
643 if (s.executeUpdate() != 1) {
644 throw new IOException("Could not create durable subscription for: " + info.getClientId());
645 }
646 }
647
648 } finally {
649 cleanupExclusiveLock.readLock().unlock();
650 close(s);
651 }
652 }
653
654 public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
655 String clientId, String subscriptionName) throws SQLException, IOException {
656 PreparedStatement s = null;
657 ResultSet rs = null;
658 cleanupExclusiveLock.readLock().lock();
659 try {
660 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
661 s.setString(1, destination.getQualifiedName());
662 s.setString(2, clientId);
663 s.setString(3, subscriptionName);
664 rs = s.executeQuery();
665 if (!rs.next()) {
666 return null;
667 }
668 SubscriptionInfo subscription = new SubscriptionInfo();
669 subscription.setDestination(destination);
670 subscription.setClientId(clientId);
671 subscription.setSubscriptionName(subscriptionName);
672 subscription.setSelector(rs.getString(1));
673 subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2),
674 ActiveMQDestination.QUEUE_TYPE));
675 return subscription;
676 } finally {
677 cleanupExclusiveLock.readLock().unlock();
678 close(rs);
679 close(s);
680 }
681 }
682
683 public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination)
684 throws SQLException, IOException {
685 PreparedStatement s = null;
686 ResultSet rs = null;
687 cleanupExclusiveLock.readLock().lock();
688 try {
689 s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
690 s.setString(1, destination.getQualifiedName());
691 rs = s.executeQuery();
692 ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
693 while (rs.next()) {
694 SubscriptionInfo subscription = new SubscriptionInfo();
695 subscription.setDestination(destination);
696 subscription.setSelector(rs.getString(1));
697 subscription.setSubscriptionName(rs.getString(2));
698 subscription.setClientId(rs.getString(3));
699 subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),
700 ActiveMQDestination.QUEUE_TYPE));
701 rc.add(subscription);
702 }
703 return rc.toArray(new SubscriptionInfo[rc.size()]);
704 } finally {
705 cleanupExclusiveLock.readLock().unlock();
706 close(rs);
707 close(s);
708 }
709 }
710
711 public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
712 IOException {
713 PreparedStatement s = null;
714 cleanupExclusiveLock.readLock().lock();
715 try {
716 s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
717 s.setString(1, destinationName.getQualifiedName());
718 s.executeUpdate();
719 s.close();
720 s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement());
721 s.setString(1, destinationName.getQualifiedName());
722 s.executeUpdate();
723 } finally {
724 cleanupExclusiveLock.readLock().unlock();
725 close(s);
726 }
727 }
728
729 public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
730 String subscriptionName) throws SQLException, IOException {
731 PreparedStatement s = null;
732 cleanupExclusiveLock.readLock().lock();
733 try {
734 s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
735 s.setString(1, destination.getQualifiedName());
736 s.setString(2, clientId);
737 s.setString(3, subscriptionName);
738 s.executeUpdate();
739 } finally {
740 cleanupExclusiveLock.readLock().unlock();
741 close(s);
742 }
743 }
744
745 int priorityIterator = 0;
746 public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
747 PreparedStatement s = null;
748 cleanupExclusiveLock.writeLock().lock();
749 try {
750 LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
751 s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
752 int priority = priorityIterator++%10;
753 s.setInt(1, priority);
754 s.setInt(2, priority);
755 int i = s.executeUpdate();
756 LOG.debug("Deleted " + i + " old message(s) at priority: " + priority);
757 } finally {
758 cleanupExclusiveLock.writeLock().unlock();
759 close(s);
760 }
761 }
762
763 public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
764 String clientId, String subscriberName) throws SQLException, IOException {
765 PreparedStatement s = null;
766 ResultSet rs = null;
767 long result = -1;
768 cleanupExclusiveLock.readLock().lock();
769 try {
770 s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
771 s.setString(1, destination.getQualifiedName());
772 s.setString(2, clientId);
773 s.setString(3, subscriberName);
774 rs = s.executeQuery();
775 if (rs.next()) {
776 result = rs.getLong(1);
777 if (result == 0 && rs.wasNull()) {
778 result = -1;
779 }
780 }
781 } finally {
782 cleanupExclusiveLock.readLock().unlock();
783 close(rs);
784 close(s);
785 }
786 return result;
787 }
788
789 protected static void close(PreparedStatement s) {
790 try {
791 s.close();
792 } catch (Throwable e) {
793 }
794 }
795
796 protected static void close(ResultSet rs) {
797 try {
798 rs.close();
799 } catch (Throwable e) {
800 }
801 }
802
803 public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException {
804 HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
805 PreparedStatement s = null;
806 ResultSet rs = null;
807 cleanupExclusiveLock.readLock().lock();
808 try {
809 s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
810 rs = s.executeQuery();
811 while (rs.next()) {
812 rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
813 }
814 } finally {
815 cleanupExclusiveLock.readLock().unlock();
816 close(rs);
817 close(s);
818 }
819 return rc;
820 }
821
822 /**
823 * @return true if batchStements
824 */
825 public boolean isBatchStatments() {
826 return this.batchStatments;
827 }
828
829 /**
830 * @param batchStatments
831 */
832 public void setBatchStatments(boolean batchStatments) {
833 this.batchStatments = batchStatments;
834 }
835
836 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
837 this.statements.setUseExternalMessageReferences(useExternalMessageReferences);
838 }
839
840 /**
841 * @return the statements
842 */
843 public Statements getStatements() {
844 return this.statements;
845 }
846
847 public void setStatements(Statements statements) {
848 this.statements = statements;
849 }
850
851 public int getMaxRows() {
852 return maxRows;
853 }
854
855 public void setMaxRows(int maxRows) {
856 this.maxRows = maxRows;
857 }
858
859 @Override
860 public void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException {
861 PreparedStatement s = null;
862 cleanupExclusiveLock.readLock().lock();
863 try {
864 s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
865 s.setString(1, destination.getQualifiedName());
866 s.setString(2, destination.getQualifiedName());
867 s.setString(3, destination.getQualifiedName());
868 s.setString(4, null);
869 s.setLong(5, 0);
870 s.setString(6, destination.getQualifiedName());
871 s.setLong(7, 11); // entry out of priority range
872
873 if (s.executeUpdate() != 1) {
874 throw new IOException("Could not create ack record for destination: " + destination);
875 }
876 } finally {
877 cleanupExclusiveLock.readLock().unlock();
878 close(s);
879 }
880 }
881
882 /**
883 * @param c
884 * @param destination
885 * @param clientId
886 * @param subscriberName
887 * @return
888 * @throws SQLException
889 * @throws IOException
890 */
891 public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination,
892 String clientId, String subscriberName) throws SQLException, IOException {
893 PreparedStatement s = null;
894 ResultSet rs = null;
895 cleanupExclusiveLock.readLock().lock();
896 try {
897 s = c.getConnection().prepareStatement(this.statements.getNextDurableSubscriberMessageStatement());
898 s.setString(1, destination.getQualifiedName());
899 s.setString(2, clientId);
900 s.setString(3, subscriberName);
901 rs = s.executeQuery();
902 if (!rs.next()) {
903 return null;
904 }
905 return getBinaryData(rs, 1);
906 } finally {
907 close(rs);
908 cleanupExclusiveLock.readLock().unlock();
909 close(s);
910 }
911 }
912
913 public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
914 IOException {
915 PreparedStatement s = null;
916 ResultSet rs = null;
917 int result = 0;
918 cleanupExclusiveLock.readLock().lock();
919 try {
920 s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
921 s.setString(1, destination.getQualifiedName());
922 rs = s.executeQuery();
923 if (rs.next()) {
924 result = rs.getInt(1);
925 }
926 } finally {
927 cleanupExclusiveLock.readLock().unlock();
928 close(rs);
929 close(s);
930 }
931 return result;
932 }
933
934 public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
935 long priority, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
936 PreparedStatement s = null;
937 ResultSet rs = null;
938 cleanupExclusiveLock.readLock().lock();
939 try {
940 if (isPrioritizedMessages) {
941 s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement());
942 } else {
943 s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
944 }
945 s.setMaxRows(Math.max(maxReturned * 2, maxRows));
946 s.setString(1, destination.getQualifiedName());
947 s.setLong(2, nextSeq);
948 if (isPrioritizedMessages) {
949 s.setLong(3, priority);
950 s.setLong(4, priority);
951 }
952 rs = s.executeQuery();
953 int count = 0;
954 if (this.statements.isUseExternalMessageReferences()) {
955 while (rs.next() && count < maxReturned) {
956 if (listener.recoverMessageReference(rs.getString(1))) {
957 count++;
958 } else {
959 LOG.debug("Stopped recover next messages");
960 break;
961 }
962 }
963 } else {
964 while (rs.next() && count < maxReturned) {
965 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
966 count++;
967 } else {
968 LOG.debug("Stopped recover next messages");
969 break;
970 }
971 }
972 }
973 } catch (Exception e) {
974 e.printStackTrace();
975 } finally {
976 cleanupExclusiveLock.readLock().unlock();
977 close(rs);
978 close(s);
979 }
980 }
981
982 /* public void dumpTables(Connection c, String destinationName, String clientId, String
983 subscriptionName) throws SQLException {
984 printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
985 printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
986 PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM "
987 + "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D "
988 + "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
989 + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
990 + " ORDER BY M.ID");
991 s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
992 printQuery(s,System.out); }
993
994 public void dumpTables(Connection c) throws SQLException {
995 printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
996 printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
997 }
998
999 private void printQuery(Connection c, String query, PrintStream out)
1000 throws SQLException {
1001 printQuery(c.prepareStatement(query), out);
1002 }
1003
1004 private void printQuery(PreparedStatement s, PrintStream out)
1005 throws SQLException {
1006
1007 ResultSet set = null;
1008 try {
1009 set = s.executeQuery();
1010 ResultSetMetaData metaData = set.getMetaData();
1011 for (int i = 1; i <= metaData.getColumnCount(); i++) {
1012 if (i == 1)
1013 out.print("||");
1014 out.print(metaData.getColumnName(i) + "||");
1015 }
1016 out.println();
1017 while (set.next()) {
1018 for (int i = 1; i <= metaData.getColumnCount(); i++) {
1019 if (i == 1)
1020 out.print("|");
1021 out.print(set.getString(i) + "|");
1022 }
1023 out.println();
1024 }
1025 } finally {
1026 try {
1027 set.close();
1028 } catch (Throwable ignore) {
1029 }
1030 try {
1031 s.close();
1032 } catch (Throwable ignore) {
1033 }
1034 }
1035 } */
1036
1037 public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
1038 throws SQLException, IOException {
1039 PreparedStatement s = null;
1040 ResultSet rs = null;
1041 cleanupExclusiveLock.readLock().lock();
1042 try {
1043 s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
1044 s.setString(1, id.toString());
1045 rs = s.executeQuery();
1046 long seq = -1;
1047 if (rs.next()) {
1048 seq = rs.getLong(1);
1049 }
1050 return seq;
1051 } finally {
1052 cleanupExclusiveLock.readLock().unlock();
1053 close(rs);
1054 close(s);
1055 }
1056 }
1057
1058 }