001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb.plist;
018
019 import java.io.DataInput;
020 import java.io.DataOutput;
021 import java.io.File;
022 import java.io.IOException;
023 import java.util.ArrayList;
024 import java.util.HashMap;
025 import java.util.Iterator;
026 import java.util.List;
027 import java.util.Map;
028 import java.util.Map.Entry;
029 import java.util.Set;
030
031 import org.apache.activemq.broker.BrokerService;
032 import org.apache.activemq.broker.BrokerServiceAware;
033 import org.apache.activemq.thread.Scheduler;
034 import org.apache.activemq.util.IOHelper;
035 import org.apache.activemq.util.ServiceStopper;
036 import org.apache.activemq.util.ServiceSupport;
037 import org.apache.kahadb.index.BTreeIndex;
038 import org.apache.kahadb.journal.Journal;
039 import org.apache.kahadb.journal.Location;
040 import org.apache.kahadb.page.Page;
041 import org.apache.kahadb.page.PageFile;
042 import org.apache.kahadb.page.Transaction;
043 import org.apache.kahadb.util.ByteSequence;
044 import org.apache.kahadb.util.LockFile;
045 import org.apache.kahadb.util.StringMarshaller;
046 import org.apache.kahadb.util.VariableMarshaller;
047 import org.slf4j.Logger;
048 import org.slf4j.LoggerFactory;
049
050 /**
051 * @org.apache.xbean.XBean
052 */
053 public class PListStore extends ServiceSupport implements BrokerServiceAware, Runnable {
054 static final Logger LOG = LoggerFactory.getLogger(PListStore.class);
055 private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
056
057 static final int CLOSED_STATE = 1;
058 static final int OPEN_STATE = 2;
059
060 private File directory;
061 PageFile pageFile;
062 private Journal journal;
063 private LockFile lockFile;
064 private boolean failIfDatabaseIsLocked;
065 private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
066 private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
067 private boolean enableIndexWriteAsync = false;
068 private boolean initialized = false;
069 private boolean lazyInit = true;
070 // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
071 MetaData metaData = new MetaData(this);
072 final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
073 Map<String, PList> persistentLists = new HashMap<String, PList>();
074 final Object indexLock = new Object();
075 private Scheduler scheduler;
076 private long cleanupInterval = 30000;
077
078 private int indexPageSize = PageFile.DEFAULT_PAGE_SIZE;
079 private int indexCacheSize = PageFile.DEFAULT_PAGE_CACHE_SIZE;
080 private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
081 private boolean indexEnablePageCaching = true;
082
083 public Object getIndexLock() {
084 return indexLock;
085 }
086
087 @Override
088 public void setBrokerService(BrokerService brokerService) {
089 this.scheduler = brokerService.getScheduler();
090 }
091
092 public int getIndexPageSize() {
093 return indexPageSize;
094 }
095
096 public int getIndexCacheSize() {
097 return indexCacheSize;
098 }
099
100 public int getIndexWriteBatchSize() {
101 return indexWriteBatchSize;
102 }
103
104 public void setIndexPageSize(int indexPageSize) {
105 this.indexPageSize = indexPageSize;
106 }
107
108 public void setIndexCacheSize(int indexCacheSize) {
109 this.indexCacheSize = indexCacheSize;
110 }
111
112 public void setIndexWriteBatchSize(int indexWriteBatchSize) {
113 this.indexWriteBatchSize = indexWriteBatchSize;
114 }
115
116 public boolean getIndexEnablePageCaching() {
117 return indexEnablePageCaching;
118 }
119
120 public void setIndexEnablePageCaching(boolean indexEnablePageCaching) {
121 this.indexEnablePageCaching = indexEnablePageCaching;
122 }
123
124 protected class MetaData {
125 protected MetaData(PListStore store) {
126 this.store = store;
127 }
128
129 private final PListStore store;
130 Page<MetaData> page;
131 BTreeIndex<String, PList> lists;
132
133 void createIndexes(Transaction tx) throws IOException {
134 this.lists = new BTreeIndex<String, PList>(pageFile, tx.allocate().getPageId());
135 }
136
137 void load(Transaction tx) throws IOException {
138 this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
139 this.lists.setValueMarshaller(new PListMarshaller(this.store));
140 this.lists.load(tx);
141 }
142
143 void loadLists(Transaction tx, Map<String, PList> lists) throws IOException {
144 for (Iterator<Entry<String, PList>> i = this.lists.iterator(tx); i.hasNext();) {
145 Entry<String, PList> entry = i.next();
146 entry.getValue().load(tx);
147 lists.put(entry.getKey(), entry.getValue());
148 }
149 }
150
151 public void read(DataInput is) throws IOException {
152 this.lists = new BTreeIndex<String, PList>(pageFile, is.readLong());
153 this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
154 this.lists.setValueMarshaller(new PListMarshaller(this.store));
155 }
156
157 public void write(DataOutput os) throws IOException {
158 os.writeLong(this.lists.getPageId());
159 }
160 }
161
162 class MetaDataMarshaller extends VariableMarshaller<MetaData> {
163 private final PListStore store;
164
165 MetaDataMarshaller(PListStore store) {
166 this.store = store;
167 }
168 public MetaData readPayload(DataInput dataIn) throws IOException {
169 MetaData rc = new MetaData(this.store);
170 rc.read(dataIn);
171 return rc;
172 }
173
174 public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
175 object.write(dataOut);
176 }
177 }
178
179 class PListMarshaller extends VariableMarshaller<PList> {
180 private final PListStore store;
181 PListMarshaller(PListStore store) {
182 this.store = store;
183 }
184 public PList readPayload(DataInput dataIn) throws IOException {
185 PList result = new PList(this.store);
186 result.read(dataIn);
187 return result;
188 }
189
190 public void writePayload(PList list, DataOutput dataOut) throws IOException {
191 list.write(dataOut);
192 }
193 }
194
195 public Journal getJournal() {
196 return this.journal;
197 }
198
199 public File getDirectory() {
200 return directory;
201 }
202
203 public void setDirectory(File directory) {
204 this.directory = directory;
205 }
206
207 public long size() {
208 synchronized (this) {
209 if (!initialized) {
210 return 0;
211 }
212 }
213 try {
214 return journal.getDiskSize() + pageFile.getDiskSize();
215 } catch (IOException e) {
216 throw new RuntimeException(e);
217 }
218 }
219
220 public PList getPList(final String name) throws Exception {
221 if (!isStarted()) {
222 throw new IllegalStateException("Not started");
223 }
224 intialize();
225 synchronized (indexLock) {
226 synchronized (this) {
227 PList result = this.persistentLists.get(name);
228 if (result == null) {
229 final PList pl = new PList(this);
230 pl.setName(name);
231 getPageFile().tx().execute(new Transaction.Closure<IOException>() {
232 public void execute(Transaction tx) throws IOException {
233 pl.setHeadPageId(tx.allocate().getPageId());
234 pl.load(tx);
235 metaData.lists.put(tx, name, pl);
236 }
237 });
238 result = pl;
239 this.persistentLists.put(name, pl);
240 }
241 final PList toLoad = result;
242 getPageFile().tx().execute(new Transaction.Closure<IOException>() {
243 public void execute(Transaction tx) throws IOException {
244 toLoad.load(tx);
245 }
246 });
247
248 return result;
249 }
250 }
251 }
252
253 public boolean removePList(final String name) throws Exception {
254 boolean result = false;
255 synchronized (indexLock) {
256 synchronized (this) {
257 final PList pl = this.persistentLists.remove(name);
258 result = pl != null;
259 if (result) {
260 getPageFile().tx().execute(new Transaction.Closure<IOException>() {
261 public void execute(Transaction tx) throws IOException {
262 metaData.lists.remove(tx, name);
263 pl.destroy();
264 }
265 });
266 }
267 }
268 }
269 return result;
270 }
271
272 protected synchronized void intialize() throws Exception {
273 if (isStarted()) {
274 if (this.initialized == false) {
275 if (this.directory == null) {
276 this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
277 }
278 IOHelper.mkdirs(this.directory);
279 lock();
280 this.journal = new Journal();
281 this.journal.setDirectory(directory);
282 this.journal.setMaxFileLength(getJournalMaxFileLength());
283 this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
284 this.journal.start();
285 this.pageFile = new PageFile(directory, "tmpDB");
286 this.pageFile.setEnablePageCaching(getIndexEnablePageCaching());
287 this.pageFile.setPageSize(getIndexPageSize());
288 this.pageFile.setWriteBatchSize(getIndexWriteBatchSize());
289 this.pageFile.setPageCacheSize(getIndexCacheSize());
290 this.pageFile.load();
291
292 this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
293 public void execute(Transaction tx) throws IOException {
294 if (pageFile.getPageCount() == 0) {
295 Page<MetaData> page = tx.allocate();
296 assert page.getPageId() == 0;
297 page.set(metaData);
298 metaData.page = page;
299 metaData.createIndexes(tx);
300 tx.store(metaData.page, metaDataMarshaller, true);
301
302 } else {
303 Page<MetaData> page = tx.load(0, metaDataMarshaller);
304 metaData = page.get();
305 metaData.page = page;
306 }
307 metaData.load(tx);
308 metaData.loadLists(tx, persistentLists);
309 }
310 });
311 this.pageFile.flush();
312
313 if (cleanupInterval > 0) {
314 if (scheduler == null) {
315 scheduler = new Scheduler(PListStore.class.getSimpleName());
316 scheduler.start();
317 }
318 scheduler.executePeriodically(this, cleanupInterval);
319 }
320 this.initialized = true;
321 LOG.info(this + " initialized");
322 }
323 }
324 }
325
326 @Override
327 protected synchronized void doStart() throws Exception {
328 if (!lazyInit) {
329 intialize();
330 }
331 LOG.info(this + " started");
332 }
333
334 @Override
335 protected synchronized void doStop(ServiceStopper stopper) throws Exception {
336 if (scheduler != null) {
337 if (PListStore.class.getSimpleName().equals(scheduler.getName())) {
338 scheduler.stop();
339 scheduler = null;
340 }
341 }
342 for (PList pl : this.persistentLists.values()) {
343 pl.unload(null);
344 }
345 if (this.pageFile != null) {
346 this.pageFile.unload();
347 }
348 if (this.journal != null) {
349 journal.close();
350 }
351 if (this.lockFile != null) {
352 this.lockFile.unlock();
353 }
354 this.lockFile = null;
355 this.initialized = false;
356 LOG.info(this + " stopped");
357
358 }
359
360 public void run() {
361 try {
362 if (isStopping()) {
363 return;
364 }
365 final int lastJournalFileId = journal.getLastAppendLocation().getDataFileId();
366 final Set<Integer> candidates = journal.getFileMap().keySet();
367 LOG.trace("Full gc candidate set:" + candidates);
368 if (candidates.size() > 1) {
369 // prune current write
370 for (Iterator<Integer> iterator = candidates.iterator(); iterator.hasNext();) {
371 if (iterator.next() >= lastJournalFileId) {
372 iterator.remove();
373 }
374 }
375 List<PList> plists = null;
376 synchronized (indexLock) {
377 synchronized (this) {
378 plists = new ArrayList<PList>(persistentLists.values());
379 }
380 }
381 for (PList list : plists) {
382 list.claimFileLocations(candidates);
383 if (isStopping()) {
384 return;
385 }
386 LOG.trace("Remaining gc candidate set after refs from: " + list.getName() + ":" + candidates);
387 }
388 LOG.trace("GC Candidate set:" + candidates);
389 this.journal.removeDataFiles(candidates);
390 }
391 } catch (IOException e) {
392 LOG.error("Exception on periodic cleanup: " + e, e);
393 }
394 }
395
396 ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
397 ByteSequence result = null;
398 result = this.journal.read(location);
399 return result;
400 }
401
402 Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
403 return this.journal.write(payload, sync);
404 }
405
406 private void lock() throws IOException {
407 if (lockFile == null) {
408 File lockFileName = new File(directory, "lock");
409 lockFile = new LockFile(lockFileName, true);
410 if (failIfDatabaseIsLocked) {
411 lockFile.lock();
412 } else {
413 while (true) {
414 try {
415 lockFile.lock();
416 break;
417 } catch (IOException e) {
418 LOG.info("Database " + lockFileName + " is locked... waiting "
419 + (DATABASE_LOCKED_WAIT_DELAY / 1000)
420 + " seconds for the database to be unlocked. Reason: " + e);
421 try {
422 Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
423 } catch (InterruptedException e1) {
424 }
425 }
426 }
427 }
428 }
429 }
430
431 PageFile getPageFile() {
432 this.pageFile.isLoaded();
433 return this.pageFile;
434 }
435
436 public boolean isFailIfDatabaseIsLocked() {
437 return failIfDatabaseIsLocked;
438 }
439
440 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
441 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
442 }
443
444 public int getJournalMaxFileLength() {
445 return journalMaxFileLength;
446 }
447
448 public void setJournalMaxFileLength(int journalMaxFileLength) {
449 this.journalMaxFileLength = journalMaxFileLength;
450 }
451
452 public int getJournalMaxWriteBatchSize() {
453 return journalMaxWriteBatchSize;
454 }
455
456 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
457 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
458 }
459
460 public boolean isEnableIndexWriteAsync() {
461 return enableIndexWriteAsync;
462 }
463
464 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
465 this.enableIndexWriteAsync = enableIndexWriteAsync;
466 }
467
468 public long getCleanupInterval() {
469 return cleanupInterval;
470 }
471
472 public void setCleanupInterval(long cleanupInterval) {
473 this.cleanupInterval = cleanupInterval;
474 }
475
476 public boolean isLazyInit() {
477 return lazyInit;
478 }
479
480 public void setLazyInit(boolean lazyInit) {
481 this.lazyInit = lazyInit;
482 }
483
484 @Override
485 public String toString() {
486 String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
487 return "PListStore:[" + path + " ]";
488 }
489 }