001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.kahadb.index;
018
019 import java.io.IOException;
020 import java.lang.ref.WeakReference;
021 import java.util.Iterator;
022 import java.util.Map;
023 import java.util.concurrent.atomic.AtomicBoolean;
024 import java.util.concurrent.atomic.AtomicLong;
025
026 import org.apache.kahadb.index.ListNode.ListIterator;
027 import org.apache.kahadb.page.Page;
028 import org.apache.kahadb.page.PageFile;
029 import org.apache.kahadb.page.Transaction;
030 import org.apache.kahadb.util.Marshaller;
031 import org.slf4j.Logger;
032 import org.slf4j.LoggerFactory;
033
034 public class ListIndex<Key,Value> implements Index<Key,Value> {
035
036 private static final Logger LOG = LoggerFactory.getLogger(ListIndex.class);
037 public final static long NOT_SET = -1;
038 protected PageFile pageFile;
039 protected long headPageId;
040 protected long tailPageId;
041 private AtomicLong size = new AtomicLong(0);
042
043 protected AtomicBoolean loaded = new AtomicBoolean();
044
045 private ListNode.NodeMarshaller<Key, Value> marshaller;
046 private Marshaller<Key> keyMarshaller;
047 private Marshaller<Value> valueMarshaller;
048
049 public ListIndex() {
050 }
051
052 public ListIndex(PageFile pageFile, long headPageId) {
053 this.pageFile = pageFile;
054 setHeadPageId(headPageId);
055 }
056
057 @SuppressWarnings("rawtypes")
058 public ListIndex(PageFile pageFile, Page page) {
059 this(pageFile, page.getPageId());
060 }
061
062 synchronized public void load(Transaction tx) throws IOException {
063 if (loaded.compareAndSet(false, true)) {
064 LOG.debug("loading");
065 if( keyMarshaller == null ) {
066 throw new IllegalArgumentException("The key marshaller must be set before loading the ListIndex");
067 }
068 if( valueMarshaller == null ) {
069 throw new IllegalArgumentException("The value marshaller must be set before loading the ListIndex");
070 }
071
072 marshaller = new ListNode.NodeMarshaller<Key, Value>(keyMarshaller, valueMarshaller);
073 final Page<ListNode<Key,Value>> p = tx.load(getHeadPageId(), null);
074 if( p.getType() == Page.PAGE_FREE_TYPE ) {
075 // Need to initialize it..
076 ListNode<Key, Value> root = createNode(p);
077 storeNode(tx, root, true);
078 setHeadPageId(p.getPageId());
079 setTailPageId(getHeadPageId());
080 } else {
081 ListNode<Key, Value> node = loadNode(tx, getHeadPageId());
082 setTailPageId(getHeadPageId());
083 size.addAndGet(node.size(tx));
084 while (node.getNext() != NOT_SET ) {
085 node = loadNode(tx, node.getNext());
086 size.addAndGet(node.size(tx));
087 setTailPageId(node.getPageId());
088 }
089 }
090 }
091 }
092
093 synchronized public void unload(Transaction tx) {
094 if (loaded.compareAndSet(true, false)) {
095 }
096 }
097
098 protected ListNode<Key,Value> getHead(Transaction tx) throws IOException {
099 return loadNode(tx, getHeadPageId());
100 }
101
102 protected ListNode<Key,Value> getTail(Transaction tx) throws IOException {
103 return loadNode(tx, getTailPageId());
104 }
105
106 synchronized public boolean containsKey(Transaction tx, Key key) throws IOException {
107 assertLoaded();
108
109 if (size.get() == 0) {
110 return false;
111 }
112
113 for (Iterator<Map.Entry<Key,Value>> iterator = iterator(tx); iterator.hasNext(); ) {
114 Map.Entry<Key,Value> candidate = iterator.next();
115 if (key.equals(candidate.getKey())) {
116 return true;
117 }
118 }
119 return false;
120 }
121
122 private ListNode<Key, Value> lastGetNodeCache = null;
123 private Map.Entry<Key, Value> lastGetEntryCache = null;
124 private WeakReference<Transaction> lastCacheTxSrc = new WeakReference<Transaction>(null);
125
126 @SuppressWarnings({ "rawtypes", "unchecked" })
127 synchronized public Value get(Transaction tx, Key key) throws IOException {
128 assertLoaded();
129 for (Iterator<Map.Entry<Key,Value>> iterator = iterator(tx); iterator.hasNext(); ) {
130 Map.Entry<Key,Value> candidate = iterator.next();
131 if (key.equals(candidate.getKey())) {
132 this.lastGetNodeCache = ((ListIterator) iterator).getCurrent();
133 this.lastGetEntryCache = candidate;
134 this.lastCacheTxSrc = new WeakReference<Transaction>(tx);
135 return candidate.getValue();
136 }
137 }
138 return null;
139 }
140
141 /**
142 * Update the value of the item with the given key in the list if ot exists, otherwise
143 * it appends the value to the end of the list.
144 *
145 * @return the old value contained in the list if one exists or null.
146 */
147 @SuppressWarnings({ "rawtypes" })
148 synchronized public Value put(Transaction tx, Key key, Value value) throws IOException {
149
150 Value oldValue = null;
151
152 if (lastGetNodeCache != null && tx.equals(lastCacheTxSrc.get())) {
153
154 if(lastGetEntryCache.getKey().equals(key)) {
155 oldValue = lastGetEntryCache.setValue(value);
156 lastGetEntryCache.setValue(value);
157 lastGetNodeCache.storeUpdate(tx);
158 flushCache();
159 return oldValue;
160 }
161
162 // This searches from the last location of a call to get for the element to replace
163 // all the way to the end of the ListIndex.
164 Iterator<Map.Entry<Key, Value>> iterator = lastGetNodeCache.iterator(tx);
165 while (iterator.hasNext()) {
166 Map.Entry<Key, Value> entry = iterator.next();
167 if (entry.getKey().equals(key)) {
168 oldValue = entry.setValue(value);
169 ((ListIterator) iterator).getCurrent().storeUpdate(tx);
170 flushCache();
171 return oldValue;
172 }
173 }
174 } else {
175 flushCache();
176 }
177
178 // Not found because the cache wasn't set or its not at the end of the list so we
179 // start from the beginning and go to the cached location or the end, then we do
180 // an add if its not found.
181 Iterator<Map.Entry<Key, Value>> iterator = iterator(tx);
182 while (iterator.hasNext() && ((ListIterator) iterator).getCurrent() != lastGetNodeCache) {
183 Map.Entry<Key, Value> entry = iterator.next();
184 if (entry.getKey().equals(key)) {
185 oldValue = entry.setValue(value);
186 ((ListIterator) iterator).getCurrent().storeUpdate(tx);
187 flushCache();
188 return oldValue;
189 }
190 }
191
192 // Not found so add it last.
193 flushCache();
194
195 return add(tx, key, value);
196 }
197
198 synchronized public Value add(Transaction tx, Key key, Value value) throws IOException {
199 assertLoaded();
200 getTail(tx).put(tx, key, value);
201 size.incrementAndGet();
202 flushCache();
203 return null;
204 }
205
206 synchronized public Value addFirst(Transaction tx, Key key, Value value) throws IOException {
207 assertLoaded();
208 getHead(tx).addFirst(tx, key, value);
209 size.incrementAndGet();
210 flushCache();
211 return null;
212 }
213
214 @SuppressWarnings("rawtypes")
215 synchronized public Value remove(Transaction tx, Key key) throws IOException {
216 assertLoaded();
217
218 if (size.get() == 0) {
219 return null;
220 }
221
222 if (lastGetNodeCache != null && tx.equals(lastCacheTxSrc.get())) {
223
224 // This searches from the last location of a call to get for the element to remove
225 // all the way to the end of the ListIndex.
226 Iterator<Map.Entry<Key, Value>> iterator = lastGetNodeCache.iterator(tx);
227 while (iterator.hasNext()) {
228 Map.Entry<Key, Value> entry = iterator.next();
229 if (entry.getKey().equals(key)) {
230 iterator.remove();
231 flushCache();
232 return entry.getValue();
233 }
234 }
235 } else {
236 flushCache();
237 }
238
239 // Not found because the cache wasn't set or its not at the end of the list so we
240 // start from the beginning and go to the cached location or the end to find the
241 // element to remove.
242 Iterator<Map.Entry<Key, Value>> iterator = iterator(tx);
243 while (iterator.hasNext() && ((ListIterator) iterator).getCurrent() != lastGetNodeCache) {
244 Map.Entry<Key, Value> entry = iterator.next();
245 if (entry.getKey().equals(key)) {
246 iterator.remove();
247 flushCache();
248 return entry.getValue();
249 }
250 }
251
252 return null;
253 }
254
255 public void onRemove() {
256 size.decrementAndGet();
257 flushCache();
258 }
259
260 public boolean isTransient() {
261 return false;
262 }
263
264 synchronized public void clear(Transaction tx) throws IOException {
265 for (Iterator<ListNode<Key,Value>> iterator = listNodeIterator(tx); iterator.hasNext(); ) {
266 ListNode<Key,Value>candidate = iterator.next();
267 candidate.clear(tx);
268 // break up the transaction
269 tx.commit();
270 }
271 flushCache();
272 size.set(0);
273 }
274
275 synchronized public Iterator<ListNode<Key, Value>> listNodeIterator(Transaction tx) throws IOException {
276 return getHead(tx).listNodeIterator(tx);
277 }
278
279 synchronized public boolean isEmpty(final Transaction tx) throws IOException {
280 return getHead(tx).isEmpty(tx);
281 }
282
283 synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx) throws IOException {
284 return getHead(tx).iterator(tx);
285 }
286
287 synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx, long initialPosition) throws IOException {
288 return getHead(tx).iterator(tx, initialPosition);
289 }
290
291 synchronized public Map.Entry<Key,Value> getFirst(Transaction tx) throws IOException {
292 return getHead(tx).getFirst(tx);
293 }
294
295 synchronized public Map.Entry<Key,Value> getLast(Transaction tx) throws IOException {
296 return getTail(tx).getLast(tx);
297 }
298
299 private void assertLoaded() throws IllegalStateException {
300 if( !loaded.get() ) {
301 throw new IllegalStateException("TheListIndex is not loaded");
302 }
303 }
304
305 ListNode<Key,Value> loadNode(Transaction tx, long pageId) throws IOException {
306 Page<ListNode<Key,Value>> page = tx.load(pageId, marshaller);
307 ListNode<Key, Value> node = page.get();
308 node.setPage(page);
309 node.setContainingList(this);
310 return node;
311 }
312
313 ListNode<Key,Value> createNode(Page<ListNode<Key,Value>> page) throws IOException {
314 ListNode<Key,Value> node = new ListNode<Key,Value>();
315 node.setPage(page);
316 page.set(node);
317 node.setContainingList(this);
318 return node;
319 }
320
321 public ListNode<Key,Value> createNode(Transaction tx) throws IOException {
322 return createNode(tx.<ListNode<Key,Value>>load(tx.<ListNode<Key,Value>>allocate().getPageId(), null));
323 }
324
325 public void storeNode(Transaction tx, ListNode<Key,Value> node, boolean overflow) throws IOException {
326 tx.store(node.getPage(), marshaller, overflow);
327 flushCache();
328 }
329
330 public PageFile getPageFile() {
331 return pageFile;
332 }
333
334 public void setPageFile(PageFile pageFile) {
335 this.pageFile = pageFile;
336 }
337
338 public long getHeadPageId() {
339 return headPageId;
340 }
341
342 public void setHeadPageId(long headPageId) {
343 this.headPageId = headPageId;
344 }
345
346 public Marshaller<Key> getKeyMarshaller() {
347 return keyMarshaller;
348 }
349 public void setKeyMarshaller(Marshaller<Key> keyMarshaller) {
350 this.keyMarshaller = keyMarshaller;
351 }
352
353 public Marshaller<Value> getValueMarshaller() {
354 return valueMarshaller;
355 }
356 public void setValueMarshaller(Marshaller<Value> valueMarshaller) {
357 this.valueMarshaller = valueMarshaller;
358 }
359
360 public void setTailPageId(long tailPageId) {
361 this.tailPageId = tailPageId;
362 }
363
364 public long getTailPageId() {
365 return tailPageId;
366 }
367
368 public long size() {
369 return size.get();
370 }
371
372 private void flushCache() {
373 this.lastGetEntryCache = null;
374 this.lastGetNodeCache = null;
375 this.lastCacheTxSrc.clear();
376 }
377 }