001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.scheduler;
018
019 import org.apache.activemq.util.IOHelper;
020 import org.apache.activemq.util.ServiceStopper;
021 import org.apache.activemq.util.ServiceSupport;
022 import org.apache.kahadb.index.BTreeIndex;
023 import org.apache.kahadb.journal.Journal;
024 import org.apache.kahadb.journal.Location;
025 import org.apache.kahadb.page.Page;
026 import org.apache.kahadb.page.PageFile;
027 import org.apache.kahadb.page.Transaction;
028 import org.apache.kahadb.util.ByteSequence;
029 import org.apache.kahadb.util.IntegerMarshaller;
030 import org.apache.kahadb.util.LockFile;
031 import org.apache.kahadb.util.StringMarshaller;
032 import org.apache.kahadb.util.VariableMarshaller;
033 import org.slf4j.Logger;
034 import org.slf4j.LoggerFactory;
035
036 import java.io.DataInput;
037 import java.io.DataOutput;
038 import java.io.File;
039 import java.io.IOException;
040 import java.util.ArrayList;
041 import java.util.HashMap;
042 import java.util.HashSet;
043 import java.util.Iterator;
044 import java.util.List;
045 import java.util.Map;
046 import java.util.Map.Entry;
047 import java.util.Set;
048
049 public class JobSchedulerStore extends ServiceSupport {
050 static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class);
051 private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
052
053 public static final int CLOSED_STATE = 1;
054 public static final int OPEN_STATE = 2;
055
056 private File directory;
057 PageFile pageFile;
058 private Journal journal;
059 private LockFile lockFile;
060 private boolean failIfDatabaseIsLocked;
061 private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
062 private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
063 private boolean enableIndexWriteAsync = false;
064 // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
065 MetaData metaData = new MetaData(this);
066 final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
067 Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
068
069 protected class MetaData {
070 protected MetaData(JobSchedulerStore store) {
071 this.store = store;
072 }
073 private final JobSchedulerStore store;
074 Page<MetaData> page;
075 BTreeIndex<Integer, Integer> journalRC;
076 BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
077
078 void createIndexes(Transaction tx) throws IOException {
079 this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
080 this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
081 }
082
083 void load(Transaction tx) throws IOException {
084 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
085 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
086 this.storedSchedulers.load(tx);
087 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
088 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
089 this.journalRC.load(tx);
090 }
091
092 void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
093 for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
094 Entry<String, JobSchedulerImpl> entry = i.next();
095 entry.getValue().load(tx);
096 schedulers.put(entry.getKey(), entry.getValue());
097 }
098 }
099
100 public void read(DataInput is) throws IOException {
101 this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
102 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
103 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
104 this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
105 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
106 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
107 }
108
109 public void write(DataOutput os) throws IOException {
110 os.writeLong(this.storedSchedulers.getPageId());
111 os.writeLong(this.journalRC.getPageId());
112
113 }
114 }
115
116 class MetaDataMarshaller extends VariableMarshaller<MetaData> {
117 private final JobSchedulerStore store;
118
119 MetaDataMarshaller(JobSchedulerStore store) {
120 this.store = store;
121 }
122 public MetaData readPayload(DataInput dataIn) throws IOException {
123 MetaData rc = new MetaData(this.store);
124 rc.read(dataIn);
125 return rc;
126 }
127
128 public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
129 object.write(dataOut);
130 }
131 }
132
133 class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
134 public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
135 List<JobLocation> result = new ArrayList<JobLocation>();
136 int size = dataIn.readInt();
137 for (int i = 0; i < size; i++) {
138 JobLocation jobLocation = new JobLocation();
139 jobLocation.readExternal(dataIn);
140 result.add(jobLocation);
141 }
142 return result;
143 }
144
145 public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
146 dataOut.writeInt(value.size());
147 for (JobLocation jobLocation : value) {
148 jobLocation.writeExternal(dataOut);
149 }
150 }
151 }
152
153 class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
154 private final JobSchedulerStore store;
155 JobSchedulerMarshaller(JobSchedulerStore store) {
156 this.store = store;
157 }
158 public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
159 JobSchedulerImpl result = new JobSchedulerImpl(this.store);
160 result.read(dataIn);
161 return result;
162 }
163
164 public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
165 js.write(dataOut);
166 }
167 }
168
169 public File getDirectory() {
170 return directory;
171 }
172
173 public void setDirectory(File directory) {
174 this.directory = directory;
175 }
176
177 public long size() {
178 if ( !isStarted() ) {
179 return 0;
180 }
181 try {
182 return journal.getDiskSize() + pageFile.getDiskSize();
183 } catch (IOException e) {
184 throw new RuntimeException(e);
185 }
186 }
187
188 public JobScheduler getJobScheduler(final String name) throws Exception {
189 JobSchedulerImpl result = this.schedulers.get(name);
190 if (result == null) {
191 final JobSchedulerImpl js = new JobSchedulerImpl(this);
192 js.setName(name);
193 getPageFile().tx().execute(new Transaction.Closure<IOException>() {
194 public void execute(Transaction tx) throws IOException {
195 js.createIndexes(tx);
196 js.load(tx);
197 metaData.storedSchedulers.put(tx, name, js);
198 }
199 });
200 result = js;
201 this.schedulers.put(name, js);
202 if (isStarted()) {
203 result.start();
204 }
205 this.pageFile.flush();
206 }
207 return result;
208 }
209
210 synchronized public boolean removeJobScheduler(final String name) throws Exception {
211 boolean result = false;
212 final JobSchedulerImpl js = this.schedulers.remove(name);
213 result = js != null;
214 if (result) {
215 js.stop();
216 getPageFile().tx().execute(new Transaction.Closure<IOException>() {
217 public void execute(Transaction tx) throws IOException {
218 metaData.storedSchedulers.remove(tx, name);
219 js.destroy(tx);
220 }
221 });
222 }
223 return result;
224 }
225
226 @Override
227 protected synchronized void doStart() throws Exception {
228 if (this.directory == null) {
229 this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
230 }
231 IOHelper.mkdirs(this.directory);
232 lock();
233 this.journal = new Journal();
234 this.journal.setDirectory(directory);
235 this.journal.setMaxFileLength(getJournalMaxFileLength());
236 this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
237 this.journal.start();
238 this.pageFile = new PageFile(directory, "scheduleDB");
239 this.pageFile.setWriteBatchSize(1);
240 this.pageFile.load();
241
242 this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
243 public void execute(Transaction tx) throws IOException {
244 if (pageFile.getPageCount() == 0) {
245 Page<MetaData> page = tx.allocate();
246 assert page.getPageId() == 0;
247 page.set(metaData);
248 metaData.page = page;
249 metaData.createIndexes(tx);
250 tx.store(metaData.page, metaDataMarshaller, true);
251
252 } else {
253 Page<MetaData> page = tx.load(0, metaDataMarshaller);
254 metaData = page.get();
255 metaData.page = page;
256 }
257 metaData.load(tx);
258 metaData.loadScheduler(tx, schedulers);
259 for (JobSchedulerImpl js :schedulers.values()) {
260 try {
261 js.start();
262 } catch (Exception e) {
263 JobSchedulerStore.LOG.error("Failed to load " + js.getName(),e);
264 }
265 }
266 }
267 });
268
269 this.pageFile.flush();
270 LOG.info(this + " started");
271 }
272
273 @Override
274 protected synchronized void doStop(ServiceStopper stopper) throws Exception {
275 for (JobSchedulerImpl js : this.schedulers.values()) {
276 js.stop();
277 }
278 if (this.pageFile != null) {
279 this.pageFile.unload();
280 }
281 if (this.journal != null) {
282 journal.close();
283 }
284 if (this.lockFile != null) {
285 this.lockFile.unlock();
286 }
287 this.lockFile = null;
288 LOG.info(this + " stopped");
289
290 }
291
292 synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
293 int logId = location.getDataFileId();
294 Integer val = this.metaData.journalRC.get(tx, logId);
295 int refCount = val != null ? val.intValue() + 1 : 1;
296 this.metaData.journalRC.put(tx, logId, refCount);
297
298 }
299
300 synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
301 int logId = location.getDataFileId();
302 int refCount = this.metaData.journalRC.get(tx, logId);
303 refCount--;
304 if (refCount <= 0) {
305 this.metaData.journalRC.remove(tx, logId);
306 Set<Integer> set = new HashSet<Integer>();
307 set.add(logId);
308 this.journal.removeDataFiles(set);
309 } else {
310 this.metaData.journalRC.put(tx, logId, refCount);
311 }
312
313 }
314
315 synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
316 ByteSequence result = null;
317 result = this.journal.read(location);
318 return result;
319 }
320
321 synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
322 return this.journal.write(payload, sync);
323 }
324
325 private void lock() throws IOException {
326 if (lockFile == null) {
327 File lockFileName = new File(directory, "lock");
328 lockFile = new LockFile(lockFileName, true);
329 if (failIfDatabaseIsLocked) {
330 lockFile.lock();
331 } else {
332 while (true) {
333 try {
334 lockFile.lock();
335 break;
336 } catch (IOException e) {
337 LOG.info("Database " + lockFileName + " is locked... waiting "
338 + (DATABASE_LOCKED_WAIT_DELAY / 1000)
339 + " seconds for the database to be unlocked. Reason: " + e);
340 try {
341 Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
342 } catch (InterruptedException e1) {
343 }
344 }
345 }
346 }
347 }
348 }
349
350 PageFile getPageFile() {
351 this.pageFile.isLoaded();
352 return this.pageFile;
353 }
354
355 public boolean isFailIfDatabaseIsLocked() {
356 return failIfDatabaseIsLocked;
357 }
358
359 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
360 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
361 }
362
363 public int getJournalMaxFileLength() {
364 return journalMaxFileLength;
365 }
366
367 public void setJournalMaxFileLength(int journalMaxFileLength) {
368 this.journalMaxFileLength = journalMaxFileLength;
369 }
370
371 public int getJournalMaxWriteBatchSize() {
372 return journalMaxWriteBatchSize;
373 }
374
375 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
376 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
377 }
378
379 public boolean isEnableIndexWriteAsync() {
380 return enableIndexWriteAsync;
381 }
382
383 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
384 this.enableIndexWriteAsync = enableIndexWriteAsync;
385 }
386
387 @Override
388 public String toString() {
389 return "JobSchedulerStore:" + this.directory;
390 }
391
392 }