001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb;
018
019 import java.io.File;
020 import java.io.FileFilter;
021 import java.io.IOException;
022 import java.nio.charset.Charset;
023 import java.util.HashMap;
024 import java.util.HashSet;
025 import java.util.LinkedList;
026 import java.util.List;
027 import java.util.Map;
028 import java.util.Set;
029 import org.apache.activemq.broker.BrokerService;
030 import org.apache.activemq.broker.BrokerServiceAware;
031 import org.apache.activemq.broker.ConnectionContext;
032 import org.apache.activemq.command.ActiveMQDestination;
033 import org.apache.activemq.command.ActiveMQQueue;
034 import org.apache.activemq.command.ActiveMQTopic;
035 import org.apache.activemq.command.LocalTransactionId;
036 import org.apache.activemq.command.ProducerId;
037 import org.apache.activemq.command.TransactionId;
038 import org.apache.activemq.command.XATransactionId;
039 import org.apache.activemq.filter.AnyDestination;
040 import org.apache.activemq.filter.DestinationMap;
041 import org.apache.activemq.protobuf.Buffer;
042 import org.apache.activemq.store.MessageStore;
043 import org.apache.activemq.store.PersistenceAdapter;
044 import org.apache.activemq.store.TopicMessageStore;
045 import org.apache.activemq.store.TransactionStore;
046 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
047 import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
048 import org.apache.activemq.usage.SystemUsage;
049 import org.apache.activemq.util.IOHelper;
050 import org.apache.activemq.util.IntrospectionSupport;
051 import org.slf4j.Logger;
052 import org.slf4j.LoggerFactory;
053
054 /**
055 * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} that supports
056 * distribution of destinations across multiple kahaDB persistence adapters
057 *
058 * @org.apache.xbean.XBean element="mKahaDB"
059 */
060 public class MultiKahaDBPersistenceAdapter extends DestinationMap implements PersistenceAdapter, BrokerServiceAware {
061 static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBPersistenceAdapter.class);
062
063 final static ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
064 final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616"));
065
066 BrokerService brokerService;
067 List<KahaDBPersistenceAdapter> adapters = new LinkedList<KahaDBPersistenceAdapter>();
068 private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB");
069
070 MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this);
071
072 // all local store transactions are XA, 2pc if more than one adapter involved
073 TransactionIdTransformer transactionIdTransformer = new TransactionIdTransformer() {
074 @Override
075 public KahaTransactionInfo transform(TransactionId txid) {
076 if (txid == null) {
077 return null;
078 }
079 KahaTransactionInfo rc = new KahaTransactionInfo();
080 KahaXATransactionId kahaTxId = new KahaXATransactionId();
081 if (txid.isLocalTransaction()) {
082 LocalTransactionId t = (LocalTransactionId) txid;
083 kahaTxId.setBranchQualifier(new Buffer(Long.toString(t.getValue()).getBytes(Charset.forName("utf-8"))));
084 kahaTxId.setGlobalTransactionId(new Buffer(t.getConnectionId().getValue().getBytes(Charset.forName("utf-8"))));
085 kahaTxId.setFormatId(LOCAL_FORMAT_ID_MAGIC);
086 } else {
087 XATransactionId t = (XATransactionId) txid;
088 kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
089 kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
090 kahaTxId.setFormatId(t.getFormatId());
091 }
092 rc.setXaTransacitonId(kahaTxId);
093 return rc;
094 }
095 };
096
097 /**
098 * Sets the FilteredKahaDBPersistenceAdapter entries
099 *
100 * @org.apache.xbean.ElementType class="org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter"
101 */
102 @SuppressWarnings({ "rawtypes", "unchecked" })
103 public void setFilteredPersistenceAdapters(List entries) {
104 for (Object entry : entries) {
105 FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) entry;
106 KahaDBPersistenceAdapter adapter = filteredAdapter.getPersistenceAdapter();
107 if (filteredAdapter.getDestination() == null) {
108 filteredAdapter.setDestination(matchAll);
109 }
110
111 if (filteredAdapter.isPerDestination()) {
112 configureDirectory(adapter, null);
113 // per destination adapters will be created on demand or during recovery
114 continue;
115 } else {
116 configureDirectory(adapter, nameFromDestinationFilter(filteredAdapter.getDestination()));
117 }
118
119 configureAdapter(adapter);
120 adapters.add(adapter);
121 }
122 super.setEntries(entries);
123 }
124
125 private String nameFromDestinationFilter(ActiveMQDestination destination) {
126 return IOHelper.toFileSystemSafeName(destination.getQualifiedName());
127 }
128
129 public boolean isLocalXid(TransactionId xid) {
130 return xid instanceof XATransactionId &&
131 ((XATransactionId)xid).getFormatId() == LOCAL_FORMAT_ID_MAGIC;
132 }
133
134 public void beginTransaction(ConnectionContext context) throws IOException {
135 throw new IllegalStateException();
136 }
137
138 public void checkpoint(final boolean sync) throws IOException {
139 for (PersistenceAdapter persistenceAdapter : adapters) {
140 persistenceAdapter.checkpoint(sync);
141 }
142 }
143
144 public void commitTransaction(ConnectionContext context) throws IOException {
145 throw new IllegalStateException();
146 }
147
148 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
149 PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
150 return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createQueueMessageStore(destination));
151 }
152
153 private PersistenceAdapter getMatchingPersistenceAdapter(ActiveMQDestination destination) {
154 Object result = this.chooseValue(destination);
155 if (result == null) {
156 throw new RuntimeException("No matching persistence adapter configured for destination: " + destination + ", options:" + adapters);
157 }
158 FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
159 if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
160 result = addAdapter(filteredAdapter, destination);
161 startAdapter(((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter(), destination.getQualifiedName());
162 if (LOG.isTraceEnabled()) {
163 LOG.info("created per destination adapter for: " + destination + ", " + result);
164 }
165 }
166 return ((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter();
167 }
168
169 private void startAdapter(KahaDBPersistenceAdapter kahaDBPersistenceAdapter, String destination) {
170 try {
171 kahaDBPersistenceAdapter.start();
172 } catch (Exception e) {
173 RuntimeException detail = new RuntimeException("Failed to start per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
174 LOG.error(detail.toString(), e);
175 throw detail;
176 }
177 }
178
179 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
180 PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
181 return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination));
182 }
183
184 public TransactionStore createTransactionStore() throws IOException {
185 return transactionStore;
186 }
187
188 public void deleteAllMessages() throws IOException {
189 for (PersistenceAdapter persistenceAdapter : adapters) {
190 persistenceAdapter.deleteAllMessages();
191 }
192 transactionStore.deleteAllMessages();
193 IOHelper.deleteChildren(getDirectory());
194 }
195
196 public Set<ActiveMQDestination> getDestinations() {
197 Set<ActiveMQDestination> results = new HashSet<ActiveMQDestination>();
198 for (PersistenceAdapter persistenceAdapter : adapters) {
199 results.addAll(persistenceAdapter.getDestinations());
200 }
201 return results;
202 }
203
204 public long getLastMessageBrokerSequenceId() throws IOException {
205 long maxId = -1;
206 for (PersistenceAdapter persistenceAdapter : adapters) {
207 maxId = Math.max(maxId, persistenceAdapter.getLastMessageBrokerSequenceId());
208 }
209 return maxId;
210 }
211
212 public long getLastProducerSequenceId(ProducerId id) throws IOException {
213 long maxId = -1;
214 for (PersistenceAdapter persistenceAdapter : adapters) {
215 maxId = Math.max(maxId, persistenceAdapter.getLastProducerSequenceId(id));
216 }
217 return maxId;
218 }
219
220 public void removeQueueMessageStore(ActiveMQQueue destination) {
221 getMatchingPersistenceAdapter(destination).removeQueueMessageStore(destination);
222 }
223
224 public void removeTopicMessageStore(ActiveMQTopic destination) {
225 getMatchingPersistenceAdapter(destination).removeTopicMessageStore(destination);
226 }
227
228 public void rollbackTransaction(ConnectionContext context) throws IOException {
229 throw new IllegalStateException();
230 }
231
232 public void setBrokerName(String brokerName) {
233 for (PersistenceAdapter persistenceAdapter : adapters) {
234 persistenceAdapter.setBrokerName(brokerName);
235 }
236 }
237
238 public void setUsageManager(SystemUsage usageManager) {
239 for (PersistenceAdapter persistenceAdapter : adapters) {
240 persistenceAdapter.setUsageManager(usageManager);
241 }
242 }
243
244 public long size() {
245 long size = 0;
246 for (PersistenceAdapter persistenceAdapter : adapters) {
247 size += persistenceAdapter.size();
248 }
249 return size;
250 }
251
252 public void start() throws Exception {
253 Object result = this.chooseValue(matchAll);
254 if (result != null) {
255 FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
256 if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
257 findAndRegisterExistingAdapters(filteredAdapter);
258 }
259 }
260 for (PersistenceAdapter persistenceAdapter : adapters) {
261 persistenceAdapter.start();
262 }
263 }
264
265 private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) {
266 FileFilter destinationNames = new FileFilter() {
267 @Override
268 public boolean accept(File file) {
269 return file.getName().startsWith("queue#") || file.getName().startsWith("topic#");
270 }
271 };
272 File[] candidates = template.getPersistenceAdapter().getDirectory().listFiles(destinationNames);
273 if (candidates != null) {
274 for (File candidate : candidates) {
275 registerExistingAdapter(template, candidate);
276 }
277 }
278 }
279
280 private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, File candidate) {
281 KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), candidate.getName());
282 startAdapter(adapter, candidate.getName());
283 registerAdapter(adapter, adapter.getDestinations().toArray(new ActiveMQDestination[]{})[0]);
284 }
285
286 private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, ActiveMQDestination destination) {
287 KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), nameFromDestinationFilter(destination));
288 return registerAdapter(adapter, destination);
289 }
290
291 private KahaDBPersistenceAdapter adapterFromTemplate(KahaDBPersistenceAdapter template, String destinationName) {
292 KahaDBPersistenceAdapter adapter = kahaDBFromTemplate(template);
293 configureAdapter(adapter);
294 configureDirectory(adapter, destinationName);
295 return adapter;
296 }
297
298 private void configureDirectory(KahaDBPersistenceAdapter adapter, String fileName) {
299 File directory = null;
300 if (MessageDatabase.DEFAULT_DIRECTORY.equals(adapter.getDirectory())) {
301 // not set so inherit from mkahadb
302 directory = getDirectory();
303 } else {
304 directory = adapter.getDirectory();
305 }
306 if (fileName != null) {
307 directory = new File(directory, fileName);
308 }
309 adapter.setDirectory(directory);
310 }
311
312 private FilteredKahaDBPersistenceAdapter registerAdapter(KahaDBPersistenceAdapter adapter, ActiveMQDestination destination) {
313 adapters.add(adapter);
314 FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(destination, adapter);
315 put(destination, result);
316 return result;
317 }
318
319 private void configureAdapter(KahaDBPersistenceAdapter adapter) {
320 // need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans
321 adapter.getStore().setTransactionIdTransformer(transactionIdTransformer);
322 adapter.setBrokerService(getBrokerService());
323 }
324
325 private KahaDBPersistenceAdapter kahaDBFromTemplate(KahaDBPersistenceAdapter template) {
326 Map<String, Object> configuration = new HashMap<String, Object>();
327 IntrospectionSupport.getProperties(template, configuration, null);
328 KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
329 IntrospectionSupport.setProperties(adapter, configuration);
330 return adapter;
331 }
332
333 public void stop() throws Exception {
334 for (PersistenceAdapter persistenceAdapter : adapters) {
335 persistenceAdapter.stop();
336 }
337 }
338
339 public File getDirectory() {
340 return this.directory;
341 }
342
343 @Override
344 public void setDirectory(File directory) {
345 this.directory = directory;
346 }
347
348 public void setBrokerService(BrokerService brokerService) {
349 for (KahaDBPersistenceAdapter persistenceAdapter : adapters) {
350 persistenceAdapter.setBrokerService(brokerService);
351 }
352 this.brokerService = brokerService;
353 }
354
355 public BrokerService getBrokerService() {
356 return brokerService;
357 }
358
359 public void setTransactionStore(MultiKahaDBTransactionStore transactionStore) {
360 this.transactionStore = transactionStore;
361 }
362
363 /**
364 * Set the max file length of the transaction journal
365 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
366 * be used
367 *
368 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
369 */
370 public void setJournalMaxFileLength(int maxFileLength) {
371 transactionStore.setJournalMaxFileLength(maxFileLength);
372 }
373
374 public int getJournalMaxFileLength() {
375 return transactionStore.getJournalMaxFileLength();
376 }
377
378 /**
379 * Set the max write batch size of the transaction journal
380 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
381 * be used
382 *
383 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
384 */
385 public void setJournalWriteBatchSize(int journalWriteBatchSize) {
386 transactionStore.setJournalMaxWriteBatchSize(journalWriteBatchSize);
387 }
388
389 public int getJournalWriteBatchSize() {
390 return transactionStore.getJournalMaxWriteBatchSize();
391 }
392
393 @Override
394 public String toString() {
395 String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
396 return "MultiKahaDBPersistenceAdapter[" + path + "]" + adapters;
397 }
398
399 }