001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.kaha.impl.index.hash;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.io.RandomAccessFile;
022 import java.util.LinkedList;
023 import java.util.concurrent.atomic.AtomicBoolean;
024
025 import org.apache.activemq.kaha.Marshaller;
026 import org.apache.activemq.kaha.StoreEntry;
027 import org.apache.activemq.kaha.impl.index.Index;
028 import org.apache.activemq.kaha.impl.index.IndexManager;
029 import org.apache.activemq.util.DataByteArrayInputStream;
030 import org.apache.activemq.util.DataByteArrayOutputStream;
031 import org.apache.activemq.util.IOHelper;
032 import org.apache.activemq.util.LRUCache;
033 import org.slf4j.Logger;
034 import org.slf4j.LoggerFactory;
035
/**
 * Disk-backed hash index implementation: keys are hashed into bins whose
 * entries are stored in fixed-size pages of a single index file.
 */
041 public class HashIndex implements Index, HashIndexMBean {
/** Default page size in bytes; read from system property "defaultPageSize" (1024 when unset). */
public static final int DEFAULT_PAGE_SIZE;
/** Default key size; read from system property "defaultKeySize" (96 when unset). */
public static final int DEFAULT_KEY_SIZE;
/** Default number of bins; read from system property "defaultBinSize" (1024 when unset). */
public static final int DEFAULT_BIN_SIZE;
/** Default bin-count ceiling checked by resize(); read from system property "maximumCapacity" (16384 when unset). */
public static final int MAXIMUM_CAPACITY;
/** Default load factor; read from system property "defaultLoadFactor" (50 when unset). */
public static final int DEFAULT_LOAD_FACTOR;
// remove() only considers shrinking once highestSize has exceeded this value
private static final int LOW_WATER_MARK=1024*16;
// prefix of the index file name; the file-system-safe index name is appended to it
private static final String NAME_PREFIX = "hash-index-";
private static final Logger LOG = LoggerFactory.getLogger(HashIndex.class);
// logical name of this index
private final String name;
// directory in which the index file lives
private File directory;
// the index file; assigned by openIndexFile()
private File file;
// open handle on the index file; null while the index is unloaded
private RandomAccessFile indexFile;
// resolves the offsets stored in hash entries into StoreEntry instances
private IndexManager indexManager;
// size of one page in the index file, in bytes
private int pageSize = DEFAULT_PAGE_SIZE;
private int keySize = DEFAULT_KEY_SIZE;
// requested number of bins; load() rounds it up to a power of two
private int numberOfBins = DEFAULT_BIN_SIZE;
// pageSize / keySize; recomputed by load()
private int keysPerPage = this.pageSize /this.keySize;
// scratch streams and buffer reused for all page reads and writes; created by load()
private DataByteArrayInputStream dataIn;
private DataByteArrayOutputStream dataOut;
private byte[] readBuffer;
// bins indexed by hashed key; individual bins are created lazily by getBin(int)
private HashBin[] bins;
private Marshaller keyMarshaller;
// offset at which the next newly allocated page is written
private long length;
// released (inactive) pages that createPage() can reuse
private LinkedList<HashPage> freeList = new LinkedList<HashPage>();
// true between load() and unload()
private AtomicBoolean loaded = new AtomicBoolean();
// page cache keyed by page id; only consulted when enablePageCaching is true
private LRUCache<Long, HashPage> pageCache;
private boolean enablePageCaching=false;//this is off by default - see AMQ-1667
private int pageCacheSize = 10;
// entry count maintained by store()/remove()/doLoad()
private int size;
// largest value of size seen since remove() last reset it
private int highestSize=0;
// number of bins instantiated by getBin(int)
private int activeBins;
// store() triggers a resize once size reaches this value
private int threshold;
private int maximumCapacity=MAXIMUM_CAPACITY;
// multiplied by bins.length to obtain threshold
private int loadFactor=DEFAULT_LOAD_FACTOR;
076
077
/**
 * Creates the index, opens (creating if necessary) its backing file and
 * sets up the page cache.
 *
 * @param directory directory that holds the index file
 * @param name logical index name; its file-system-safe form, prefixed with
 *        "hash-index-", is used as the file name
 * @param indexManager used to turn stored offsets back into store entries
 * @throws IOException if the parent directory or the index file cannot be
 *         created or opened
 */
public HashIndex(File directory, String name, IndexManager indexManager) throws IOException {
    this.indexManager = indexManager;
    this.name = name;
    this.directory = directory;
    openIndexFile();
    this.pageCache = new LRUCache<Long, HashPage>(pageCacheSize, pageCacheSize, 0.75f, true);
}
093
094 /**
095 * Set the marshaller for key objects
096 *
097 * @param marshaller
098 */
099 public synchronized void setKeyMarshaller(Marshaller marshaller) {
100 this.keyMarshaller = marshaller;
101 }
102
103 /**
104 * @return the keySize
105 */
106 public synchronized int getKeySize() {
107 return this.keySize;
108 }
109
110 /**
111 * @param keySize the keySize to set
112 */
113 public synchronized void setKeySize(int keySize) {
114 this.keySize = keySize;
115 if (loaded.get()) {
116 throw new RuntimeException("Pages already loaded - can't reset key size");
117 }
118 }
119
120 /**
121 * @return the pageSize
122 */
123 public synchronized int getPageSize() {
124 return this.pageSize;
125 }
126
127 /**
128 * @param pageSize the pageSize to set
129 */
130 public synchronized void setPageSize(int pageSize) {
131 if (loaded.get() && pageSize != this.pageSize) {
132 throw new RuntimeException("Pages already loaded - can't reset page size");
133 }
134 this.pageSize = pageSize;
135 }
136
137 /**
138 * @return number of bins
139 */
140 public int getNumberOfBins() {
141 return this.numberOfBins;
142 }
143
144 /**
145 * @param numberOfBins
146 */
147 public void setNumberOfBins(int numberOfBins) {
148 if (loaded.get() && numberOfBins != this.numberOfBins) {
149 throw new RuntimeException("Pages already loaded - can't reset bin size");
150 }
151 this.numberOfBins = numberOfBins;
152 }
153
154 /**
155 * @return the enablePageCaching
156 */
157 public synchronized boolean isEnablePageCaching() {
158 return this.enablePageCaching;
159 }
160
161 /**
162 * @param enablePageCaching the enablePageCaching to set
163 */
164 public synchronized void setEnablePageCaching(boolean enablePageCaching) {
165 this.enablePageCaching = enablePageCaching;
166 }
167
168 /**
169 * @return the pageCacheSize
170 */
171 public synchronized int getPageCacheSize() {
172 return this.pageCacheSize;
173 }
174
175 /**
176 * @param pageCacheSize the pageCacheSize to set
177 */
178 public synchronized void setPageCacheSize(int pageCacheSize) {
179 this.pageCacheSize = pageCacheSize;
180 pageCache.setMaxCacheSize(pageCacheSize);
181 }
182
183 public synchronized boolean isTransient() {
184 return false;
185 }
186
187 /**
188 * @return the threshold
189 */
190 public int getThreshold() {
191 return threshold;
192 }
193
194 /**
195 * @param threshold the threshold to set
196 */
197 public void setThreshold(int threshold) {
198 this.threshold = threshold;
199 }
200
201 /**
202 * @return the loadFactor
203 */
204 public int getLoadFactor() {
205 return loadFactor;
206 }
207
208 /**
209 * @param loadFactor the loadFactor to set
210 */
211 public void setLoadFactor(int loadFactor) {
212 this.loadFactor = loadFactor;
213 }
214
215 /**
216 * @return the maximumCapacity
217 */
218 public int getMaximumCapacity() {
219 return maximumCapacity;
220 }
221
222 /**
223 * @param maximumCapacity the maximumCapacity to set
224 */
225 public void setMaximumCapacity(int maximumCapacity) {
226 this.maximumCapacity = maximumCapacity;
227 }
228
229 public synchronized int getSize() {
230 return size;
231 }
232
233 public synchronized int getActiveBins(){
234 return activeBins;
235 }
236
237 public synchronized void load() {
238 if (loaded.compareAndSet(false, true)) {
239 int capacity = 1;
240 while (capacity < numberOfBins) {
241 capacity <<= 1;
242 }
243 this.bins = new HashBin[capacity];
244 this.numberOfBins=capacity;
245 threshold = calculateThreashold();
246 keysPerPage = pageSize / keySize;
247 dataIn = new DataByteArrayInputStream();
248 dataOut = new DataByteArrayOutputStream(pageSize);
249 readBuffer = new byte[pageSize];
250 try {
251 openIndexFile();
252 if (indexFile.length() > 0) {
253 doCompress();
254 }
255 } catch (IOException e) {
256 LOG.error("Failed to load index ", e);
257 throw new RuntimeException(e);
258 }
259 }
260 }
261
262 public synchronized void unload() throws IOException {
263 if (loaded.compareAndSet(true, false)) {
264 if (indexFile != null) {
265 indexFile.close();
266 indexFile = null;
267 freeList.clear();
268 pageCache.clear();
269 bins = new HashBin[bins.length];
270 }
271 }
272 }
273
274 public synchronized void store(Object key, StoreEntry value) throws IOException {
275 load();
276 HashEntry entry = new HashEntry();
277 entry.setKey((Comparable)key);
278 entry.setIndexOffset(value.getOffset());
279 if (!getBin(key).put(entry)) {
280 this.size++;
281 }
282 if (this.size >= this.threshold) {
283 resize(2*bins.length);
284 }
285 if(this.size > this.highestSize) {
286 this.highestSize=this.size;
287 }
288 }
289
290 public synchronized StoreEntry get(Object key) throws IOException {
291 load();
292 HashEntry entry = new HashEntry();
293 entry.setKey((Comparable)key);
294 HashEntry result = getBin(key).find(entry);
295 return result != null ? indexManager.getIndex(result.getIndexOffset()) : null;
296 }
297
298 public synchronized StoreEntry remove(Object key) throws IOException {
299 load();
300 StoreEntry result = null;
301 HashEntry entry = new HashEntry();
302 entry.setKey((Comparable)key);
303 HashEntry he = getBin(key).remove(entry);
304 if (he != null) {
305 this.size--;
306 result = this.indexManager.getIndex(he.getIndexOffset());
307 }
308 if (this.highestSize > LOW_WATER_MARK && this.highestSize > (this.size *2)) {
309 int newSize = this.size/this.keysPerPage;
310 newSize = Math.max(128, newSize);
311 this.highestSize=0;
312 resize(newSize);
313
314 }
315 return result;
316 }
317
318 public synchronized boolean containsKey(Object key) throws IOException {
319 return get(key) != null;
320 }
321
322 public synchronized void clear() throws IOException {
323 unload();
324 delete();
325 openIndexFile();
326 load();
327 }
328
329 public synchronized void delete() throws IOException {
330 unload();
331 if (file.exists()) {
332 file.delete();
333 }
334 length = 0;
335 }
336
337 HashPage lookupPage(long pageId) throws IOException {
338 HashPage result = null;
339 if (pageId >= 0) {
340 result = getFromCache(pageId);
341 if (result == null) {
342 result = getFullPage(pageId);
343 if (result != null) {
344 if (result.isActive()) {
345 addToCache(result);
346 } else {
347 throw new IllegalStateException("Trying to access an inactive page: " + pageId);
348 }
349 }
350 }
351 }
352 return result;
353 }
354
355 HashPage createPage(int binId) throws IOException {
356 HashPage result = getNextFreePage();
357 if (result == null) {
358 // allocate one
359 result = new HashPage(keysPerPage);
360 result.setId(length);
361 result.setBinId(binId);
362 writePageHeader(result);
363 length += pageSize;
364 indexFile.seek(length);
365 indexFile.write(HashEntry.NOT_SET);
366 }
367 addToCache(result);
368 return result;
369 }
370
371 void releasePage(HashPage page) throws IOException {
372 removeFromCache(page);
373 page.reset();
374 page.setActive(false);
375 writePageHeader(page);
376 freeList.add(page);
377 }
378
379 private HashPage getNextFreePage() throws IOException {
380 HashPage result = null;
381 if(!freeList.isEmpty()) {
382 result = freeList.removeFirst();
383 result.setActive(true);
384 result.reset();
385 writePageHeader(result);
386 }
387 return result;
388 }
389
390 void writeFullPage(HashPage page) throws IOException {
391 dataOut.reset();
392 page.write(keyMarshaller, dataOut);
393 if (dataOut.size() > pageSize) {
394 throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size());
395 }
396 indexFile.seek(page.getId());
397 indexFile.write(dataOut.getData(), 0, dataOut.size());
398 }
399
400 void writePageHeader(HashPage page) throws IOException {
401 dataOut.reset();
402 page.writeHeader(dataOut);
403 indexFile.seek(page.getId());
404 indexFile.write(dataOut.getData(), 0, HashPage.PAGE_HEADER_SIZE);
405 }
406
407 HashPage getFullPage(long id) throws IOException {
408 indexFile.seek(id);
409 indexFile.readFully(readBuffer, 0, pageSize);
410 dataIn.restart(readBuffer);
411 HashPage page = new HashPage(keysPerPage);
412 page.setId(id);
413 page.read(keyMarshaller, dataIn);
414 return page;
415 }
416
417 HashPage getPageHeader(long id) throws IOException {
418 indexFile.seek(id);
419 indexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE);
420 dataIn.restart(readBuffer);
421 HashPage page = new HashPage(keysPerPage);
422 page.setId(id);
423 page.readHeader(dataIn);
424 return page;
425 }
426
427 void addToBin(HashPage page) throws IOException {
428 int index = page.getBinId();
429 if (index >= this.bins.length) {
430 resize(index+1);
431 }
432 HashBin bin = getBin(index);
433 bin.addHashPageInfo(page.getId(), page.getPersistedSize());
434 }
435
436 private HashBin getBin(int index) {
437
438 HashBin result = bins[index];
439 if (result == null) {
440 result = new HashBin(this, index, pageSize / keySize);
441 bins[index] = result;
442 activeBins++;
443 }
444 return result;
445 }
446
447 private void openIndexFile() throws IOException {
448 if (indexFile == null) {
449 file = new File(directory, NAME_PREFIX + IOHelper.toFileSystemSafeName(name));
450 IOHelper.mkdirs(file.getParentFile());
451 indexFile = new RandomAccessFile(file, "rw");
452 }
453 }
454
455 private HashBin getBin(Object key) {
456 int hash = hash(key);
457 int i = indexFor(hash, bins.length);
458 return getBin(i);
459 }
460
461 private HashPage getFromCache(long pageId) {
462 HashPage result = null;
463 if (enablePageCaching) {
464 result = pageCache.get(pageId);
465 }
466 return result;
467 }
468
469 private void addToCache(HashPage page) {
470 if (enablePageCaching) {
471 pageCache.put(page.getId(), page);
472 }
473 }
474
475 private void removeFromCache(HashPage page) {
476 if (enablePageCaching) {
477 pageCache.remove(page.getId());
478 }
479 }
480
481 private void doLoad() throws IOException {
482 long offset = 0;
483 if (loaded.compareAndSet(false, true)) {
484 while ((offset + pageSize) <= indexFile.length()) {
485 indexFile.seek(offset);
486 indexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE);
487 dataIn.restart(readBuffer);
488 HashPage page = new HashPage(keysPerPage);
489 page.setId(offset);
490 page.readHeader(dataIn);
491 if (!page.isActive()) {
492 page.reset();
493 freeList.add(page);
494 } else {
495 addToBin(page);
496 size+=page.size();
497 }
498 offset += pageSize;
499 }
500 length=offset;
501 }
502 }
503
504 private void doCompress() throws IOException {
505 String backFileName = name + "-COMPRESS";
506 HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
507 backIndex.setKeyMarshaller(keyMarshaller);
508 backIndex.setKeySize(getKeySize());
509 backIndex.setNumberOfBins(getNumberOfBins());
510 backIndex.setPageSize(getPageSize());
511 backIndex.load();
512 File backFile = backIndex.file;
513 long offset = 0;
514 while ((offset + pageSize) <= indexFile.length()) {
515 indexFile.seek(offset);
516 HashPage page = getFullPage(offset);
517 if (page.isActive()) {
518 for (HashEntry entry : page.getEntries()) {
519 backIndex.getBin(entry.getKey()).put(entry);
520 backIndex.size++;
521 }
522 }
523 page=null;
524 offset += pageSize;
525 }
526 backIndex.unload();
527
528 unload();
529 IOHelper.deleteFile(file);
530 IOHelper.copyFile(backFile, file);
531 IOHelper.deleteFile(backFile);
532 openIndexFile();
533 doLoad();
534 }
535
536 private void resize(int newCapacity) throws IOException {
537 if (bins.length < getMaximumCapacity()) {
538 if (newCapacity != numberOfBins) {
539 int capacity = 1;
540 while (capacity < newCapacity) {
541 capacity <<= 1;
542 }
543 newCapacity=capacity;
544 if (newCapacity != numberOfBins) {
545 LOG.info("Resize hash bins " + this.name + " from " + numberOfBins + " to " + newCapacity);
546 String backFileName = name + "-REISZE";
547 HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
548 backIndex.setKeyMarshaller(keyMarshaller);
549 backIndex.setKeySize(getKeySize());
550 backIndex.setNumberOfBins(newCapacity);
551 backIndex.setPageSize(getPageSize());
552 backIndex.load();
553 File backFile = backIndex.file;
554 long offset = 0;
555 while ((offset + pageSize) <= indexFile.length()) {
556 indexFile.seek(offset);
557 HashPage page = getFullPage(offset);
558 if (page.isActive()) {
559 for (HashEntry entry : page.getEntries()) {
560 backIndex.getBin(entry.getKey()).put(entry);
561 backIndex.size++;
562 }
563 }
564 page=null;
565 offset += pageSize;
566 }
567 backIndex.unload();
568
569 unload();
570 IOHelper.deleteFile(file);
571 IOHelper.copyFile(backFile, file);
572 IOHelper.deleteFile(backFile);
573 setNumberOfBins(newCapacity);
574 bins = new HashBin[newCapacity];
575 threshold = calculateThreashold();
576 openIndexFile();
577 doLoad();
578 }
579 }
580 }else {
581 threshold = Integer.MAX_VALUE;
582 return;
583 }
584 }
585
586 private int calculateThreashold() {
587 return (int)(bins.length * loadFactor);
588 }
589
590
591 public String toString() {
592 String str = "HashIndex"+System.identityHashCode(this)+": "+file.getName();
593 return str;
594 }
595
596
597 static int hash(Object x) {
598 int h = x.hashCode();
599 h += ~(h << 9);
600 h ^= h >>> 14;
601 h += h << 4;
602 h ^= h >>> 10;
603 return h;
604 }
605
606 static int indexFor(int h, int length) {
607 return h & (length - 1);
608 }
609
610 static {
611 DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "1024"));
612 DEFAULT_KEY_SIZE = Integer.parseInt(System.getProperty("defaultKeySize", "96"));
613 DEFAULT_BIN_SIZE= Integer.parseInt(System.getProperty("defaultBinSize", "1024"));
614 MAXIMUM_CAPACITY = Integer.parseInt(System.getProperty("maximumCapacity", "16384"));
615 DEFAULT_LOAD_FACTOR=Integer.parseInt(System.getProperty("defaultLoadFactor","50"));
616 }
617 }