001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadaptor;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.util.HashSet;
022 import java.util.Iterator;
023 import java.util.Set;
024 import java.util.concurrent.ConcurrentHashMap;
025 import java.util.concurrent.atomic.AtomicLong;
026
027 import org.apache.activemq.broker.BrokerService;
028 import org.apache.activemq.broker.BrokerServiceAware;
029 import org.apache.activemq.broker.ConnectionContext;
030 import org.apache.activemq.command.ActiveMQDestination;
031 import org.apache.activemq.command.ActiveMQQueue;
032 import org.apache.activemq.command.ActiveMQTopic;
033 import org.apache.activemq.command.Message;
034 import org.apache.activemq.command.MessageId;
035 import org.apache.activemq.command.ProducerId;
036 import org.apache.activemq.kaha.CommandMarshaller;
037 import org.apache.activemq.kaha.ContainerId;
038 import org.apache.activemq.kaha.ListContainer;
039 import org.apache.activemq.kaha.MapContainer;
040 import org.apache.activemq.kaha.Marshaller;
041 import org.apache.activemq.kaha.MessageIdMarshaller;
042 import org.apache.activemq.kaha.MessageMarshaller;
043 import org.apache.activemq.kaha.Store;
044 import org.apache.activemq.kaha.StoreFactory;
045 import org.apache.activemq.kaha.impl.StoreLockedExcpetion;
046 import org.apache.activemq.openwire.OpenWireFormat;
047 import org.apache.activemq.store.MessageStore;
048 import org.apache.activemq.store.PersistenceAdapter;
049 import org.apache.activemq.store.TopicMessageStore;
050 import org.apache.activemq.store.TransactionStore;
051 import org.apache.activemq.usage.SystemUsage;
052 import org.apache.activemq.util.IOHelper;
053 import org.slf4j.Logger;
054 import org.slf4j.LoggerFactory;
055
056 /**
057 * @org.apache.xbean.XBean
058 *
059 */
060 public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
061
062 private static final int STORE_LOCKED_WAIT_DELAY = 10 * 1000;
063 private static final Logger LOG = LoggerFactory.getLogger(KahaPersistenceAdapter.class);
064 private static final String PREPARED_TRANSACTIONS_NAME = "PreparedTransactions";
065
066 protected OpenWireFormat wireFormat = new OpenWireFormat();
067 protected KahaTransactionStore transactionStore;
068 protected ConcurrentHashMap<ActiveMQTopic, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, TopicMessageStore>();
069 protected ConcurrentHashMap<ActiveMQQueue, MessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, MessageStore>();
070 protected ConcurrentHashMap<ActiveMQDestination, MessageStore> messageStores = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
071
072 private long maxDataFileLength = 32 * 1024 * 1024;
073 private File directory;
074 private String brokerName;
075 private Store theStore;
076 private boolean initialized;
077 private final AtomicLong storeSize;
078 private boolean persistentIndex = true;
079 private BrokerService brokerService;
080
081
082 public KahaPersistenceAdapter(AtomicLong size) {
083 this.storeSize=size;
084 }
085
086 public KahaPersistenceAdapter() {
087 this(new AtomicLong());
088 }
089
090 public Set<ActiveMQDestination> getDestinations() {
091 Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
092 try {
093 Store store = getStore();
094 for (Iterator i = store.getMapContainerIds().iterator(); i.hasNext();) {
095 ContainerId id = (ContainerId)i.next();
096 Object obj = id.getKey();
097 if (obj instanceof ActiveMQDestination) {
098 rc.add((ActiveMQDestination)obj);
099 }
100 }
101 } catch (IOException e) {
102 LOG.error("Failed to get destinations ", e);
103 }
104 return rc;
105 }
106
107 public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
108 MessageStore rc = queues.get(destination);
109 if (rc == null) {
110 rc = new KahaMessageStore(getMapContainer(destination, "queue-data"), destination);
111 messageStores.put(destination, rc);
112 if (transactionStore != null) {
113 rc = transactionStore.proxy(rc);
114 }
115 queues.put(destination, rc);
116 }
117 return rc;
118 }
119
120 public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
121 throws IOException {
122 TopicMessageStore rc = topics.get(destination);
123 if (rc == null) {
124 Store store = getStore();
125 MapContainer messageContainer = getMapContainer(destination, "topic-data");
126 MapContainer subsContainer = getSubsMapContainer(destination.toString() + "-Subscriptions",
127 "topic-subs");
128 ListContainer<TopicSubAck> ackContainer = store.getListContainer(destination.toString(),
129 "topic-acks");
130 ackContainer.setMarshaller(new TopicSubAckMarshaller());
131 rc = new KahaTopicMessageStore(store, messageContainer, ackContainer, subsContainer, destination);
132 messageStores.put(destination, rc);
133 if (transactionStore != null) {
134 rc = transactionStore.proxy(rc);
135 }
136 topics.put(destination, rc);
137 }
138 return rc;
139 }
140
141 /**
142 * Cleanup method to remove any state associated with the given destination
143 *
144 * @param destination Destination to forget
145 */
146 public void removeQueueMessageStore(ActiveMQQueue destination) {
147 queues.remove(destination);
148 try{
149 if(theStore!=null){
150 theStore.deleteMapContainer(destination,"queue-data");
151 }
152 }catch(IOException e ){
153 LOG.error("Failed to remove store map container for queue:"+destination, e);
154 }
155 }
156
157 /**
158 * Cleanup method to remove any state associated with the given destination
159 *
160 * @param destination Destination to forget
161 */
162 public void removeTopicMessageStore(ActiveMQTopic destination) {
163 topics.remove(destination);
164 }
165
166 protected MessageStore retrieveMessageStore(Object id) {
167 MessageStore result = messageStores.get(id);
168 return result;
169 }
170
171 public TransactionStore createTransactionStore() throws IOException {
172 if (transactionStore == null) {
173 while (true) {
174 try {
175 Store store = getStore();
176 MapContainer container = store
177 .getMapContainer(PREPARED_TRANSACTIONS_NAME, "transactions");
178 container.setKeyMarshaller(new CommandMarshaller(wireFormat));
179 container.setValueMarshaller(new TransactionMarshaller(wireFormat));
180 container.load();
181 transactionStore = new KahaTransactionStore(this, container);
182 transactionStore.setBrokerService(brokerService);
183 break;
184 } catch (StoreLockedExcpetion e) {
185 LOG.info("Store is locked... waiting " + (STORE_LOCKED_WAIT_DELAY / 1000)
186 + " seconds for the Store to be unlocked.");
187 try {
188 Thread.sleep(STORE_LOCKED_WAIT_DELAY);
189 } catch (InterruptedException e1) {
190 }
191 }
192 }
193 }
194 return transactionStore;
195 }
196
197 public void beginTransaction(ConnectionContext context) {
198 }
199
200 public void commitTransaction(ConnectionContext context) throws IOException {
201 if (theStore != null) {
202 theStore.force();
203 }
204 }
205
206 public void rollbackTransaction(ConnectionContext context) {
207 }
208
209 public void start() throws Exception {
210 initialize();
211 }
212
213 public void stop() throws Exception {
214 if (theStore != null) {
215 theStore.close();
216 }
217 }
218
219 public long getLastMessageBrokerSequenceId() throws IOException {
220 return 0;
221 }
222
223 public void deleteAllMessages() throws IOException {
224 if (theStore != null) {
225 if (theStore.isInitialized()) {
226 theStore.clear();
227 } else {
228 theStore.delete();
229 }
230 } else {
231 StoreFactory.delete(getStoreDirectory());
232 }
233 }
234
235 protected MapContainer<MessageId, Message> getMapContainer(Object id, String containerName)
236 throws IOException {
237 Store store = getStore();
238 MapContainer<MessageId, Message> container = store.getMapContainer(id, containerName);
239 container.setKeyMarshaller(new MessageIdMarshaller());
240 container.setValueMarshaller(new MessageMarshaller(wireFormat));
241 container.load();
242 return container;
243 }
244
245 protected MapContainer getSubsMapContainer(Object id, String containerName)
246 throws IOException {
247 Store store = getStore();
248 MapContainer container = store.getMapContainer(id, containerName);
249 container.setKeyMarshaller(Store.STRING_MARSHALLER);
250 container.setValueMarshaller(createMessageMarshaller());
251 container.load();
252 return container;
253 }
254
255 protected Marshaller<Object> createMessageMarshaller() {
256 return new CommandMarshaller(wireFormat);
257 }
258
259 protected ListContainer<TopicSubAck> getListContainer(Object id, String containerName) throws IOException {
260 Store store = getStore();
261 ListContainer<TopicSubAck> container = store.getListContainer(id, containerName);
262 container.setMarshaller(createMessageMarshaller());
263 container.load();
264 return container;
265 }
266
267 /**
268 * @param usageManager The UsageManager that is controlling the broker's
269 * memory usage.
270 */
271 public void setUsageManager(SystemUsage usageManager) {
272 }
273
274 /**
275 * @return the maxDataFileLength
276 */
277 public long getMaxDataFileLength() {
278 return maxDataFileLength;
279 }
280
281 public boolean isPersistentIndex() {
282 return persistentIndex;
283 }
284
285 public void setPersistentIndex(boolean persistentIndex) {
286 this.persistentIndex = persistentIndex;
287 }
288
289 /**
290 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
291 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
292 */
293 public void setMaxDataFileLength(long maxDataFileLength) {
294 this.maxDataFileLength = maxDataFileLength;
295 }
296
297 protected final synchronized Store getStore() throws IOException {
298 if (theStore == null) {
299 theStore = createStore();
300 }
301 return theStore;
302 }
303
304 protected final Store createStore() throws IOException {
305 Store result = StoreFactory.open(getStoreDirectory(), "rw",storeSize);
306 result.setMaxDataFileLength(maxDataFileLength);
307 result.setPersistentIndex(isPersistentIndex());
308 result.setDefaultContainerName("container-roots");
309 return result;
310 }
311
312 private String getStoreName() {
313 initialize();
314 return directory.getAbsolutePath();
315 }
316
317 private File getStoreDirectory() {
318 initialize();
319 return directory;
320 }
321
322 public String toString() {
323 return "KahaPersistenceAdapter(" + getStoreName() + ")";
324 }
325
326 public void setBrokerName(String brokerName) {
327 this.brokerName = brokerName;
328 }
329
330 public String getBrokerName() {
331 return brokerName;
332 }
333
334 public File getDirectory() {
335 return this.directory;
336 }
337
338 public void setDirectory(File directory) {
339 this.directory = directory;
340 }
341
342 public void checkpoint(boolean sync) throws IOException {
343 if (sync) {
344 getStore().force();
345 }
346 }
347
348 public long size(){
349 return storeSize.get();
350 }
351
352 private void initialize() {
353 if (!initialized) {
354 initialized = true;
355 if (this.directory == null) {
356 File file = new File(IOHelper.getDefaultDataDirectory());
357 file = new File(file, IOHelper.toFileSystemSafeName(brokerName) + "-kahastore");
358 setDirectory(file);
359 }
360 try {
361 IOHelper.mkdirs(this.directory);
362 } catch (IOException e) {
363 throw new RuntimeException(e);
364 }
365 wireFormat.setCacheEnabled(false);
366 wireFormat.setTightEncodingEnabled(true);
367 }
368 }
369
370 public void setBrokerService(BrokerService brokerService) {
371 this.brokerService = brokerService;
372 }
373
374 public long getLastProducerSequenceId(ProducerId id) {
375 // reference store send has adequate duplicate suppression
376 return -1;
377 }
378
379
380 }