001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb;
018
019 import org.apache.activemq.broker.BrokerService;
020 import org.apache.activemq.broker.BrokerServiceAware;
021 import org.apache.activemq.broker.ConnectionContext;
022 import org.apache.activemq.command.ActiveMQDestination;
023 import org.apache.activemq.command.ActiveMQQueue;
024 import org.apache.activemq.command.ActiveMQTopic;
025 import org.apache.activemq.command.LocalTransactionId;
026 import org.apache.activemq.command.ProducerId;
027 import org.apache.activemq.command.TransactionId;
028 import org.apache.activemq.command.XATransactionId;
029 import org.apache.activemq.protobuf.Buffer;
030 import org.apache.activemq.store.MessageStore;
031 import org.apache.activemq.store.PersistenceAdapter;
032 import org.apache.activemq.store.TopicMessageStore;
033 import org.apache.activemq.store.TransactionStore;
034 import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
035 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
036 import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
037 import org.apache.activemq.usage.SystemUsage;
038
039 import java.io.File;
040 import java.io.IOException;
041 import java.util.Set;
042
043 /**
044 * An implementation of {@link PersistenceAdapter} designed for use with
045 * KahaDB - Embedded Lightweight Non-Relational Database
046 *
047 * @org.apache.xbean.XBean element="kahaDB"
048 *
049 */
050 public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
051 private final KahaDBStore letter = new KahaDBStore();
052
053 /**
054 * @param context
055 * @throws IOException
056 * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
057 */
058 public void beginTransaction(ConnectionContext context) throws IOException {
059 this.letter.beginTransaction(context);
060 }
061
062 /**
063 * @param sync
064 * @throws IOException
065 * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
066 */
067 public void checkpoint(boolean sync) throws IOException {
068 this.letter.checkpoint(sync);
069 }
070
071 /**
072 * @param context
073 * @throws IOException
074 * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
075 */
076 public void commitTransaction(ConnectionContext context) throws IOException {
077 this.letter.commitTransaction(context);
078 }
079
080 /**
081 * @param destination
082 * @return MessageStore
083 * @throws IOException
084 * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
085 */
086 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
087 return this.letter.createQueueMessageStore(destination);
088 }
089
090 /**
091 * @param destination
092 * @return TopicMessageStore
093 * @throws IOException
094 * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
095 */
096 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
097 return this.letter.createTopicMessageStore(destination);
098 }
099
100 /**
101 * @return TransactionStore
102 * @throws IOException
103 * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
104 */
105 public TransactionStore createTransactionStore() throws IOException {
106 return this.letter.createTransactionStore();
107 }
108
109 /**
110 * @throws IOException
111 * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
112 */
113 public void deleteAllMessages() throws IOException {
114 this.letter.deleteAllMessages();
115 }
116
117 /**
118 * @return destinations
119 * @see org.apache.activemq.store.PersistenceAdapter#getDestinations()
120 */
121 public Set<ActiveMQDestination> getDestinations() {
122 return this.letter.getDestinations();
123 }
124
125 /**
126 * @return lastMessageBrokerSequenceId
127 * @throws IOException
128 * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
129 */
130 public long getLastMessageBrokerSequenceId() throws IOException {
131 return this.letter.getLastMessageBrokerSequenceId();
132 }
133
134 public long getLastProducerSequenceId(ProducerId id) throws IOException {
135 return this.letter.getLastProducerSequenceId(id);
136 }
137
138 /**
139 * @param destination
140 * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
141 */
142 public void removeQueueMessageStore(ActiveMQQueue destination) {
143 this.letter.removeQueueMessageStore(destination);
144 }
145
146 /**
147 * @param destination
148 * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
149 */
150 public void removeTopicMessageStore(ActiveMQTopic destination) {
151 this.letter.removeTopicMessageStore(destination);
152 }
153
154 /**
155 * @param context
156 * @throws IOException
157 * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
158 */
159 public void rollbackTransaction(ConnectionContext context) throws IOException {
160 this.letter.rollbackTransaction(context);
161 }
162
163 /**
164 * @param brokerName
165 * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
166 */
167 public void setBrokerName(String brokerName) {
168 this.letter.setBrokerName(brokerName);
169 }
170
171 /**
172 * @param usageManager
173 * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
174 */
175 public void setUsageManager(SystemUsage usageManager) {
176 this.letter.setUsageManager(usageManager);
177 }
178
179 /**
180 * @return the size of the store
181 * @see org.apache.activemq.store.PersistenceAdapter#size()
182 */
183 public long size() {
184 return this.letter.size();
185 }
186
187 /**
188 * @throws Exception
189 * @see org.apache.activemq.Service#start()
190 */
191 public void start() throws Exception {
192 this.letter.start();
193 }
194
195 /**
196 * @throws Exception
197 * @see org.apache.activemq.Service#stop()
198 */
199 public void stop() throws Exception {
200 this.letter.stop();
201 }
202
203 /**
204 * Get the journalMaxFileLength
205 *
206 * @return the journalMaxFileLength
207 */
208 public int getJournalMaxFileLength() {
209 return this.letter.getJournalMaxFileLength();
210 }
211
212 /**
213 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
214 * be used
215 *
216 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
217 */
218 public void setJournalMaxFileLength(int journalMaxFileLength) {
219 this.letter.setJournalMaxFileLength(journalMaxFileLength);
220 }
221
222 /**
223 * Set the max number of producers (LRU cache) to track for duplicate sends
224 */
225 public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
226 this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
227 }
228
229 public int getMaxFailoverProducersToTrack() {
230 return this.letter.getMaxFailoverProducersToTrack();
231 }
232
233 /**
234 * set the audit window depth for duplicate suppression (should exceed the max transaction
235 * batch)
236 */
237 public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
238 this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
239 }
240
241 public int getFailoverProducersAuditDepth() {
242 return this.letter.getFailoverProducersAuditDepth();
243 }
244
245 /**
246 * Get the checkpointInterval
247 *
248 * @return the checkpointInterval
249 */
250 public long getCheckpointInterval() {
251 return this.letter.getCheckpointInterval();
252 }
253
254 /**
255 * Set the checkpointInterval
256 *
257 * @param checkpointInterval
258 * the checkpointInterval to set
259 */
260 public void setCheckpointInterval(long checkpointInterval) {
261 this.letter.setCheckpointInterval(checkpointInterval);
262 }
263
264 /**
265 * Get the cleanupInterval
266 *
267 * @return the cleanupInterval
268 */
269 public long getCleanupInterval() {
270 return this.letter.getCleanupInterval();
271 }
272
273 /**
274 * Set the cleanupInterval
275 *
276 * @param cleanupInterval
277 * the cleanupInterval to set
278 */
279 public void setCleanupInterval(long cleanupInterval) {
280 this.letter.setCleanupInterval(cleanupInterval);
281 }
282
283 /**
284 * Get the indexWriteBatchSize
285 *
286 * @return the indexWriteBatchSize
287 */
288 public int getIndexWriteBatchSize() {
289 return this.letter.getIndexWriteBatchSize();
290 }
291
292 /**
293 * Set the indexWriteBatchSize
294 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
295 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
296 * @param indexWriteBatchSize
297 * the indexWriteBatchSize to set
298 */
299 public void setIndexWriteBatchSize(int indexWriteBatchSize) {
300 this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
301 }
302
303 /**
304 * Get the journalMaxWriteBatchSize
305 *
306 * @return the journalMaxWriteBatchSize
307 */
308 public int getJournalMaxWriteBatchSize() {
309 return this.letter.getJournalMaxWriteBatchSize();
310 }
311
312 /**
313 * Set the journalMaxWriteBatchSize
314 * * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
315 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
316 * @param journalMaxWriteBatchSize
317 * the journalMaxWriteBatchSize to set
318 */
319 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
320 this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
321 }
322
323 /**
324 * Get the enableIndexWriteAsync
325 *
326 * @return the enableIndexWriteAsync
327 */
328 public boolean isEnableIndexWriteAsync() {
329 return this.letter.isEnableIndexWriteAsync();
330 }
331
332 /**
333 * Set the enableIndexWriteAsync
334 *
335 * @param enableIndexWriteAsync
336 * the enableIndexWriteAsync to set
337 */
338 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
339 this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
340 }
341
342 /**
343 * Get the directory
344 *
345 * @return the directory
346 */
347 public File getDirectory() {
348 return this.letter.getDirectory();
349 }
350
351 /**
352 * @param dir
353 * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
354 */
355 public void setDirectory(File dir) {
356 this.letter.setDirectory(dir);
357 }
358
359 /**
360 * Get the enableJournalDiskSyncs
361 *
362 * @return the enableJournalDiskSyncs
363 */
364 public boolean isEnableJournalDiskSyncs() {
365 return this.letter.isEnableJournalDiskSyncs();
366 }
367
368 /**
369 * Set the enableJournalDiskSyncs
370 *
371 * @param enableJournalDiskSyncs
372 * the enableJournalDiskSyncs to set
373 */
374 public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
375 this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
376 }
377
378 /**
379 * Get the indexCacheSize
380 *
381 * @return the indexCacheSize
382 */
383 public int getIndexCacheSize() {
384 return this.letter.getIndexCacheSize();
385 }
386
387 /**
388 * Set the indexCacheSize
389 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
390 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
391 * @param indexCacheSize
392 * the indexCacheSize to set
393 */
394 public void setIndexCacheSize(int indexCacheSize) {
395 this.letter.setIndexCacheSize(indexCacheSize);
396 }
397
398 /**
399 * Get the ignoreMissingJournalfiles
400 *
401 * @return the ignoreMissingJournalfiles
402 */
403 public boolean isIgnoreMissingJournalfiles() {
404 return this.letter.isIgnoreMissingJournalfiles();
405 }
406
407 /**
408 * Set the ignoreMissingJournalfiles
409 *
410 * @param ignoreMissingJournalfiles
411 * the ignoreMissingJournalfiles to set
412 */
413 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
414 this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
415 }
416
417 public boolean isChecksumJournalFiles() {
418 return letter.isChecksumJournalFiles();
419 }
420
421 public boolean isCheckForCorruptJournalFiles() {
422 return letter.isCheckForCorruptJournalFiles();
423 }
424
425 public void setChecksumJournalFiles(boolean checksumJournalFiles) {
426 letter.setChecksumJournalFiles(checksumJournalFiles);
427 }
428
429 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
430 letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
431 }
432
433 public void setBrokerService(BrokerService brokerService) {
434 letter.setBrokerService(brokerService);
435 }
436
437 public boolean isArchiveDataLogs() {
438 return letter.isArchiveDataLogs();
439 }
440
441 public void setArchiveDataLogs(boolean archiveDataLogs) {
442 letter.setArchiveDataLogs(archiveDataLogs);
443 }
444
445 public File getDirectoryArchive() {
446 return letter.getDirectoryArchive();
447 }
448
449 public void setDirectoryArchive(File directoryArchive) {
450 letter.setDirectoryArchive(directoryArchive);
451 }
452
453 public boolean isConcurrentStoreAndDispatchQueues() {
454 return letter.isConcurrentStoreAndDispatchQueues();
455 }
456
457 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
458 letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
459 }
460
461 public boolean isConcurrentStoreAndDispatchTopics() {
462 return letter.isConcurrentStoreAndDispatchTopics();
463 }
464
465 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
466 letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
467 }
468
469 public int getMaxAsyncJobs() {
470 return letter.getMaxAsyncJobs();
471 }
472 /**
473 * @param maxAsyncJobs
474 * the maxAsyncJobs to set
475 */
476 public void setMaxAsyncJobs(int maxAsyncJobs) {
477 letter.setMaxAsyncJobs(maxAsyncJobs);
478 }
479
480 /**
481 * @return the databaseLockedWaitDelay
482 */
483 public int getDatabaseLockedWaitDelay() {
484 return letter.getDatabaseLockedWaitDelay();
485 }
486
487 /**
488 * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
489 */
490 public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
491 letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
492 }
493
494 public boolean getForceRecoverIndex() {
495 return letter.getForceRecoverIndex();
496 }
497
498 public void setForceRecoverIndex(boolean forceRecoverIndex) {
499 letter.setForceRecoverIndex(forceRecoverIndex);
500 }
501
502 public boolean isArchiveCorruptedIndex() {
503 return letter.isArchiveCorruptedIndex();
504 }
505
506 public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
507 letter.setArchiveCorruptedIndex(archiveCorruptedIndex);
508 }
509
510 /**
511 * When true, persist the redelivery status such that the message redelivery flag can survive a broker failure
512 * used with org.apache.activemq.ActiveMQConnectionFactory#setTransactedIndividualAck(boolean) true
513 */
514 public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
515 letter.setRewriteOnRedelivery(rewriteOnRedelivery);
516 }
517
518 public boolean isRewriteOnRedelivery() {
519 return letter.isRewriteOnRedelivery();
520 }
521
522 public float getIndexLFUEvictionFactor() {
523 return letter.getIndexLFUEvictionFactor();
524 }
525
526 public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
527 letter.setIndexLFUEvictionFactor(indexLFUEvictionFactor);
528 }
529
530 public boolean isUseIndexLFRUEviction() {
531 return letter.isUseIndexLFRUEviction();
532 }
533
534 public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
535 letter.setUseIndexLFRUEviction(useIndexLFRUEviction);
536 }
537
538 public void setEnableIndexDiskSyncs(boolean diskSyncs) {
539 letter.setEnableIndexDiskSyncs(diskSyncs);
540 }
541
542 public boolean isEnableIndexDiskSyncs() {
543 return letter.isEnableIndexDiskSyncs();
544 }
545
546 public void setEnableIndexRecoveryFile(boolean enable) {
547 letter.setEnableIndexRecoveryFile(enable);
548 }
549
550 public boolean isEnableIndexRecoveryFile() {
551 return letter.isEnableIndexRecoveryFile();
552 }
553
554 public void setEnableIndexPageCaching(boolean enable) {
555 letter.setEnableIndexPageCaching(enable);
556 }
557
558 public boolean isEnableIndexPageCaching() {
559 return letter.isEnableIndexPageCaching();
560 }
561
562 public KahaDBStore getStore() {
563 return letter;
564 }
565
566 public KahaTransactionInfo createTransactionInfo(TransactionId txid) {
567 if (txid == null) {
568 return null;
569 }
570 KahaTransactionInfo rc = new KahaTransactionInfo();
571
572 if (txid.isLocalTransaction()) {
573 LocalTransactionId t = (LocalTransactionId) txid;
574 KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
575 kahaTxId.setConnectionId(t.getConnectionId().getValue());
576 kahaTxId.setTransacitonId(t.getValue());
577 rc.setLocalTransacitonId(kahaTxId);
578 } else {
579 XATransactionId t = (XATransactionId) txid;
580 KahaXATransactionId kahaTxId = new KahaXATransactionId();
581 kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
582 kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
583 kahaTxId.setFormatId(t.getFormatId());
584 rc.setXaTransacitonId(kahaTxId);
585 }
586 return rc;
587 }
588
589 @Override
590 public String toString() {
591 String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
592 return "KahaDBPersistenceAdapter[" + path + "]";
593 }
594
595 }