001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.kaha.impl.container;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.util.Collection;
022 import java.util.Iterator;
023 import java.util.Map;
024 import java.util.Set;
025
026 import org.apache.activemq.kaha.ContainerId;
027 import org.apache.activemq.kaha.IndexMBean;
028 import org.apache.activemq.kaha.MapContainer;
029 import org.apache.activemq.kaha.Marshaller;
030 import org.apache.activemq.kaha.RuntimeStoreException;
031 import org.apache.activemq.kaha.Store;
032 import org.apache.activemq.kaha.StoreEntry;
033 import org.apache.activemq.kaha.StoreLocation;
034 import org.apache.activemq.kaha.impl.DataManager;
035 import org.apache.activemq.kaha.impl.data.Item;
036 import org.apache.activemq.kaha.impl.index.Index;
037 import org.apache.activemq.kaha.impl.index.IndexItem;
038 import org.apache.activemq.kaha.impl.index.IndexLinkedList;
039 import org.apache.activemq.kaha.impl.index.IndexManager;
040 import org.apache.activemq.kaha.impl.index.VMIndex;
041 import org.apache.activemq.kaha.impl.index.hash.HashIndex;
042 import org.apache.activemq.util.IOHelper;
043 import org.slf4j.Logger;
044 import org.slf4j.LoggerFactory;
045
046 /**
047 * Implementation of a MapContainer
048 *
049 *
050 */
051 public final class MapContainerImpl extends BaseContainerImpl implements MapContainer {
052
053 private static final Logger LOG = LoggerFactory.getLogger(MapContainerImpl.class);
054 protected Index index;
055 protected Marshaller keyMarshaller = Store.OBJECT_MARSHALLER;
056 protected Marshaller valueMarshaller = Store.OBJECT_MARSHALLER;
057 protected File directory;
058 private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
059 private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
060 private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
061 private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
062 private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
063
064 public MapContainerImpl(File directory, ContainerId id, IndexItem root, IndexManager indexManager,
065 DataManager dataManager, boolean persistentIndex) {
066 super(id, root, indexManager, dataManager, persistentIndex);
067 this.directory = directory;
068 }
069
070 public synchronized void init() {
071 super.init();
072 if (index == null) {
073 if (persistentIndex) {
074 String name = containerId.getDataContainerName() + "_" + containerId.getKey();
075 try {
076 HashIndex hashIndex = new HashIndex(directory, name, indexManager);
077 hashIndex.setNumberOfBins(getIndexBinSize());
078 hashIndex.setKeySize(getIndexKeySize());
079 hashIndex.setPageSize(getIndexPageSize());
080 hashIndex.setMaximumCapacity(getIndexMaxBinSize());
081 hashIndex.setLoadFactor(getIndexLoadFactor());
082 this.index = hashIndex;
083 } catch (IOException e) {
084 LOG.error("Failed to create HashIndex", e);
085 throw new RuntimeException(e);
086 }
087 } else {
088 this.index = new VMIndex(indexManager);
089 }
090 }
091 index.setKeyMarshaller(keyMarshaller);
092 }
093
094 /*
095 * (non-Javadoc)
096 *
097 * @see org.apache.activemq.kaha.MapContainer#load()
098 */
099 public synchronized void load() {
100 checkClosed();
101 if (!loaded) {
102 if (!loaded) {
103 loaded = true;
104 try {
105 init();
106 index.load();
107 long nextItem = root.getNextItem();
108 while (nextItem != Item.POSITION_NOT_SET) {
109 IndexItem item = indexManager.getIndex(nextItem);
110 StoreLocation data = item.getKeyDataItem();
111 Object key = dataManager.readItem(keyMarshaller, data);
112 if (index.isTransient()) {
113 index.store(key, item);
114 }
115 indexList.add(item);
116 nextItem = item.getNextItem();
117 }
118 } catch (IOException e) {
119 LOG.error("Failed to load container " + getId(), e);
120 throw new RuntimeStoreException(e);
121 }
122 }
123 }
124 }
125
126 /*
127 * (non-Javadoc)
128 *
129 * @see org.apache.activemq.kaha.MapContainer#unload()
130 */
131 public synchronized void unload() {
132 checkClosed();
133 if (loaded) {
134 loaded = false;
135 try {
136 index.unload();
137 } catch (IOException e) {
138 LOG.warn("Failed to unload the index", e);
139 }
140 indexList.clear();
141 }
142 }
143
144 public synchronized void delete() {
145 unload();
146 try {
147 index.delete();
148 } catch (IOException e) {
149 LOG.warn("Failed to unload the index", e);
150 }
151 }
152
153
154 public synchronized void setKeyMarshaller(Marshaller keyMarshaller) {
155 checkClosed();
156 this.keyMarshaller = keyMarshaller;
157 if (index != null) {
158 index.setKeyMarshaller(keyMarshaller);
159 }
160 }
161
162 public synchronized void setValueMarshaller(Marshaller valueMarshaller) {
163 checkClosed();
164 this.valueMarshaller = valueMarshaller;
165 }
166
167 /*
168 * (non-Javadoc)
169 *
170 * @see org.apache.activemq.kaha.MapContainer#size()
171 */
172 public synchronized int size() {
173 load();
174 return indexList.size();
175 }
176
177 /*
178 * (non-Javadoc)
179 *
180 * @see org.apache.activemq.kaha.MapContainer#isEmpty()
181 */
182 public synchronized boolean isEmpty() {
183 load();
184 return indexList.isEmpty();
185 }
186
187 /*
188 * (non-Javadoc)
189 *
190 * @see org.apache.activemq.kaha.MapContainer#containsKey(java.lang.Object)
191 */
192 public synchronized boolean containsKey(Object key) {
193 load();
194 try {
195 return index.containsKey(key);
196 } catch (IOException e) {
197 LOG.error("Failed trying to find key: " + key, e);
198 throw new RuntimeException(e);
199 }
200 }
201
202 /*
203 * (non-Javadoc)
204 *
205 * @see org.apache.activemq.kaha.MapContainer#get(java.lang.Object)
206 */
207 public synchronized Object get(Object key) {
208 load();
209 Object result = null;
210 StoreEntry item = null;
211 try {
212 item = index.get(key);
213 } catch (IOException e) {
214 LOG.error("Failed trying to get key: " + key, e);
215 throw new RuntimeException(e);
216 }
217 if (item != null) {
218 result = getValue(item);
219 }
220 return result;
221 }
222
223 /**
224 * Get the StoreEntry associated with the key
225 *
226 * @param key
227 * @return the StoreEntry
228 */
229 public synchronized StoreEntry getEntry(Object key) {
230 load();
231 StoreEntry item = null;
232 try {
233 item = index.get(key);
234 } catch (IOException e) {
235 LOG.error("Failed trying to get key: " + key, e);
236 throw new RuntimeException(e);
237 }
238 return item;
239 }
240
241 /*
242 * (non-Javadoc)
243 *
244 * @see org.apache.activemq.kaha.MapContainer#containsValue(java.lang.Object)
245 */
246 public synchronized boolean containsValue(Object o) {
247 load();
248 boolean result = false;
249 if (o != null) {
250 IndexItem item = indexList.getFirst();
251 while (item != null) {
252 Object value = getValue(item);
253 if (value != null && value.equals(o)) {
254 result = true;
255 break;
256 }
257 item = indexList.getNextEntry(item);
258 }
259 }
260 return result;
261 }
262
263 /*
264 * (non-Javadoc)
265 *
266 * @see org.apache.activemq.kaha.MapContainer#putAll(java.util.Map)
267 */
268 public synchronized void putAll(Map t) {
269 load();
270 if (t != null) {
271 for (Iterator i = t.entrySet().iterator(); i.hasNext();) {
272 Map.Entry entry = (Map.Entry)i.next();
273 put(entry.getKey(), entry.getValue());
274 }
275 }
276 }
277
278 /*
279 * (non-Javadoc)
280 *
281 * @see org.apache.activemq.kaha.MapContainer#keySet()
282 */
283 public synchronized Set keySet() {
284 load();
285 return new ContainerKeySet(this);
286 }
287
288 /*
289 * (non-Javadoc)
290 *
291 * @see org.apache.activemq.kaha.MapContainer#values()
292 */
293 public synchronized Collection values() {
294 load();
295 return new ContainerValueCollection(this);
296 }
297
298 /*
299 * (non-Javadoc)
300 *
301 * @see org.apache.activemq.kaha.MapContainer#entrySet()
302 */
303 public synchronized Set entrySet() {
304 load();
305 return new ContainerEntrySet(this);
306 }
307
308 /*
309 * (non-Javadoc)
310 *
311 * @see org.apache.activemq.kaha.MapContainer#put(java.lang.Object,
312 * java.lang.Object)
313 */
314 public synchronized Object put(Object key, Object value) {
315 load();
316 Object result = remove(key);
317 IndexItem item = write(key, value);
318 try {
319 index.store(key, item);
320 } catch (IOException e) {
321 LOG.error("Failed trying to insert key: " + key, e);
322 throw new RuntimeException(e);
323 }
324 indexList.add(item);
325 return result;
326 }
327
328 /*
329 * (non-Javadoc)
330 *
331 * @see org.apache.activemq.kaha.MapContainer#remove(java.lang.Object)
332 */
333 public synchronized Object remove(Object key) {
334 load();
335 try {
336 Object result = null;
337 IndexItem item = (IndexItem)index.remove(key);
338 if (item != null) {
339 // refresh the index
340 item = (IndexItem)indexList.refreshEntry(item);
341 result = getValue(item);
342 IndexItem prev = indexList.getPrevEntry(item);
343 IndexItem next = indexList.getNextEntry(item);
344 indexList.remove(item);
345 delete(item, prev, next);
346 }
347 return result;
348 } catch (IOException e) {
349 LOG.error("Failed trying to remove key: " + key, e);
350 throw new RuntimeException(e);
351 }
352 }
353
354 public synchronized boolean removeValue(Object o) {
355 load();
356 boolean result = false;
357 if (o != null) {
358 IndexItem item = indexList.getFirst();
359 while (item != null) {
360 Object value = getValue(item);
361 if (value != null && value.equals(o)) {
362 result = true;
363 // find the key
364 Object key = getKey(item);
365 if (key != null) {
366 remove(key);
367 }
368 break;
369 }
370 item = indexList.getNextEntry(item);
371 }
372 }
373 return result;
374 }
375
376 protected synchronized void remove(IndexItem item) {
377 Object key = getKey(item);
378 if (key != null) {
379 remove(key);
380 }
381 }
382
383 /*
384 * (non-Javadoc)
385 *
386 * @see org.apache.activemq.kaha.MapContainer#clear()
387 */
388 public synchronized void clear() {
389 checkClosed();
390 loaded = true;
391 init();
392 if (index != null) {
393 try {
394 index.clear();
395 } catch (IOException e) {
396 LOG.error("Failed trying clear index", e);
397 throw new RuntimeException(e);
398 }
399 }
400 super.clear();
401 doClear();
402 }
403
404 /**
405 * Add an entry to the Store Map
406 *
407 * @param key
408 * @param value
409 * @return the StoreEntry associated with the entry
410 */
411 public synchronized StoreEntry place(Object key, Object value) {
412 load();
413 try {
414 remove(key);
415 IndexItem item = write(key, value);
416 index.store(key, item);
417 indexList.add(item);
418 return item;
419 } catch (IOException e) {
420 LOG.error("Failed trying to place key: " + key, e);
421 throw new RuntimeException(e);
422 }
423 }
424
425 /**
426 * Remove an Entry from ther Map
427 *
428 * @param entry
429 * @throws IOException
430 */
431 public synchronized void remove(StoreEntry entry) {
432 load();
433 IndexItem item = (IndexItem)entry;
434 if (item != null) {
435 Object key = getKey(item);
436 try {
437 index.remove(key);
438 } catch (IOException e) {
439 LOG.error("Failed trying to remove entry: " + entry, e);
440 throw new RuntimeException(e);
441 }
442 IndexItem prev = indexList.getPrevEntry(item);
443 IndexItem next = indexList.getNextEntry(item);
444 indexList.remove(item);
445 delete(item, prev, next);
446 }
447 }
448
449 public synchronized StoreEntry getFirst() {
450 load();
451 return indexList.getFirst();
452 }
453
454 public synchronized StoreEntry getLast() {
455 load();
456 return indexList.getLast();
457 }
458
459 public synchronized StoreEntry getNext(StoreEntry entry) {
460 load();
461 IndexItem item = (IndexItem)entry;
462 return indexList.getNextEntry(item);
463 }
464
465 public synchronized StoreEntry getPrevious(StoreEntry entry) {
466 load();
467 IndexItem item = (IndexItem)entry;
468 return indexList.getPrevEntry(item);
469 }
470
471 public synchronized StoreEntry refresh(StoreEntry entry) {
472 load();
473 return indexList.getEntry(entry);
474 }
475
476 /**
477 * Get the value from it's location
478 *
479 * @param item
480 * @return the value associated with the store entry
481 */
482 public synchronized Object getValue(StoreEntry item) {
483 load();
484 Object result = null;
485 if (item != null) {
486 try {
487 // ensure this value is up to date
488 // item=indexList.getEntry(item);
489 StoreLocation data = item.getValueDataItem();
490 result = dataManager.readItem(valueMarshaller, data);
491 } catch (IOException e) {
492 LOG.error("Failed to get value for " + item, e);
493 throw new RuntimeStoreException(e);
494 }
495 }
496 return result;
497 }
498
499 /**
500 * Get the Key object from it's location
501 *
502 * @param item
503 * @return the Key Object associated with the StoreEntry
504 */
505 public synchronized Object getKey(StoreEntry item) {
506 load();
507 Object result = null;
508 if (item != null) {
509 try {
510 StoreLocation data = item.getKeyDataItem();
511 result = dataManager.readItem(keyMarshaller, data);
512 } catch (IOException e) {
513 LOG.error("Failed to get key for " + item, e);
514 throw new RuntimeStoreException(e);
515 }
516 }
517 return result;
518 }
519
520 protected IndexLinkedList getItemList() {
521 return indexList;
522 }
523
524 protected synchronized IndexItem write(Object key, Object value) {
525 IndexItem index = null;
526 try {
527 index = indexManager.createNewIndex();
528 StoreLocation data = dataManager.storeDataItem(keyMarshaller, key);
529 index.setKeyData(data);
530
531 if (value != null) {
532 data = dataManager.storeDataItem(valueMarshaller, value);
533 index.setValueData(data);
534 }
535 IndexItem prev = indexList.getLast();
536 prev = prev != null ? prev : indexList.getRoot();
537 IndexItem next = indexList.getNextEntry(prev);
538 prev.setNextItem(index.getOffset());
539 index.setPreviousItem(prev.getOffset());
540 updateIndexes(prev);
541 if (next != null) {
542 next.setPreviousItem(index.getOffset());
543 index.setNextItem(next.getOffset());
544 updateIndexes(next);
545 }
546 storeIndex(index);
547 } catch (IOException e) {
548 LOG.error("Failed to write " + key + " , " + value, e);
549 throw new RuntimeStoreException(e);
550 }
551 return index;
552 }
553
554 public int getIndexBinSize() {
555 return indexBinSize;
556 }
557
558 public void setIndexBinSize(int indexBinSize) {
559 this.indexBinSize = indexBinSize;
560 }
561
562 public int getIndexKeySize() {
563 return indexKeySize;
564 }
565
566 public void setIndexKeySize(int indexKeySize) {
567 this.indexKeySize = indexKeySize;
568 }
569
570 public int getIndexPageSize() {
571 return indexPageSize;
572 }
573
574 public void setIndexPageSize(int indexPageSize) {
575 this.indexPageSize = indexPageSize;
576 }
577
578 public int getIndexLoadFactor() {
579 return indexLoadFactor;
580 }
581
582 public void setIndexLoadFactor(int loadFactor) {
583 this.indexLoadFactor = loadFactor;
584 }
585
586
587 public IndexMBean getIndexMBean() {
588 return (IndexMBean) index;
589 }
590 public int getIndexMaxBinSize() {
591 return indexMaxBinSize;
592 }
593
594 public void setIndexMaxBinSize(int maxBinSize) {
595 this.indexMaxBinSize = maxBinSize;
596 }
597
598
599
600 public String toString() {
601 load();
602 StringBuffer buf = new StringBuffer();
603 buf.append("{");
604 Iterator i = entrySet().iterator();
605 boolean hasNext = i.hasNext();
606 while (hasNext) {
607 Map.Entry e = (Entry) i.next();
608 Object key = e.getKey();
609 Object value = e.getValue();
610 buf.append(key);
611 buf.append("=");
612
613 buf.append(value);
614 hasNext = i.hasNext();
615 if (hasNext)
616 buf.append(", ");
617 }
618 buf.append("}");
619 return buf.toString();
620 }
621 }