001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.kaha.impl;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.io.RandomAccessFile;
022 import java.nio.channels.FileLock;
023 import java.util.Date;
024 import java.util.HashSet;
025 import java.util.Iterator;
026 import java.util.Map;
027 import java.util.Set;
028 import java.util.concurrent.ConcurrentHashMap;
029 import java.util.concurrent.atomic.AtomicLong;
030
031 import org.apache.activemq.kaha.ContainerId;
032 import org.apache.activemq.kaha.ListContainer;
033 import org.apache.activemq.kaha.MapContainer;
034 import org.apache.activemq.kaha.Store;
035 import org.apache.activemq.kaha.StoreLocation;
036 import org.apache.activemq.kaha.impl.async.AsyncDataManager;
037 import org.apache.activemq.kaha.impl.async.DataManagerFacade;
038 import org.apache.activemq.kaha.impl.container.ListContainerImpl;
039 import org.apache.activemq.kaha.impl.container.MapContainerImpl;
040 import org.apache.activemq.kaha.impl.data.DataManagerImpl;
041 import org.apache.activemq.kaha.impl.data.Item;
042 import org.apache.activemq.kaha.impl.data.RedoListener;
043 import org.apache.activemq.kaha.impl.index.IndexItem;
044 import org.apache.activemq.kaha.impl.index.IndexManager;
045 import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
046 import org.apache.activemq.util.IOHelper;
047 import org.slf4j.Logger;
048 import org.slf4j.LoggerFactory;
049
050 /**
051 * Store Implementation
052 *
053 *
054 */
055 public class KahaStore implements Store {
056
057 private static final String PROPERTY_PREFIX = "org.apache.activemq.kaha.Store";
058 private static final boolean BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
059 + ".FileLockBroken",
060 "false"));
061 private static final boolean DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
062 + ".DisableLocking",
063 "false"));
064 //according to the String javadoc, all constant strings are interned so this will be the same object throughout the vm
065 //and we can use it as a monitor for the lockset.
066 private final static String LOCKSET_MONITOR = PROPERTY_PREFIX + ".Lock.Monitor";
067 private static final Logger LOG = LoggerFactory.getLogger(KahaStore.class);
068
069 private final File directory;
070 private final String mode;
071 private IndexRootContainer mapsContainer;
072 private IndexRootContainer listsContainer;
073 private final Map<ContainerId, ListContainerImpl> lists = new ConcurrentHashMap<ContainerId, ListContainerImpl>();
074 private final Map<ContainerId, MapContainerImpl> maps = new ConcurrentHashMap<ContainerId, MapContainerImpl>();
075 private final Map<String, DataManager> dataManagers = new ConcurrentHashMap<String, DataManager>();
076 private final Map<String, IndexManager> indexManagers = new ConcurrentHashMap<String, IndexManager>();
077 private boolean closed;
078 private boolean initialized;
079 private boolean logIndexChanges;
080 private boolean useAsyncDataManager;
081 private long maxDataFileLength = 1024 * 1024 * 32;
082 private FileLock lock;
083 private boolean persistentIndex = true;
084 private RandomAccessFile lockFile;
085 private final AtomicLong storeSize;
086 private String defaultContainerName = DEFAULT_CONTAINER_NAME;
087
088
/**
 * Creates a store whose directory is derived from {@code name}, using a
 * freshly created size counter.
 *
 * @param name store name; converted to a file-system safe directory name
 * @param mode mode string handed to the index managers created by this store
 * @throws IOException propagated from directory creation
 */
public KahaStore(String name, String mode) throws IOException {
    this(name, mode, new AtomicLong());
}

/**
 * Creates a store in the given directory, using a freshly created size counter.
 *
 * @param directory directory that holds the store files
 * @param mode mode string handed to the index managers created by this store
 * @throws IOException propagated from directory creation
 */
public KahaStore(File directory, String mode) throws IOException {
    this(directory, mode, new AtomicLong());
}

/**
 * Creates a store whose directory is derived from {@code name} and which
 * reports its size through the supplied counter.
 *
 * @param name store name; converted to a file-system safe directory name
 * @param mode mode string handed to the index managers created by this store
 * @param storeSize counter passed on to the data and index managers
 * @throws IOException propagated from directory creation
 */
public KahaStore(String name, String mode, AtomicLong storeSize) throws IOException {
    this(new File(IOHelper.toFileSystemDirectorySafeName(name)), mode, storeSize);
}

/**
 * Creates a store in the given directory; all other constructors end up here.
 * The directory is created via {@code IOHelper.mkdirs}, but no files are
 * opened until {@link #initialize()} runs.
 *
 * @param directory directory that holds the store files
 * @param mode mode string handed to the index managers created by this store
 * @param storeSize counter passed on to the data and index managers
 * @throws IOException propagated from directory creation
 */
public KahaStore(File directory, String mode, AtomicLong storeSize) throws IOException {
    this.directory = directory;
    this.mode = mode;
    this.storeSize = storeSize;
    IOHelper.mkdirs(this.directory);
}
107
108 public synchronized void close() throws IOException {
109 if (!closed) {
110 closed = true;
111 if (initialized) {
112 unlock();
113 for (ListContainerImpl container : lists.values()) {
114 container.close();
115 }
116 lists.clear();
117 for (MapContainerImpl container : maps.values()) {
118 container.close();
119 }
120 maps.clear();
121 for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) {
122 IndexManager im = iter.next();
123 im.close();
124 iter.remove();
125 }
126 for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) {
127 DataManager dm = iter.next();
128 dm.close();
129 iter.remove();
130 }
131 }
132 if (lockFile!=null) {
133 lockFile.close();
134 lockFile=null;
135 }
136 }
137 }
138
139 public synchronized void force() throws IOException {
140 if (initialized) {
141 for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) {
142 IndexManager im = iter.next();
143 im.force();
144 }
145 for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) {
146 DataManager dm = iter.next();
147 dm.force();
148 }
149 }
150 }
151
152 public synchronized void clear() throws IOException {
153 initialize();
154 for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
155 ContainerId id = (ContainerId)i.next();
156 MapContainer container = getMapContainer(id.getKey(), id.getDataContainerName());
157 container.clear();
158 }
159 for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
160 ContainerId id = (ContainerId)i.next();
161 ListContainer container = getListContainer(id.getKey(), id.getDataContainerName());
162 container.clear();
163 }
164
165 }
166
167 public synchronized boolean delete() throws IOException {
168 boolean result = true;
169 if (initialized) {
170 clear();
171 for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) {
172 IndexManager im = iter.next();
173 result &= im.delete();
174 iter.remove();
175 }
176 for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) {
177 DataManager dm = iter.next();
178 result &= dm.delete();
179 iter.remove();
180 }
181 }
182 if (directory != null && directory.isDirectory()) {
183 result =IOHelper.deleteChildren(directory);
184 String str = result ? "successfully deleted" : "failed to delete";
185 LOG.info("Kaha Store " + str + " data directory " + directory);
186 }
187 return result;
188 }
189
190 public synchronized boolean isInitialized() {
191 return initialized;
192 }
193
194 public boolean doesMapContainerExist(Object id) throws IOException {
195 return doesMapContainerExist(id, defaultContainerName);
196 }
197
198 public synchronized boolean doesMapContainerExist(Object id, String containerName) throws IOException {
199 initialize();
200 ContainerId containerId = new ContainerId(id, containerName);
201 return maps.containsKey(containerId) || mapsContainer.doesRootExist(containerId);
202 }
203
204 public MapContainer getMapContainer(Object id) throws IOException {
205 return getMapContainer(id, defaultContainerName);
206 }
207
208 public MapContainer getMapContainer(Object id, String containerName) throws IOException {
209 return getMapContainer(id, containerName, persistentIndex);
210 }
211
212 public synchronized MapContainer getMapContainer(Object id, String containerName, boolean persistentIndex)
213 throws IOException {
214 initialize();
215 ContainerId containerId = new ContainerId(id, containerName);
216 MapContainerImpl result = maps.get(containerId);
217 if (result == null) {
218 DataManager dm = getDataManager(containerName);
219 IndexManager im = getIndexManager(dm, containerName);
220
221 IndexItem root = mapsContainer.getRoot(im, containerId);
222 if (root == null) {
223 root = mapsContainer.addRoot(im, containerId);
224 }
225 result = new MapContainerImpl(directory, containerId, root, im, dm, persistentIndex);
226 maps.put(containerId, result);
227 }
228 return result;
229 }
230
231 public void deleteMapContainer(Object id) throws IOException {
232 deleteMapContainer(id, defaultContainerName);
233 }
234
235 public void deleteMapContainer(Object id, String containerName) throws IOException {
236 ContainerId containerId = new ContainerId(id, containerName);
237 deleteMapContainer(containerId);
238 }
239
240 public synchronized void deleteMapContainer(ContainerId containerId) throws IOException {
241 initialize();
242 MapContainerImpl container = maps.remove(containerId);
243 if (container != null) {
244 container.clear();
245 mapsContainer.removeRoot(container.getIndexManager(), containerId);
246 container.close();
247 }
248 }
249
250 public synchronized Set<ContainerId> getMapContainerIds() throws IOException {
251 initialize();
252 Set<ContainerId> set = new HashSet<ContainerId>();
253 for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
254 ContainerId id = (ContainerId)i.next();
255 set.add(id);
256 }
257 return set;
258 }
259
260 public boolean doesListContainerExist(Object id) throws IOException {
261 return doesListContainerExist(id, defaultContainerName);
262 }
263
264 public synchronized boolean doesListContainerExist(Object id, String containerName) throws IOException {
265 initialize();
266 ContainerId containerId = new ContainerId(id, containerName);
267 return lists.containsKey(containerId) || listsContainer.doesRootExist(containerId);
268 }
269
270 public ListContainer getListContainer(Object id) throws IOException {
271 return getListContainer(id, defaultContainerName);
272 }
273
274 public ListContainer getListContainer(Object id, String containerName) throws IOException {
275 return getListContainer(id, containerName, persistentIndex);
276 }
277
278 public synchronized ListContainer getListContainer(Object id, String containerName,
279 boolean persistentIndex) throws IOException {
280 initialize();
281 ContainerId containerId = new ContainerId(id, containerName);
282 ListContainerImpl result = lists.get(containerId);
283 if (result == null) {
284 DataManager dm = getDataManager(containerName);
285 IndexManager im = getIndexManager(dm, containerName);
286
287 IndexItem root = listsContainer.getRoot(im, containerId);
288 if (root == null) {
289 root = listsContainer.addRoot(im, containerId);
290 }
291 result = new ListContainerImpl(containerId, root, im, dm, persistentIndex);
292 lists.put(containerId, result);
293 }
294 return result;
295 }
296
297 public void deleteListContainer(Object id) throws IOException {
298 deleteListContainer(id, defaultContainerName);
299 }
300
301 public synchronized void deleteListContainer(Object id, String containerName) throws IOException {
302 ContainerId containerId = new ContainerId(id, containerName);
303 deleteListContainer(containerId);
304 }
305
306 public synchronized void deleteListContainer(ContainerId containerId) throws IOException {
307 initialize();
308 ListContainerImpl container = lists.remove(containerId);
309 if (container != null) {
310 listsContainer.removeRoot(container.getIndexManager(), containerId);
311 container.clear();
312 container.close();
313 }
314 }
315
316 public synchronized Set<ContainerId> getListContainerIds() throws IOException {
317 initialize();
318 Set<ContainerId> set = new HashSet<ContainerId>();
319 for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
320 ContainerId id = (ContainerId)i.next();
321 set.add(id);
322 }
323 return set;
324 }
325
326 /**
327 * @return the listsContainer
328 */
329 public IndexRootContainer getListsContainer() {
330 return this.listsContainer;
331 }
332
333 /**
334 * @return the mapsContainer
335 */
336 public IndexRootContainer getMapsContainer() {
337 return this.mapsContainer;
338 }
339
340 public synchronized DataManager getDataManager(String name) throws IOException {
341 DataManager dm = dataManagers.get(name);
342 if (dm == null) {
343 if (isUseAsyncDataManager()) {
344 AsyncDataManager t = new AsyncDataManager(storeSize);
345 t.setDirectory(directory);
346 t.setFilePrefix("async-data-" + name + "-");
347 t.setMaxFileLength((int)maxDataFileLength);
348 t.start();
349 dm = new DataManagerFacade(t, name);
350 } else {
351 DataManagerImpl t = new DataManagerImpl(directory, name,storeSize);
352 t.setMaxFileLength(maxDataFileLength);
353 dm = t;
354 }
355 if (logIndexChanges) {
356 recover(dm);
357 }
358 dataManagers.put(name, dm);
359 }
360 return dm;
361 }
362
363 public synchronized IndexManager getIndexManager(DataManager dm, String name) throws IOException {
364 IndexManager im = indexManagers.get(name);
365 if (im == null) {
366 im = new IndexManager(directory, name, mode, logIndexChanges ? dm : null,storeSize);
367 indexManagers.put(name, im);
368 }
369 return im;
370 }
371
372 private void recover(final DataManager dm) throws IOException {
373 dm.recoverRedoItems(new RedoListener() {
374 public void onRedoItem(StoreLocation item, Object o) throws Exception {
375 RedoStoreIndexItem redo = (RedoStoreIndexItem)o;
376 // IndexManager im = getIndexManager(dm, redo.getIndexName());
377 IndexManager im = getIndexManager(dm, dm.getName());
378 im.redo(redo);
379 }
380 });
381 }
382
383 public synchronized boolean isLogIndexChanges() {
384 return logIndexChanges;
385 }
386
387 public synchronized void setLogIndexChanges(boolean logIndexChanges) {
388 this.logIndexChanges = logIndexChanges;
389 }
390
391 /**
392 * @return the maxDataFileLength
393 */
394 public synchronized long getMaxDataFileLength() {
395 return maxDataFileLength;
396 }
397
398 /**
399 * @param maxDataFileLength the maxDataFileLength to set
400 */
401 public synchronized void setMaxDataFileLength(long maxDataFileLength) {
402 this.maxDataFileLength = maxDataFileLength;
403 }
404
405 /**
406 * @return the default index type
407 */
408 public synchronized String getIndexTypeAsString() {
409 return persistentIndex ? "PERSISTENT" : "VM";
410 }
411
412 /**
413 * Set the default index type
414 *
415 * @param type "PERSISTENT" or "VM"
416 */
417 public synchronized void setIndexTypeAsString(String type) {
418 if (type.equalsIgnoreCase("VM")) {
419 persistentIndex = false;
420 } else {
421 persistentIndex = true;
422 }
423 }
424
425 public boolean isPersistentIndex() {
426 return persistentIndex;
427 }
428
429 public void setPersistentIndex(boolean persistentIndex) {
430 this.persistentIndex = persistentIndex;
431 }
432
433
434 public synchronized boolean isUseAsyncDataManager() {
435 return useAsyncDataManager;
436 }
437
438 public synchronized void setUseAsyncDataManager(boolean useAsyncWriter) {
439 this.useAsyncDataManager = useAsyncWriter;
440 }
441
442 /**
443 * @return size of store
444 * @see org.apache.activemq.kaha.Store#size()
445 */
446 public long size(){
447 return storeSize.get();
448 }
449
450 public String getDefaultContainerName() {
451 return defaultContainerName;
452 }
453
454 public void setDefaultContainerName(String defaultContainerName) {
455 this.defaultContainerName = defaultContainerName;
456 }
457
458 public synchronized void initialize() throws IOException {
459 if (closed) {
460 throw new IOException("Store has been closed.");
461 }
462 if (!initialized) {
463 LOG.info("Kaha Store using data directory " + directory);
464 lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
465 lock();
466 DataManager defaultDM = getDataManager(defaultContainerName);
467 IndexManager rootIndexManager = getIndexManager(defaultDM, defaultContainerName);
468 IndexItem mapRoot = new IndexItem();
469 IndexItem listRoot = new IndexItem();
470 if (rootIndexManager.isEmpty()) {
471 mapRoot.setOffset(0);
472 rootIndexManager.storeIndex(mapRoot);
473 listRoot.setOffset(IndexItem.INDEX_SIZE);
474 rootIndexManager.storeIndex(listRoot);
475 rootIndexManager.setLength(IndexItem.INDEX_SIZE * 2);
476 } else {
477 mapRoot = rootIndexManager.getIndex(0);
478 listRoot = rootIndexManager.getIndex(IndexItem.INDEX_SIZE);
479 }
480 initialized = true;
481 mapsContainer = new IndexRootContainer(mapRoot, rootIndexManager, defaultDM);
482 listsContainer = new IndexRootContainer(listRoot, rootIndexManager, defaultDM);
483 /**
484 * Add interest in data files - then consolidate them
485 */
486 generateInterestInMapDataFiles();
487 generateInterestInListDataFiles();
488 for (Iterator<DataManager> i = dataManagers.values().iterator(); i.hasNext();) {
489 DataManager dm = i.next();
490 dm.consolidateDataFiles();
491 }
492 }
493 }
494
495 private void lock() throws IOException {
496 synchronized (LOCKSET_MONITOR) {
497 if (!DISABLE_LOCKING && directory != null && lock == null) {
498 String key = getPropertyKey();
499 String property = System.getProperty(key);
500 if (null == property) {
501 if (!BROKEN_FILE_LOCK) {
502 lock = lockFile.getChannel().tryLock(0, Math.max(1, lockFile.getChannel().size()), false);
503 if (lock == null) {
504 throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + " is already opened by another application");
505 } else
506 System.setProperty(key, new Date().toString());
507 }
508 } else { //already locked
509 throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + " is already opened by this application.");
510 }
511 }
512 }
513 }
514
515 private void unlock() throws IOException {
516 synchronized (LOCKSET_MONITOR) {
517 if (!DISABLE_LOCKING && (null != directory) && (null != lock)) {
518 System.getProperties().remove(getPropertyKey());
519 if (lock.isValid()) {
520 lock.release();
521 }
522 lock = null;
523 }
524 }
525 }
526
527
528 private String getPropertyKey() throws IOException {
529 return getClass().getName() + ".lock." + directory.getCanonicalPath();
530 }
531
532 /**
533 * scans the directory and builds up the IndexManager and DataManager
534 *
535 * @throws IOException if there is a problem accessing an index or data file
536 */
537 private void generateInterestInListDataFiles() throws IOException {
538 for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
539 ContainerId id = (ContainerId)i.next();
540 DataManager dm = getDataManager(id.getDataContainerName());
541 IndexManager im = getIndexManager(dm, id.getDataContainerName());
542 IndexItem theRoot = listsContainer.getRoot(im, id);
543 long nextItem = theRoot.getNextItem();
544 while (nextItem != Item.POSITION_NOT_SET) {
545 IndexItem item = im.getIndex(nextItem);
546 item.setOffset(nextItem);
547 dm.addInterestInFile(item.getKeyFile());
548 dm.addInterestInFile(item.getValueFile());
549 nextItem = item.getNextItem();
550 }
551 }
552 }
553
554 /**
555 * scans the directory and builds up the IndexManager and DataManager
556 *
557 * @throws IOException if there is a problem accessing an index or data file
558 */
559 private void generateInterestInMapDataFiles() throws IOException {
560 for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
561 ContainerId id = (ContainerId)i.next();
562 DataManager dm = getDataManager(id.getDataContainerName());
563 IndexManager im = getIndexManager(dm, id.getDataContainerName());
564 IndexItem theRoot = mapsContainer.getRoot(im, id);
565 long nextItem = theRoot.getNextItem();
566 while (nextItem != Item.POSITION_NOT_SET) {
567 IndexItem item = im.getIndex(nextItem);
568 item.setOffset(nextItem);
569 dm.addInterestInFile(item.getKeyFile());
570 dm.addInterestInFile(item.getValueFile());
571 nextItem = item.getNextItem();
572 }
573
574 }
575 }
576 }