001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.jdbc;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.sql.Connection;
022 import java.sql.SQLException;
023 import java.util.Collections;
024 import java.util.Set;
025 import java.util.concurrent.ScheduledFuture;
026 import java.util.concurrent.ScheduledThreadPoolExecutor;
027 import java.util.concurrent.ThreadFactory;
028 import java.util.concurrent.TimeUnit;
029
030 import javax.sql.DataSource;
031
032 import org.apache.activemq.ActiveMQMessageAudit;
033 import org.apache.activemq.broker.BrokerService;
034 import org.apache.activemq.broker.BrokerServiceAware;
035 import org.apache.activemq.broker.ConnectionContext;
036 import org.apache.activemq.command.ActiveMQDestination;
037 import org.apache.activemq.command.ActiveMQQueue;
038 import org.apache.activemq.command.ActiveMQTopic;
039 import org.apache.activemq.command.Message;
040 import org.apache.activemq.command.MessageId;
041 import org.apache.activemq.command.ProducerId;
042 import org.apache.activemq.openwire.OpenWireFormat;
043 import org.apache.activemq.store.MessageStore;
044 import org.apache.activemq.store.PersistenceAdapter;
045 import org.apache.activemq.store.TopicMessageStore;
046 import org.apache.activemq.store.TransactionStore;
047 import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
048 import org.apache.activemq.store.memory.MemoryTransactionStore;
049 import org.apache.activemq.usage.SystemUsage;
050 import org.apache.activemq.util.ByteSequence;
051 import org.apache.activemq.util.FactoryFinder;
052 import org.apache.activemq.util.IOExceptionSupport;
053 import org.apache.activemq.util.LongSequenceGenerator;
054 import org.apache.activemq.wireformat.WireFormat;
055 import org.slf4j.Logger;
056 import org.slf4j.LoggerFactory;
057
058 /**
059 * A {@link PersistenceAdapter} implementation using JDBC for persistence
060 * storage.
061 *
062 * This persistence adapter will correctly remember prepared XA transactions,
063 * but it will not keep track of local transaction commits so that operations
064 * performed against the Message store are done as a single uow.
065 *
066 * @org.apache.xbean.XBean element="jdbcPersistenceAdapter"
067 *
068 *
069 */
070 public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter,
071 BrokerServiceAware {
072
073 private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class);
074 private static FactoryFinder adapterFactoryFinder = new FactoryFinder(
075 "META-INF/services/org/apache/activemq/store/jdbc/");
076 private static FactoryFinder lockFactoryFinder = new FactoryFinder(
077 "META-INF/services/org/apache/activemq/store/jdbc/lock/");
078
079 private WireFormat wireFormat = new OpenWireFormat();
080 private BrokerService brokerService;
081 private Statements statements;
082 private JDBCAdapter adapter;
083 private MemoryTransactionStore transactionStore;
084 private ScheduledThreadPoolExecutor clockDaemon;
085 private ScheduledFuture<?> cleanupTicket, keepAliveTicket;
086 private int cleanupPeriod = 1000 * 60 * 5;
087 private boolean useExternalMessageReferences;
088 private boolean useDatabaseLock = true;
089 private long lockKeepAlivePeriod = 1000*30;
090 private long lockAcquireSleepInterval = DefaultDatabaseLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
091 private DatabaseLocker databaseLocker;
092 private boolean createTablesOnStartup = true;
093 private DataSource lockDataSource;
094 private int transactionIsolation;
095 private File directory;
096
097 protected int maxProducersToAudit=1024;
098 protected int maxAuditDepth=1000;
099 protected boolean enableAudit=false;
100 protected int auditRecoveryDepth = 1024;
101 protected ActiveMQMessageAudit audit;
102
103 protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
104 protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
105
106 public JDBCPersistenceAdapter() {
107 }
108
/**
 * Creates an adapter bound to the given data source and wire format.
 *
 * @param ds         data source handed to the superclass constructor
 * @param wireFormat wire format that replaces the default {@link OpenWireFormat}
 */
public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) {
    super(ds);
    setWireFormat(wireFormat);
}
113
114 public Set<ActiveMQDestination> getDestinations() {
115 TransactionContext c = null;
116 try {
117 c = getTransactionContext();
118 return getAdapter().doGetDestinations(c);
119 } catch (IOException e) {
120 return emptyDestinationSet();
121 } catch (SQLException e) {
122 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
123 return emptyDestinationSet();
124 } finally {
125 if (c != null) {
126 try {
127 c.close();
128 } catch (Throwable e) {
129 }
130 }
131 }
132 }
133
134 @SuppressWarnings("unchecked")
135 private Set<ActiveMQDestination> emptyDestinationSet() {
136 return Collections.EMPTY_SET;
137 }
138
139 protected void createMessageAudit() {
140 if (enableAudit && audit == null) {
141 audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
142 TransactionContext c = null;
143
144 try {
145 c = getTransactionContext();
146 getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
147 public void messageId(MessageId id) {
148 audit.isDuplicate(id);
149 }
150 });
151 } catch (Exception e) {
152 LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
153 } finally {
154 if (c != null) {
155 try {
156 c.close();
157 } catch (Throwable e) {
158 }
159 }
160 }
161 }
162 }
163
164 public void initSequenceIdGenerator() {
165 TransactionContext c = null;
166 try {
167 c = getTransactionContext();
168 getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
169 public void messageId(MessageId id) {
170 audit.isDuplicate(id);
171 }
172 });
173 } catch (Exception e) {
174 LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
175 } finally {
176 if (c != null) {
177 try {
178 c.close();
179 } catch (Throwable e) {
180 }
181 }
182 }
183
184 }
185
186 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
187 MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
188 if (transactionStore != null) {
189 rc = transactionStore.proxy(rc);
190 }
191 return rc;
192 }
193
194 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
195 TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit);
196 if (transactionStore != null) {
197 rc = transactionStore.proxy(rc);
198 }
199 return rc;
200 }
201
202 /**
203 * Cleanup method to remove any state associated with the given destination
204 * @param destination Destination to forget
205 */
206 public void removeQueueMessageStore(ActiveMQQueue destination) {
207 if (destination.isQueue() && getBrokerService().shouldRecordVirtualDestination(destination)) {
208 try {
209 removeConsumerDestination(destination);
210 } catch (IOException ioe) {
211 LOG.error("Failed to remove consumer destination: " + destination, ioe);
212 }
213 }
214 }
215
216 private void removeConsumerDestination(ActiveMQQueue destination) throws IOException {
217 TransactionContext c = getTransactionContext();
218 try {
219 String id = destination.getQualifiedName();
220 getAdapter().doDeleteSubscription(c, destination, id, id);
221 } catch (SQLException e) {
222 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
223 throw IOExceptionSupport.create("Failed to remove consumer destination: " + destination, e);
224 } finally {
225 c.close();
226 }
227 }
228
229 /**
230 * Cleanup method to remove any state associated with the given destination
231 * No state retained.... nothing to do
232 *
233 * @param destination Destination to forget
234 */
235 public void removeTopicMessageStore(ActiveMQTopic destination) {
236 }
237
238 public TransactionStore createTransactionStore() throws IOException {
239 if (transactionStore == null) {
240 transactionStore = new MemoryTransactionStore(this);
241 }
242 return this.transactionStore;
243 }
244
245 public long getLastMessageBrokerSequenceId() throws IOException {
246 TransactionContext c = getTransactionContext();
247 try {
248 long seq = getAdapter().doGetLastMessageStoreSequenceId(c);
249 sequenceGenerator.setLastSequenceId(seq);
250 long brokerSeq = 0;
251 if (seq != 0) {
252 byte[] msg = getAdapter().doGetMessageById(c, seq);
253 if (msg != null) {
254 Message last = (Message)wireFormat.unmarshal(new ByteSequence(msg));
255 brokerSeq = last.getMessageId().getBrokerSequenceId();
256 } else {
257 LOG.warn("Broker sequence id wasn't recovered properly, possible duplicates!");
258 }
259 }
260 return brokerSeq;
261 } catch (SQLException e) {
262 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
263 throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
264 } finally {
265 c.close();
266 }
267 }
268
269 public long getLastProducerSequenceId(ProducerId id) throws IOException {
270 TransactionContext c = getTransactionContext();
271 try {
272 return getAdapter().doGetLastProducerSequenceId(c, id);
273 } catch (SQLException e) {
274 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
275 throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
276 } finally {
277 c.close();
278 }
279 }
280
281
282 public void start() throws Exception {
283 getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
284
285 if (isCreateTablesOnStartup()) {
286 TransactionContext transactionContext = getTransactionContext();
287 transactionContext.begin();
288 try {
289 try {
290 getAdapter().doCreateTables(transactionContext);
291 } catch (SQLException e) {
292 LOG.warn("Cannot create tables due to: " + e);
293 JDBCPersistenceAdapter.log("Failure Details: ", e);
294 }
295 } finally {
296 transactionContext.commit();
297 }
298 }
299
300 if (isUseDatabaseLock()) {
301 DatabaseLocker service = getDatabaseLocker();
302 if (service == null) {
303 LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter");
304 } else {
305 service.start();
306 if (lockKeepAlivePeriod > 0) {
307 keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
308 public void run() {
309 databaseLockKeepAlive();
310 }
311 }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
312 }
313 if (brokerService != null) {
314 brokerService.getBroker().nowMasterBroker();
315 }
316 }
317 }
318
319 // Cleanup the db periodically.
320 if (cleanupPeriod > 0) {
321 cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() {
322 public void run() {
323 cleanup();
324 }
325 }, 0, cleanupPeriod, TimeUnit.MILLISECONDS);
326 }
327
328 createMessageAudit();
329 }
330
331 public synchronized void stop() throws Exception {
332 if (cleanupTicket != null) {
333 cleanupTicket.cancel(true);
334 cleanupTicket = null;
335 }
336 if (keepAliveTicket != null) {
337 keepAliveTicket.cancel(false);
338 keepAliveTicket = null;
339 }
340
341 // do not shutdown clockDaemon as it may kill the thread initiating shutdown
342 DatabaseLocker service = getDatabaseLocker();
343 if (service != null) {
344 service.stop();
345 }
346 }
347
348 public void cleanup() {
349 TransactionContext c = null;
350 try {
351 LOG.debug("Cleaning up old messages.");
352 c = getTransactionContext();
353 getAdapter().doDeleteOldMessages(c);
354 } catch (IOException e) {
355 LOG.warn("Old message cleanup failed due to: " + e, e);
356 } catch (SQLException e) {
357 LOG.warn("Old message cleanup failed due to: " + e);
358 JDBCPersistenceAdapter.log("Failure Details: ", e);
359 } finally {
360 if (c != null) {
361 try {
362 c.close();
363 } catch (Throwable e) {
364 }
365 }
366 LOG.debug("Cleanup done.");
367 }
368 }
369
370 public void setScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor clockDaemon) {
371 this.clockDaemon = clockDaemon;
372 }
373
374 public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
375 if (clockDaemon == null) {
376 clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
377 public Thread newThread(Runnable runnable) {
378 Thread thread = new Thread(runnable, "ActiveMQ Cleanup Timer");
379 thread.setDaemon(true);
380 return thread;
381 }
382 });
383 }
384 return clockDaemon;
385 }
386
387 public JDBCAdapter getAdapter() throws IOException {
388 if (adapter == null) {
389 setAdapter(createAdapter());
390 }
391 return adapter;
392 }
393
394 public DatabaseLocker getDatabaseLocker() throws IOException {
395 if (databaseLocker == null && isUseDatabaseLock()) {
396 setDatabaseLocker(loadDataBaseLocker());
397 }
398 return databaseLocker;
399 }
400
401 /**
402 * Sets the database locker strategy to use to lock the database on startup
403 * @throws IOException
404 */
405 public void setDatabaseLocker(DatabaseLocker locker) throws IOException {
406 databaseLocker = locker;
407 databaseLocker.setPersistenceAdapter(this);
408 databaseLocker.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
409 }
410
411 public DataSource getLockDataSource() throws IOException {
412 if (lockDataSource == null) {
413 lockDataSource = getDataSource();
414 if (lockDataSource == null) {
415 throw new IllegalArgumentException(
416 "No dataSource property has been configured");
417 }
418 } else {
419 LOG.info("Using a separate dataSource for locking: "
420 + lockDataSource);
421 }
422 return lockDataSource;
423 }
424
425 public void setLockDataSource(DataSource dataSource) {
426 this.lockDataSource = dataSource;
427 }
428
429 public BrokerService getBrokerService() {
430 return brokerService;
431 }
432
433 public void setBrokerService(BrokerService brokerService) {
434 this.brokerService = brokerService;
435 }
436
437 /**
438 * @throws IOException
439 */
440 protected JDBCAdapter createAdapter() throws IOException {
441
442 adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter");
443
444 // Use the default JDBC adapter if the
445 // Database type is not recognized.
446 if (adapter == null) {
447 adapter = new DefaultJDBCAdapter();
448 LOG.debug("Using default JDBC Adapter: " + adapter);
449 }
450 return adapter;
451 }
452
453 private Object loadAdapter(FactoryFinder finder, String kind) throws IOException {
454 Object adapter = null;
455 TransactionContext c = getTransactionContext();
456 try {
457 try {
458 // Make the filename file system safe.
459 String dirverName = c.getConnection().getMetaData().getDriverName();
460 dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase();
461
462 try {
463 adapter = finder.newInstance(dirverName);
464 LOG.info("Database " + kind + " driver override recognized for : [" + dirverName + "] - adapter: " + adapter.getClass());
465 } catch (Throwable e) {
466 LOG.info("Database " + kind + " driver override not found for : [" + dirverName
467 + "]. Will use default implementation.");
468 }
469 } catch (SQLException e) {
470 LOG.warn("JDBC error occurred while trying to detect database type for overrides. Will use default implementations: "
471 + e.getMessage());
472 JDBCPersistenceAdapter.log("Failure Details: ", e);
473 }
474 } finally {
475 c.close();
476 }
477 return adapter;
478 }
479
480 public void setAdapter(JDBCAdapter adapter) {
481 this.adapter = adapter;
482 this.adapter.setStatements(getStatements());
483 this.adapter.setMaxRows(getMaxRows());
484 }
485
486 public WireFormat getWireFormat() {
487 return wireFormat;
488 }
489
490 public void setWireFormat(WireFormat wireFormat) {
491 this.wireFormat = wireFormat;
492 }
493
494 public TransactionContext getTransactionContext(ConnectionContext context) throws IOException {
495 if (context == null) {
496 return getTransactionContext();
497 } else {
498 TransactionContext answer = (TransactionContext)context.getLongTermStoreContext();
499 if (answer == null) {
500 answer = getTransactionContext();
501 context.setLongTermStoreContext(answer);
502 }
503 return answer;
504 }
505 }
506
507 public TransactionContext getTransactionContext() throws IOException {
508 TransactionContext answer = new TransactionContext(this);
509 if (transactionIsolation > 0) {
510 answer.setTransactionIsolation(transactionIsolation);
511 }
512 return answer;
513 }
514
515 public void beginTransaction(ConnectionContext context) throws IOException {
516 TransactionContext transactionContext = getTransactionContext(context);
517 transactionContext.begin();
518 }
519
520 public void commitTransaction(ConnectionContext context) throws IOException {
521 TransactionContext transactionContext = getTransactionContext(context);
522 transactionContext.commit();
523 }
524
525 public void rollbackTransaction(ConnectionContext context) throws IOException {
526 TransactionContext transactionContext = getTransactionContext(context);
527 transactionContext.rollback();
528 }
529
530 public int getCleanupPeriod() {
531 return cleanupPeriod;
532 }
533
534 /**
535 * Sets the number of milliseconds until the database is attempted to be
536 * cleaned up for durable topics
537 */
538 public void setCleanupPeriod(int cleanupPeriod) {
539 this.cleanupPeriod = cleanupPeriod;
540 }
541
542 public void deleteAllMessages() throws IOException {
543 TransactionContext c = getTransactionContext();
544 try {
545 getAdapter().doDropTables(c);
546 getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
547 getAdapter().doCreateTables(c);
548 LOG.info("Persistence store purged.");
549 } catch (SQLException e) {
550 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
551 throw IOExceptionSupport.create(e);
552 } finally {
553 c.close();
554 }
555 }
556
557 public boolean isUseExternalMessageReferences() {
558 return useExternalMessageReferences;
559 }
560
561 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
562 this.useExternalMessageReferences = useExternalMessageReferences;
563 }
564
565 public boolean isCreateTablesOnStartup() {
566 return createTablesOnStartup;
567 }
568
569 /**
570 * Sets whether or not tables are created on startup
571 */
572 public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
573 this.createTablesOnStartup = createTablesOnStartup;
574 }
575
576 public boolean isUseDatabaseLock() {
577 return useDatabaseLock;
578 }
579
580 /**
581 * Sets whether or not an exclusive database lock should be used to enable
582 * JDBC Master/Slave. Enabled by default.
583 */
584 public void setUseDatabaseLock(boolean useDatabaseLock) {
585 this.useDatabaseLock = useDatabaseLock;
586 }
587
588 public static void log(String msg, SQLException e) {
589 String s = msg + e.getMessage();
590 while (e.getNextException() != null) {
591 e = e.getNextException();
592 s += ", due to: " + e.getMessage();
593 }
594 LOG.warn(s, e);
595 }
596
597 public Statements getStatements() {
598 if (statements == null) {
599 statements = new Statements();
600 }
601 return statements;
602 }
603
604 public void setStatements(Statements statements) {
605 this.statements = statements;
606 }
607
608 /**
609 * @param usageManager The UsageManager that is controlling the
610 * destination's memory usage.
611 */
612 public void setUsageManager(SystemUsage usageManager) {
613 }
614
615 protected void databaseLockKeepAlive() {
616 boolean stop = false;
617 try {
618 DatabaseLocker locker = getDatabaseLocker();
619 if (locker != null) {
620 if (!locker.keepAlive()) {
621 stop = true;
622 }
623 }
624 } catch (IOException e) {
625 LOG.error("Failed to get database when trying keepalive: " + e, e);
626 }
627 if (stop) {
628 stopBroker();
629 }
630 }
631
632 protected void stopBroker() {
633 // we can no longer keep the lock so lets fail
634 LOG.info("No longer able to keep the exclusive lock so giving up being a master");
635 try {
636 brokerService.stop();
637 } catch (Exception e) {
638 LOG.warn("Failure occurred while stopping broker");
639 }
640 }
641
642 protected DatabaseLocker loadDataBaseLocker() throws IOException {
643 DatabaseLocker locker = (DefaultDatabaseLocker) loadAdapter(lockFactoryFinder, "lock");
644 if (locker == null) {
645 locker = new DefaultDatabaseLocker();
646 LOG.debug("Using default JDBC Locker: " + locker);
647 }
648 return locker;
649 }
650
651 public void setBrokerName(String brokerName) {
652 }
653
654 public String toString() {
655 return "JDBCPersistenceAdapter(" + super.toString() + ")";
656 }
657
658 public void setDirectory(File dir) {
659 this.directory=dir;
660 }
661
662 public File getDirectory(){
663 if (this.directory==null && brokerService != null){
664 this.directory=brokerService.getBrokerDataDirectory();
665 }
666 return this.directory;
667 }
668
669 // interesting bit here is proof that DB is ok
670 public void checkpoint(boolean sync) throws IOException {
671 // by pass TransactionContext to avoid IO Exception handler
672 Connection connection = null;
673 try {
674 connection = getDataSource().getConnection();
675 } catch (SQLException e) {
676 LOG.debug("Could not get JDBC connection for checkpoint: " + e);
677 throw IOExceptionSupport.create(e);
678 } finally {
679 if (connection != null) {
680 try {
681 connection.close();
682 } catch (Throwable ignored) {
683 }
684 }
685 }
686 }
687
688 public long size(){
689 return 0;
690 }
691
692 public long getLockKeepAlivePeriod() {
693 return lockKeepAlivePeriod;
694 }
695
696 public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) {
697 this.lockKeepAlivePeriod = lockKeepAlivePeriod;
698 }
699
700 public long getLockAcquireSleepInterval() {
701 return lockAcquireSleepInterval;
702 }
703
704 /**
705 * millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker
706 * not applied if DataBaseLocker is injected.
707 */
708 public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
709 this.lockAcquireSleepInterval = lockAcquireSleepInterval;
710 }
711
712 /**
713 * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED
714 * This allowable dirty isolation level may not be achievable in clustered DB environments
715 * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ
716 * see isolation level constants in {@link java.sql.Connection}
717 * @param transactionIsolation the isolation level to use
718 */
719 public void setTransactionIsolation(int transactionIsolation) {
720 this.transactionIsolation = transactionIsolation;
721 }
722
723 public int getMaxProducersToAudit() {
724 return maxProducersToAudit;
725 }
726
727 public void setMaxProducersToAudit(int maxProducersToAudit) {
728 this.maxProducersToAudit = maxProducersToAudit;
729 }
730
731 public int getMaxAuditDepth() {
732 return maxAuditDepth;
733 }
734
735 public void setMaxAuditDepth(int maxAuditDepth) {
736 this.maxAuditDepth = maxAuditDepth;
737 }
738
739 public boolean isEnableAudit() {
740 return enableAudit;
741 }
742
743 public void setEnableAudit(boolean enableAudit) {
744 this.enableAudit = enableAudit;
745 }
746
747 public int getAuditRecoveryDepth() {
748 return auditRecoveryDepth;
749 }
750
751 public void setAuditRecoveryDepth(int auditRecoveryDepth) {
752 this.auditRecoveryDepth = auditRecoveryDepth;
753 }
754
755 public long getNextSequenceId() {
756 synchronized(sequenceGenerator) {
757 return sequenceGenerator.getNextSequenceId();
758 }
759 }
760
761 public int getMaxRows() {
762 return maxRows;
763 }
764
765 /*
766 * the max rows return from queries, with sparse selectors this may need to be increased
767 */
768 public void setMaxRows(int maxRows) {
769 this.maxRows = maxRows;
770 }
771 }