001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadaptor;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.util.HashMap;
022 import java.util.HashSet;
023 import java.util.Iterator;
024 import java.util.Map;
025 import java.util.Set;
026 import java.util.concurrent.atomic.AtomicBoolean;
027 import java.util.concurrent.atomic.AtomicInteger;
028 import java.util.concurrent.atomic.AtomicLong;
029
030 import org.apache.activemq.broker.ConnectionContext;
031 import org.apache.activemq.command.ActiveMQDestination;
032 import org.apache.activemq.command.ActiveMQQueue;
033 import org.apache.activemq.command.ActiveMQTopic;
034 import org.apache.activemq.command.MessageId;
035 import org.apache.activemq.command.SubscriptionInfo;
036 import org.apache.activemq.command.TransactionId;
037 import org.apache.activemq.kaha.CommandMarshaller;
038 import org.apache.activemq.kaha.ListContainer;
039 import org.apache.activemq.kaha.MapContainer;
040 import org.apache.activemq.kaha.MessageIdMarshaller;
041 import org.apache.activemq.kaha.Store;
042 import org.apache.activemq.kaha.StoreFactory;
043 import org.apache.activemq.kaha.impl.index.hash.HashIndex;
044 import org.apache.activemq.store.MessageStore;
045 import org.apache.activemq.store.ReferenceStore;
046 import org.apache.activemq.store.ReferenceStoreAdapter;
047 import org.apache.activemq.store.TopicMessageStore;
048 import org.apache.activemq.store.TopicReferenceStore;
049 import org.apache.activemq.store.amq.AMQTx;
050 import org.apache.activemq.util.IOHelper;
051 import org.slf4j.Logger;
052 import org.slf4j.LoggerFactory;
053
054 public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter {
055
056
057
058 private static final Logger LOG = LoggerFactory.getLogger(KahaReferenceStoreAdapter.class);
059 private static final String STORE_STATE = "store-state";
060 private static final String QUEUE_DATA = "queue-data";
061 private static final String INDEX_VERSION_NAME = "INDEX_VERSION";
062 private static final Integer INDEX_VERSION = new Integer(7);
063 private static final String RECORD_REFERENCES = "record-references";
064 private static final String TRANSACTIONS = "transactions-state";
065 private MapContainer stateMap;
066 private MapContainer<TransactionId, AMQTx> preparedTransactions;
067 private Map<Integer, AtomicInteger> recordReferences = new HashMap<Integer, AtomicInteger>();
068 private ListContainer<SubscriptionInfo> durableSubscribers;
069 private boolean storeValid;
070 private Store stateStore;
071 private boolean persistentIndex = true;
072 private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
073 private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
074 private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
075 private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
076 private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
077
078
079 public KahaReferenceStoreAdapter(AtomicLong size){
080 super(size);
081 }
082
083 public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
084 throw new RuntimeException("Use createQueueReferenceStore instead");
085 }
086
087 public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
088 throws IOException {
089 throw new RuntimeException("Use createTopicReferenceStore instead");
090 }
091
092 @Override
093 public synchronized void start() throws Exception {
094 super.start();
095 Store store = getStateStore();
096 boolean empty = store.getMapContainerIds().isEmpty();
097 stateMap = store.getMapContainer("state", STORE_STATE);
098 stateMap.load();
099 storeValid=true;
100 if (!empty) {
101 AtomicBoolean status = (AtomicBoolean)stateMap.get(STORE_STATE);
102 if (status != null) {
103 storeValid = status.get();
104 }
105
106 if (storeValid) {
107 //check what version the indexes are at
108 Integer indexVersion = (Integer) stateMap.get(INDEX_VERSION_NAME);
109 if (indexVersion==null || indexVersion.intValue() < INDEX_VERSION.intValue()) {
110 storeValid = false;
111 LOG.warn("Indexes at an older version - need to regenerate");
112 }
113 }
114 if (storeValid) {
115 if (stateMap.containsKey(RECORD_REFERENCES)) {
116 recordReferences = (Map<Integer, AtomicInteger>)stateMap.get(RECORD_REFERENCES);
117 }
118 }
119 }
120 stateMap.put(STORE_STATE, new AtomicBoolean());
121 stateMap.put(INDEX_VERSION_NAME, INDEX_VERSION);
122 durableSubscribers = store.getListContainer("durableSubscribers");
123 durableSubscribers.setMarshaller(new CommandMarshaller());
124 preparedTransactions = store.getMapContainer("transactions", TRANSACTIONS, false);
125 // need to set the Marshallers here
126 preparedTransactions.setKeyMarshaller(Store.COMMAND_MARSHALLER);
127 preparedTransactions.setValueMarshaller(new AMQTxMarshaller(wireFormat));
128 }
129
130 @Override
131 public synchronized void stop() throws Exception {
132 stateMap.put(RECORD_REFERENCES, recordReferences);
133 stateMap.put(STORE_STATE, new AtomicBoolean(true));
134 stateMap.put(INDEX_VERSION_NAME, INDEX_VERSION);
135 if (this.stateStore != null) {
136 this.stateStore.close();
137 this.stateStore = null;
138 this.stateMap = null;
139 }
140 super.stop();
141 }
142
143 public void commitTransaction(ConnectionContext context) throws IOException {
144 //we don;t need to force on a commit - as the reference store
145 //is rebuilt on a non clean shutdown
146 }
147
148 public boolean isStoreValid() {
149 return storeValid;
150 }
151
152 public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException {
153 ReferenceStore rc = (ReferenceStore)queues.get(destination);
154 if (rc == null) {
155 rc = new KahaReferenceStore(this, getMapReferenceContainer(destination, QUEUE_DATA),
156 destination);
157 messageStores.put(destination, rc);
158 // if(transactionStore!=null){
159 // rc=transactionStore.proxy(rc);
160 // }
161 queues.put(destination, rc);
162 }
163 return rc;
164 }
165
166 public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException {
167 TopicReferenceStore rc = (TopicReferenceStore)topics.get(destination);
168 if (rc == null) {
169 Store store = getStore();
170 MapContainer messageContainer = getMapReferenceContainer(destination.getPhysicalName(), "topic-data");
171 MapContainer subsContainer = getSubsMapContainer(destination.getPhysicalName() + "-Subscriptions", "blob");
172 ListContainer<TopicSubAck> ackContainer = store.getListContainer(destination.getPhysicalName(), "topic-acks");
173 ackContainer.setMarshaller(new TopicSubAckMarshaller());
174 rc = new KahaTopicReferenceStore(store, this, messageContainer, ackContainer, subsContainer,
175 destination);
176 messageStores.put(destination, rc);
177 // if(transactionStore!=null){
178 // rc=transactionStore.proxy(rc);
179 // }
180 topics.put(destination, rc);
181 }
182 return rc;
183 }
184
185 public void removeReferenceStore(KahaReferenceStore referenceStore) {
186 ActiveMQDestination destination = referenceStore.getDestination();
187 if (destination.isQueue()) {
188 queues.remove(destination);
189 try {
190 getStore().deleteMapContainer(destination, QUEUE_DATA);
191 } catch (IOException e) {
192 LOG.error("Failed to delete " + QUEUE_DATA + " map container for destination: " + destination, e);
193 }
194 } else {
195 topics.remove(destination);
196 }
197 messageStores.remove(destination);
198 }
199 /*
200 public void buildReferenceFileIdsInUse() throws IOException {
201 recordReferences = new HashMap<Integer, AtomicInteger>();
202 Set<ActiveMQDestination> destinations = getDestinations();
203 for (ActiveMQDestination destination : destinations) {
204 if (destination.isQueue()) {
205 KahaReferenceStore store = (KahaReferenceStore)createQueueReferenceStore((ActiveMQQueue)destination);
206 store.addReferenceFileIdsInUse();
207 } else {
208 KahaTopicReferenceStore store = (KahaTopicReferenceStore)createTopicReferenceStore((ActiveMQTopic)destination);
209 store.addReferenceFileIdsInUse();
210 }
211 }
212 }
213 */
214
215 protected MapContainer<MessageId, ReferenceRecord> getMapReferenceContainer(Object id,
216 String containerName)
217 throws IOException {
218 Store store = getStore();
219 MapContainer<MessageId, ReferenceRecord> container = store.getMapContainer(id, containerName,persistentIndex);
220 container.setIndexBinSize(getIndexBinSize());
221 container.setIndexKeySize(getIndexKeySize());
222 container.setIndexPageSize(getIndexPageSize());
223 container.setIndexMaxBinSize(getIndexMaxBinSize());
224 container.setIndexLoadFactor(getIndexLoadFactor());
225 container.setKeyMarshaller(new MessageIdMarshaller());
226 container.setValueMarshaller(new ReferenceRecordMarshaller());
227 container.load();
228 return container;
229 }
230
231 synchronized void addInterestInRecordFile(int recordNumber) {
232 Integer key = Integer.valueOf(recordNumber);
233 AtomicInteger rr = recordReferences.get(key);
234 if (rr == null) {
235 rr = new AtomicInteger();
236 recordReferences.put(key, rr);
237 }
238 rr.incrementAndGet();
239 }
240
241 synchronized void removeInterestInRecordFile(int recordNumber) {
242 Integer key = Integer.valueOf(recordNumber);
243 AtomicInteger rr = recordReferences.get(key);
244 if (rr != null && rr.decrementAndGet() <= 0) {
245 recordReferences.remove(key);
246 }
247 }
248
249 /**
250 * @return
251 * @throws IOException
252 * @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse()
253 */
254 public synchronized Set<Integer> getReferenceFileIdsInUse() throws IOException {
255 Set inUse = new HashSet<Integer>(recordReferences.keySet());
256
257 Iterator<Map.Entry<Integer, Set<Integer>>> ackReferences = ackMessageFileMap.entrySet().iterator();
258 while (ackReferences.hasNext()) {
259 Map.Entry<Integer, Set<Integer>> ackReference = ackReferences.next();
260 if (!inUse.contains(ackReference.getKey())) {
261 // should we keep this data file
262 for (Integer referencedFileId : ackReference.getValue()) {
263 if (inUse.contains(referencedFileId)) {
264 // keep this ack file
265 inUse.add(ackReference.getKey());
266 LOG.debug("not removing data file: " + ackReference.getKey()
267 + " as contained ack(s) refer to referencedFileId file: " + ackReference.getValue());
268 break;
269 }
270 }
271 }
272 if (!inUse.contains(ackReference.getKey())) {
273 ackReferences.remove();
274 }
275 }
276
277 return inUse;
278 }
279
280 Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
281 public synchronized void recordAckFileReferences(int ackDataFileId, int messageFileId) {
282 Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackDataFileId));
283 if (referenceFileIds == null) {
284 referenceFileIds = new HashSet<Integer>();
285 referenceFileIds.add(Integer.valueOf(messageFileId));
286 ackMessageFileMap.put(Integer.valueOf(ackDataFileId), referenceFileIds);
287 } else {
288 Integer id = Integer.valueOf(messageFileId);
289 if (!referenceFileIds.contains(id)) {
290 referenceFileIds.add(id);
291 }
292 }
293 }
294
295 /**
296 *
297 * @throws IOException
298 * @see org.apache.activemq.store.ReferenceStoreAdapter#clearMessages()
299 */
300 public void clearMessages() throws IOException {
301 //don't delete messages as it will clear state - call base
302 //class method to clear out the data instead
303 super.deleteAllMessages();
304 }
305
306 /**
307 *
308 * @throws IOException
309 * @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState()
310 */
311
312 public void recoverState() throws IOException {
313 Set<SubscriptionInfo> set = new HashSet<SubscriptionInfo>(this.durableSubscribers);
314 for (SubscriptionInfo info:set) {
315 LOG.info("Recovering subscriber state for durable subscriber: " + info);
316 TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
317 ts.addSubsciption(info, false);
318 }
319 }
320
321 public void recoverSubscription(SubscriptionInfo info) throws IOException {
322 TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
323 LOG.info("Recovering subscriber state for durable subscriber: " + info);
324 ts.addSubsciption(info, false);
325 }
326
327
328 public Map<TransactionId, AMQTx> retrievePreparedState() throws IOException {
329 Map<TransactionId, AMQTx> result = new HashMap<TransactionId, AMQTx>();
330 preparedTransactions.load();
331 for (Iterator<TransactionId> i = preparedTransactions.keySet().iterator(); i.hasNext();) {
332 TransactionId key = i.next();
333 AMQTx value = preparedTransactions.get(key);
334 result.put(key, value);
335 }
336 return result;
337 }
338
339 public void savePreparedState(Map<TransactionId, AMQTx> map) throws IOException {
340 preparedTransactions.clear();
341 for (Iterator<Map.Entry<TransactionId, AMQTx>> iter = map.entrySet().iterator(); iter.hasNext();) {
342 Map.Entry<TransactionId, AMQTx> entry = iter.next();
343 preparedTransactions.put(entry.getKey(), entry.getValue());
344 }
345 }
346
347 @Override
348 public synchronized void setDirectory(File directory) {
349 File file = new File(directory, "data");
350 super.setDirectory(file);
351 this.stateStore = createStateStore(directory);
352 }
353
354 protected synchronized Store getStateStore() throws IOException {
355 if (this.stateStore == null) {
356 File stateDirectory = new File(getDirectory(), "kr-state");
357 IOHelper.mkdirs(stateDirectory);
358 this.stateStore = createStateStore(getDirectory());
359 }
360 return this.stateStore;
361 }
362
363 public void deleteAllMessages() throws IOException {
364 super.deleteAllMessages();
365 if (stateStore != null) {
366 if (stateStore.isInitialized()) {
367 stateStore.clear();
368 } else {
369 stateStore.delete();
370 }
371 } else {
372 File stateDirectory = new File(getDirectory(), "kr-state");
373 StoreFactory.delete(stateDirectory);
374 }
375 }
376
377 public boolean isPersistentIndex() {
378 return persistentIndex;
379 }
380
381 public void setPersistentIndex(boolean persistentIndex) {
382 this.persistentIndex = persistentIndex;
383 }
384
385 private Store createStateStore(File directory) {
386 File stateDirectory = new File(directory, "state");
387 try {
388 IOHelper.mkdirs(stateDirectory);
389 return StoreFactory.open(stateDirectory, "rw");
390 } catch (IOException e) {
391 LOG.error("Failed to create the state store", e);
392 }
393 return null;
394 }
395
396 protected void addSubscriberState(SubscriptionInfo info) throws IOException {
397 durableSubscribers.add(info);
398 }
399
400 protected void removeSubscriberState(SubscriptionInfo info) {
401 durableSubscribers.remove(info);
402 }
403
404 public int getIndexBinSize() {
405 return indexBinSize;
406 }
407
408 public void setIndexBinSize(int indexBinSize) {
409 this.indexBinSize = indexBinSize;
410 }
411
412 public int getIndexKeySize() {
413 return indexKeySize;
414 }
415
416 public void setIndexKeySize(int indexKeySize) {
417 this.indexKeySize = indexKeySize;
418 }
419
420 public int getIndexPageSize() {
421 return indexPageSize;
422 }
423
424 public void setIndexPageSize(int indexPageSize) {
425 this.indexPageSize = indexPageSize;
426 }
427
428 public int getIndexMaxBinSize() {
429 return indexMaxBinSize;
430 }
431
432 public void setIndexMaxBinSize(int maxBinSize) {
433 this.indexMaxBinSize = maxBinSize;
434 }
435
436 /**
437 * @return the loadFactor
438 */
439 public int getIndexLoadFactor() {
440 return indexLoadFactor;
441 }
442
443 /**
444 * @param loadFactor the loadFactor to set
445 */
446 public void setIndexLoadFactor(int loadFactor) {
447 this.indexLoadFactor = loadFactor;
448 }
449
450
451 }