001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.journal;
018
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;

import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.active.JournalImpl;
import org.apache.activeio.journal.active.JournalLockedException;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterFactory;
import org.apache.activemq.store.jdbc.DataSourceSupport;
import org.apache.activemq.store.jdbc.JDBCAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.Statements;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
034
035 /**
036 * Factory class that can create PersistenceAdapter objects.
037 *
038 * @org.apache.xbean.XBean
039 *
040 */
041 public class JournalPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory {
042
043 private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
044
045 private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapterFactory.class);
046
047 private int journalLogFileSize = 1024 * 1024 * 20;
048 private int journalLogFiles = 2;
049 private TaskRunnerFactory taskRunnerFactory;
050 private Journal journal;
051 private boolean useJournal = true;
052 private boolean useQuickJournal;
053 private File journalArchiveDirectory;
054 private boolean failIfJournalIsLocked;
055 private int journalThreadPriority = Thread.MAX_PRIORITY;
056 private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
057 private boolean useDedicatedTaskRunner;
058
059 public PersistenceAdapter createPersistenceAdapter() throws IOException {
060 jdbcPersistenceAdapter.setDataSource(getDataSource());
061
062 if (!useJournal) {
063 return jdbcPersistenceAdapter;
064 }
065 JournalPersistenceAdapter result = new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
066 result.setDirectory(getDataDirectoryFile());
067 return result;
068
069 }
070
071 public int getJournalLogFiles() {
072 return journalLogFiles;
073 }
074
075 /**
076 * Sets the number of journal log files to use
077 */
078 public void setJournalLogFiles(int journalLogFiles) {
079 this.journalLogFiles = journalLogFiles;
080 }
081
082 public int getJournalLogFileSize() {
083 return journalLogFileSize;
084 }
085
086 /**
087 * Sets the size of the journal log files
088 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
089 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
090 */
091 public void setJournalLogFileSize(int journalLogFileSize) {
092 this.journalLogFileSize = journalLogFileSize;
093 }
094
095 public JDBCPersistenceAdapter getJdbcAdapter() {
096 return jdbcPersistenceAdapter;
097 }
098
099 public void setJdbcAdapter(JDBCPersistenceAdapter jdbcAdapter) {
100 this.jdbcPersistenceAdapter = jdbcAdapter;
101 }
102
103 public boolean isUseJournal() {
104 return useJournal;
105 }
106
107 /**
108 * Enables or disables the use of the journal. The default is to use the
109 * journal
110 *
111 * @param useJournal
112 */
113 public void setUseJournal(boolean useJournal) {
114 this.useJournal = useJournal;
115 }
116
117 public boolean isUseDedicatedTaskRunner() {
118 return useDedicatedTaskRunner;
119 }
120
121 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
122 this.useDedicatedTaskRunner = useDedicatedTaskRunner;
123 }
124
125 public TaskRunnerFactory getTaskRunnerFactory() {
126 if (taskRunnerFactory == null) {
127 taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority,
128 true, 1000, isUseDedicatedTaskRunner());
129 }
130 return taskRunnerFactory;
131 }
132
133 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
134 this.taskRunnerFactory = taskRunnerFactory;
135 }
136
137 public Journal getJournal() throws IOException {
138 if (journal == null) {
139 createJournal();
140 }
141 return journal;
142 }
143
144 public void setJournal(Journal journal) {
145 this.journal = journal;
146 }
147
148 public File getJournalArchiveDirectory() {
149 if (journalArchiveDirectory == null && useQuickJournal) {
150 journalArchiveDirectory = new File(getDataDirectoryFile(), "journal");
151 }
152 return journalArchiveDirectory;
153 }
154
155 public void setJournalArchiveDirectory(File journalArchiveDirectory) {
156 this.journalArchiveDirectory = journalArchiveDirectory;
157 }
158
159 public boolean isUseQuickJournal() {
160 return useQuickJournal;
161 }
162
163 /**
164 * Enables or disables the use of quick journal, which keeps messages in the
165 * journal and just stores a reference to the messages in JDBC. Defaults to
166 * false so that messages actually reside long term in the JDBC database.
167 */
168 public void setUseQuickJournal(boolean useQuickJournal) {
169 this.useQuickJournal = useQuickJournal;
170 }
171
172 public JDBCAdapter getAdapter() throws IOException {
173 return jdbcPersistenceAdapter.getAdapter();
174 }
175
176 public void setAdapter(JDBCAdapter adapter) {
177 jdbcPersistenceAdapter.setAdapter(adapter);
178 }
179
180 public Statements getStatements() {
181 return jdbcPersistenceAdapter.getStatements();
182 }
183
184 public void setStatements(Statements statements) {
185 jdbcPersistenceAdapter.setStatements(statements);
186 }
187
188 public boolean isUseDatabaseLock() {
189 return jdbcPersistenceAdapter.isUseDatabaseLock();
190 }
191
192 /**
193 * Sets whether or not an exclusive database lock should be used to enable
194 * JDBC Master/Slave. Enabled by default.
195 */
196 public void setUseDatabaseLock(boolean useDatabaseLock) {
197 jdbcPersistenceAdapter.setUseDatabaseLock(useDatabaseLock);
198 }
199
200 public boolean isCreateTablesOnStartup() {
201 return jdbcPersistenceAdapter.isCreateTablesOnStartup();
202 }
203
204 /**
205 * Sets whether or not tables are created on startup
206 */
207 public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
208 jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup);
209 }
210
211 public int getJournalThreadPriority() {
212 return journalThreadPriority;
213 }
214
215 /**
216 * Sets the thread priority of the journal thread
217 */
218 public void setJournalThreadPriority(int journalThreadPriority) {
219 this.journalThreadPriority = journalThreadPriority;
220 }
221
222 /**
223 * @throws IOException
224 */
225 protected void createJournal() throws IOException {
226 File journalDir = new File(getDataDirectoryFile(), "journal").getCanonicalFile();
227 if (failIfJournalIsLocked) {
228 journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
229 getJournalArchiveDirectory());
230 } else {
231 while (true) {
232 try {
233 journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
234 getJournalArchiveDirectory());
235 break;
236 } catch (JournalLockedException e) {
237 LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000)
238 + " seconds for the journal to be unlocked.");
239 try {
240 Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
241 } catch (InterruptedException e1) {
242 }
243 }
244 }
245 }
246 }
247
248 }