001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.kaha.impl.container;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.List;
022
023 import org.apache.activemq.kaha.ContainerId;
024 import org.apache.activemq.kaha.RuntimeStoreException;
025 import org.apache.activemq.kaha.StoreEntry;
026 import org.apache.activemq.kaha.impl.DataManager;
027 import org.apache.activemq.kaha.impl.data.Item;
028 import org.apache.activemq.kaha.impl.index.DiskIndexLinkedList;
029 import org.apache.activemq.kaha.impl.index.IndexItem;
030 import org.apache.activemq.kaha.impl.index.IndexLinkedList;
031 import org.apache.activemq.kaha.impl.index.IndexManager;
032 import org.apache.activemq.kaha.impl.index.VMIndexLinkedList;
033 import org.slf4j.Logger;
034 import org.slf4j.LoggerFactory;
035
036 /**
037 * Implementation of a ListContainer
038 *
039 *
040 */
041 public abstract class BaseContainerImpl {
042
043 private static final Logger LOG = LoggerFactory.getLogger(BaseContainerImpl.class);
044 protected IndexItem root;
045 protected IndexLinkedList indexList;
046 protected IndexManager indexManager;
047 protected DataManager dataManager;
048 protected ContainerId containerId;
049 protected boolean loaded;
050 protected boolean closed;
051 protected boolean initialized;
052 protected boolean persistentIndex;
053
054 protected BaseContainerImpl(ContainerId id, IndexItem root, IndexManager indexManager, DataManager dataManager, boolean persistentIndex) {
055 this.containerId = id;
056 this.root = root;
057 this.indexManager = indexManager;
058 this.dataManager = dataManager;
059 this.persistentIndex = persistentIndex;
060 }
061
062 public ContainerId getContainerId() {
063 return containerId;
064 }
065
066 public synchronized void init() {
067 if (!initialized) {
068 if (!initialized) {
069 initialized = true;
070 if (this.indexList == null) {
071 if (persistentIndex) {
072 this.indexList = new DiskIndexLinkedList(indexManager, root);
073 } else {
074 this.indexList = new VMIndexLinkedList(root);
075 }
076 }
077 }
078 }
079 }
080
081 public synchronized void clear() {
082 if (indexList != null) {
083 indexList.clear();
084 }
085 }
086
087 /**
088 * @return the indexList
089 */
090 public IndexLinkedList getList() {
091 return indexList;
092 }
093
094 /**
095 * @param indexList the indexList to set
096 */
097 public void setList(IndexLinkedList indexList) {
098 this.indexList = indexList;
099 }
100
101 public abstract void unload();
102
103 public abstract void load();
104
105 public abstract int size();
106
107 protected abstract Object getValue(StoreEntry currentItem);
108
109 protected abstract void remove(IndexItem currentItem);
110
111 protected final synchronized IndexLinkedList getInternalList() {
112 return indexList;
113 }
114
115 public final synchronized void close() {
116 unload();
117 closed = true;
118 }
119
120 /*
121 * (non-Javadoc)
122 *
123 * @see org.apache.activemq.kaha.ListContainer#isLoaded()
124 */
125 public final synchronized boolean isLoaded() {
126 checkClosed();
127 return loaded;
128 }
129
130 /*
131 * (non-Javadoc)
132 *
133 * @see org.apache.activemq.kaha.ListContainer#getId()
134 */
135 public final Object getId() {
136 checkClosed();
137 return containerId.getKey();
138 }
139
140 public DataManager getDataManager() {
141 return dataManager;
142 }
143
144 public IndexManager getIndexManager() {
145 return indexManager;
146 }
147
148 public final synchronized void expressDataInterest() throws IOException {
149 long nextItem = root.getNextItem();
150 while (nextItem != Item.POSITION_NOT_SET) {
151 IndexItem item = indexManager.getIndex(nextItem);
152 item.setOffset(nextItem);
153 dataManager.addInterestInFile(item.getKeyFile());
154 dataManager.addInterestInFile(item.getValueFile());
155 nextItem = item.getNextItem();
156 }
157 }
158
159 protected final void doClear() {
160 checkClosed();
161 loaded = true;
162 List<IndexItem> indexList = new ArrayList<IndexItem>();
163 try {
164 init();
165 long nextItem = root.getNextItem();
166 while (nextItem != Item.POSITION_NOT_SET) {
167 IndexItem item = new IndexItem();
168 item.setOffset(nextItem);
169 indexList.add(item);
170 nextItem = item.getNextItem();
171 }
172 root.setNextItem(Item.POSITION_NOT_SET);
173 storeIndex(root);
174 for (int i = 0; i < indexList.size(); i++) {
175 IndexItem item = indexList.get(i);
176 dataManager.removeInterestInFile(item.getKeyFile());
177 dataManager.removeInterestInFile(item.getValueFile());
178 indexManager.freeIndex(item);
179 }
180 indexList.clear();
181 } catch (IOException e) {
182 LOG.error("Failed to clear Container " + getId(), e);
183 throw new RuntimeStoreException(e);
184 }
185 }
186
187 protected final void delete(final IndexItem keyItem, final IndexItem prevItem, final IndexItem nextItem) {
188 if (keyItem != null) {
189 try {
190 root = indexList.getRoot();
191 IndexItem prev = prevItem == null ? root : prevItem;
192 IndexItem next = (nextItem == null || !nextItem.equals(root)) ? nextItem : null;
193 dataManager.removeInterestInFile(keyItem.getKeyFile());
194 dataManager.removeInterestInFile(keyItem.getValueFile());
195 if (next != null) {
196 prev.setNextItem(next.getOffset());
197 next.setPreviousItem(prev.getOffset());
198 updateIndexes(next);
199 } else {
200 prev.setNextItem(Item.POSITION_NOT_SET);
201 }
202 updateIndexes(prev);
203 indexManager.freeIndex(keyItem);
204 } catch (IOException e) {
205 LOG.error("Failed to delete " + keyItem, e);
206 throw new RuntimeStoreException(e);
207 }
208 }
209 }
210
211 protected final void checkClosed() {
212 if (closed) {
213 throw new RuntimeStoreException("The store is closed");
214 }
215 }
216
217 protected void storeIndex(IndexItem item) throws IOException {
218 indexManager.storeIndex(item);
219 }
220
221 protected void updateIndexes(IndexItem item) throws IOException {
222 indexManager.updateIndexes(item);
223 }
224
225 protected final boolean isRoot(StoreEntry item) {
226 return item != null && root != null && (root == item || root.getOffset() == item.getOffset());
227 // return item != null && indexRoot != null && indexRoot == item;
228 }
229
230 }