001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.kaha.impl.index.tree;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.io.RandomAccessFile;
022 import java.util.concurrent.atomic.AtomicBoolean;
023 import org.apache.activemq.kaha.Marshaller;
024 import org.apache.activemq.kaha.StoreEntry;
025 import org.apache.activemq.kaha.impl.index.Index;
026 import org.apache.activemq.kaha.impl.index.IndexManager;
027 import org.apache.activemq.util.DataByteArrayInputStream;
028 import org.apache.activemq.util.DataByteArrayOutputStream;
029 import org.apache.activemq.util.IOHelper;
030 import org.apache.activemq.util.LRUCache;
031 import org.slf4j.Logger;
032 import org.slf4j.LoggerFactory;
033
034 /**
035 * BTree implementation
036 *
037 *
038 */
039 public class TreeIndex implements Index {
040
041 private static final String NAME_PREFIX = "tree-index-";
042 private static final int DEFAULT_PAGE_SIZE;
043 private static final int DEFAULT_KEY_SIZE;
044 private static final Logger LOG = LoggerFactory.getLogger(TreeIndex.class);
045 private final String name;
046 private File directory;
047 private File file;
048 private RandomAccessFile indexFile;
049 private IndexManager indexManager;
050 private int pageSize = DEFAULT_PAGE_SIZE;
051 private int keySize = DEFAULT_KEY_SIZE;
052 private int keysPerPage = pageSize / keySize;
053 private TreePage root;
054 private LRUCache<Long, TreePage> pageCache;
055 private DataByteArrayInputStream dataIn;
056 private DataByteArrayOutputStream dataOut;
057 private byte[] readBuffer;
058 private Marshaller keyMarshaller;
059 private long length;
060 private TreePage firstFree;
061 private TreePage lastFree;
062 private AtomicBoolean loaded = new AtomicBoolean();
063 private boolean enablePageCaching = true;
064 private int pageCacheSize = 10;
065
066 /**
067 * Constructor
068 *
069 * @param directory
070 * @param name
071 * @param indexManager
072 * @throws IOException
073 */
074 public TreeIndex(File directory, String name, IndexManager indexManager) throws IOException {
075 this.directory = directory;
076 this.name = name;
077 this.indexManager = indexManager;
078 pageCache = new LRUCache<Long, TreePage>(pageCacheSize, pageCacheSize, 0.75f, true);
079 openIndexFile();
080 }
081
082 /**
083 * Set the marshaller for key objects
084 *
085 * @param marshaller
086 */
087 public void setKeyMarshaller(Marshaller marshaller) {
088 this.keyMarshaller = marshaller;
089 }
090
091 /**
092 * @return the keySize
093 */
094 public int getKeySize() {
095 return this.keySize;
096 }
097
098 /**
099 * @param keySize the keySize to set
100 */
101 public void setKeySize(int keySize) {
102 this.keySize = keySize;
103 if (loaded.get()) {
104 throw new RuntimeException("Pages already loaded - can't reset key size");
105 }
106 }
107
108 /**
109 * @return the pageSize
110 */
111 public int getPageSize() {
112 return this.pageSize;
113 }
114
115 /**
116 * @param pageSize the pageSize to set
117 */
118 public void setPageSize(int pageSize) {
119 if (loaded.get() && pageSize != this.pageSize) {
120 throw new RuntimeException("Pages already loaded - can't reset page size");
121 }
122 this.pageSize = pageSize;
123 }
124
125 public boolean isTransient() {
126 return false;
127 }
128
129 /**
130 * @return the enablePageCaching
131 */
132 public boolean isEnablePageCaching() {
133 return this.enablePageCaching;
134 }
135
136 /**
137 * @param enablePageCaching the enablePageCaching to set
138 */
139 public void setEnablePageCaching(boolean enablePageCaching) {
140 this.enablePageCaching = enablePageCaching;
141 }
142
143 /**
144 * @return the pageCacheSize
145 */
146 public int getPageCacheSize() {
147 return this.pageCacheSize;
148 }
149
150 /**
151 * @param pageCacheSize the pageCacheSize to set
152 */
153 public void setPageCacheSize(int pageCacheSize) {
154 this.pageCacheSize = pageCacheSize;
155 pageCache.setMaxCacheSize(pageCacheSize);
156 }
157
158 public void load() {
159 if (loaded.compareAndSet(false, true)) {
160 keysPerPage = pageSize / keySize;
161 dataIn = new DataByteArrayInputStream();
162 dataOut = new DataByteArrayOutputStream(pageSize);
163 readBuffer = new byte[pageSize];
164 try {
165 openIndexFile();
166 long offset = 0;
167 while ((offset + pageSize) <= indexFile.length()) {
168 indexFile.seek(offset);
169 indexFile.readFully(readBuffer, 0, TreePage.PAGE_HEADER_SIZE);
170 dataIn.restart(readBuffer);
171 TreePage page = new TreePage(keysPerPage);
172 page.setTree(this);
173 page.setId(offset);
174 page.readHeader(dataIn);
175 if (!page.isActive()) {
176 if (lastFree != null) {
177 lastFree.setNextFreePageId(offset);
178 indexFile.seek(lastFree.getId());
179 dataOut.reset();
180 lastFree.writeHeader(dataOut);
181 indexFile.write(dataOut.getData(), 0, TreePage.PAGE_HEADER_SIZE);
182 lastFree = page;
183 } else {
184 lastFree = page;
185 firstFree = page;
186 }
187 } else if (root == null && page.isRoot()) {
188 root = getFullPage(offset);
189 }
190 offset += pageSize;
191 }
192 length = offset;
193 if (root == null) {
194 root = createRoot();
195 }
196 } catch (IOException e) {
197 LOG.error("Failed to load index ", e);
198 throw new RuntimeException(e);
199 }
200 }
201 }
202
203 public void unload() throws IOException {
204 if (loaded.compareAndSet(true, false)) {
205 if (indexFile != null) {
206 indexFile.close();
207 indexFile = null;
208 pageCache.clear();
209 root = null;
210 firstFree = null;
211 lastFree = null;
212 }
213 }
214 }
215
216 public void store(Object key, StoreEntry value) throws IOException {
217 TreeEntry entry = new TreeEntry();
218 entry.setKey((Comparable)key);
219 entry.setIndexOffset(value.getOffset());
220 root.put(entry);
221 }
222
223 public StoreEntry get(Object key) throws IOException {
224 TreeEntry entry = new TreeEntry();
225 entry.setKey((Comparable)key);
226 TreeEntry result = root.find(entry);
227 return result != null ? indexManager.getIndex(result.getIndexOffset()) : null;
228 }
229
230 public StoreEntry remove(Object key) throws IOException {
231 TreeEntry entry = new TreeEntry();
232 entry.setKey((Comparable)key);
233 TreeEntry result = root.remove(entry);
234 return result != null ? indexManager.getIndex(result.getIndexOffset()) : null;
235 }
236
237 public boolean containsKey(Object key) throws IOException {
238 TreeEntry entry = new TreeEntry();
239 entry.setKey((Comparable)key);
240 return root.find(entry) != null;
241 }
242
243 public void clear() throws IOException {
244 unload();
245 delete();
246 openIndexFile();
247 load();
248 }
249
250 public void delete() throws IOException {
251 unload();
252 if (file.exists()) {
253 boolean result = file.delete();
254 }
255 length = 0;
256 }
257
258 /**
259 * @return the root
260 */
261 TreePage getRoot() {
262 return this.root;
263 }
264
265 TreePage lookupPage(long pageId) throws IOException {
266 TreePage result = null;
267 if (pageId >= 0) {
268 if (root != null && root.getId() == pageId) {
269 result = root;
270 } else {
271 result = getFromCache(pageId);
272 }
273 if (result == null) {
274 result = getFullPage(pageId);
275 if (result != null) {
276 if (result.isActive()) {
277 addToCache(result);
278 } else {
279 throw new IllegalStateException("Trying to access an inactive page: " + pageId + " root is " + root);
280 }
281 }
282 }
283 }
284 return result;
285 }
286
287 TreePage createRoot() throws IOException {
288 TreePage result = createPage(-1);
289 root = result;
290 return result;
291 }
292
293 TreePage createPage(long parentId) throws IOException {
294 TreePage result = getNextFreePage();
295 if (result == null) {
296 // allocate one
297 result = new TreePage(keysPerPage);
298 result.setId(length);
299 result.setTree(this);
300 result.setParentId(parentId);
301 writePage(result);
302 length += pageSize;
303 indexFile.seek(length);
304 indexFile.write(TreeEntry.NOT_SET);
305 }
306 addToCache(result);
307 return result;
308 }
309
310 void releasePage(TreePage page) throws IOException {
311 removeFromCache(page);
312 page.reset();
313 page.setActive(false);
314 if (lastFree == null) {
315 firstFree = page;
316 lastFree = page;
317 } else {
318 lastFree.setNextFreePageId(page.getId());
319 writePage(lastFree);
320 }
321 writePage(page);
322 }
323
324 private TreePage getNextFreePage() throws IOException {
325 TreePage result = null;
326 if (firstFree != null) {
327 if (firstFree.equals(lastFree)) {
328 result = firstFree;
329 firstFree = null;
330 lastFree = null;
331 } else {
332 result = firstFree;
333 firstFree = getPage(firstFree.getNextFreePageId());
334 if (firstFree == null) {
335 lastFree = null;
336 }
337 }
338 result.setActive(true);
339 result.reset();
340 result.saveHeader();
341 }
342 return result;
343 }
344
345 void writeFullPage(TreePage page) throws IOException {
346 dataOut.reset();
347 page.write(keyMarshaller, dataOut);
348 if (dataOut.size() > pageSize) {
349 throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size());
350 }
351 indexFile.seek(page.getId());
352 indexFile.write(dataOut.getData(), 0, dataOut.size());
353 }
354
355 void writePage(TreePage page) throws IOException {
356 dataOut.reset();
357 page.writeHeader(dataOut);
358 indexFile.seek(page.getId());
359 indexFile.write(dataOut.getData(), 0, TreePage.PAGE_HEADER_SIZE);
360 }
361
362 TreePage getFullPage(long id) throws IOException {
363 indexFile.seek(id);
364 indexFile.readFully(readBuffer, 0, pageSize);
365 dataIn.restart(readBuffer);
366 TreePage page = new TreePage(keysPerPage);
367 page.setId(id);
368 page.setTree(this);
369 page.read(keyMarshaller, dataIn);
370 return page;
371 }
372
373 TreePage getPage(long id) throws IOException {
374 indexFile.seek(id);
375 indexFile.readFully(readBuffer, 0, TreePage.PAGE_HEADER_SIZE);
376 dataIn.restart(readBuffer);
377 TreePage page = new TreePage(keysPerPage);
378 page.setId(id);
379 page.setTree(this);
380 page.readHeader(dataIn);
381 return page;
382 }
383
384 private TreePage getFromCache(long pageId) {
385 TreePage result = null;
386 if (enablePageCaching) {
387 result = pageCache.get(pageId);
388 }
389 return result;
390 }
391
392 private void addToCache(TreePage page) {
393 if (enablePageCaching) {
394 pageCache.put(page.getId(), page);
395 }
396 }
397
398 private void removeFromCache(TreePage page) {
399 if (enablePageCaching) {
400 pageCache.remove(page.getId());
401 }
402 }
403
404 protected void openIndexFile() throws IOException {
405 if (indexFile == null) {
406 file = new File(directory, NAME_PREFIX + IOHelper.toFileSystemSafeName(name));
407 IOHelper.mkdirs(file.getParentFile());
408 indexFile = new RandomAccessFile(file, "rw");
409 }
410 }
411
412 static {
413 DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "16384"));
414 DEFAULT_KEY_SIZE = Integer.parseInt(System.getProperty("defaultKeySize", "96"));
415 }
416
417 public int getSize() {
418 return 0;
419 }
420 }