001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.kaha.impl.index;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.io.RandomAccessFile;
022 import java.nio.channels.FileLock;
023 import java.util.concurrent.atomic.AtomicLong;
024
025 import org.apache.activemq.kaha.impl.DataManager;
026 import org.apache.activemq.util.IOHelper;
027 import org.slf4j.Logger;
028 import org.slf4j.LoggerFactory;
029
030 /**
031 * Optimized Store reader
032 *
033 *
034 */
035 public final class IndexManager {
036
037 public static final String NAME_PREFIX = "index-";
038 private static final Logger LOG = LoggerFactory.getLogger(IndexManager.class);
039 private final String name;
040 private File directory;
041 private File file;
042 private RandomAccessFile indexFile;
043 private StoreIndexReader reader;
044 private StoreIndexWriter writer;
045 private DataManager redoLog;
046 private String mode;
047 private long length;
048 private IndexItem firstFree;
049 private IndexItem lastFree;
050 private boolean dirty;
051 private final AtomicLong storeSize;
052 private int freeSize = 0;
053
054 public IndexManager(File directory, String name, String mode, DataManager redoLog, AtomicLong storeSize) throws IOException {
055 this.directory = directory;
056 this.name = name;
057 this.mode = mode;
058 this.redoLog = redoLog;
059 this.storeSize=storeSize;
060 initialize();
061 }
062
063 public synchronized boolean isEmpty() {
064 return lastFree == null && length == 0;
065 }
066
067 public synchronized IndexItem getIndex(long offset) throws IOException {
068 IndexItem result = null;
069 if (offset >= 0) {
070 result = reader.readItem(offset);
071 }
072 return result;
073 }
074
075 public synchronized IndexItem refreshIndex(IndexItem item) throws IOException {
076 reader.updateIndexes(item);
077 return item;
078 }
079
080 public synchronized void freeIndex(IndexItem item) throws IOException {
081 item.reset();
082 item.setActive(false);
083 if (lastFree == null) {
084 firstFree = item;
085 lastFree = item;
086 } else {
087 lastFree.setNextItem(item.getOffset());
088 if (lastFree.equals(firstFree)) {
089 firstFree=new IndexItem();
090 firstFree.copyIndex(lastFree);
091 writer.updateIndexes(firstFree);
092 }
093 writer.updateIndexes(lastFree);
094 lastFree=item;
095 }
096 writer.updateIndexes(item);
097 freeSize++;
098 dirty = true;
099 }
100
101 public synchronized void storeIndex(IndexItem index) throws IOException {
102 writer.storeItem(index);
103 dirty = true;
104 }
105
106 public synchronized void updateIndexes(IndexItem index) throws IOException {
107 try {
108 writer.updateIndexes(index);
109 } catch (Throwable e) {
110 LOG.error(name + " error updating indexes ", e);
111 }
112 dirty = true;
113 }
114
115 public synchronized void redo(final RedoStoreIndexItem redo) throws IOException {
116 writer.redoStoreItem(redo);
117 dirty = true;
118 }
119
120 public synchronized IndexItem createNewIndex() throws IOException {
121 IndexItem result = getNextFreeIndex();
122 if (result == null) {
123 // allocate one
124 result = new IndexItem();
125 result.setOffset(length);
126 length += IndexItem.INDEX_SIZE;
127 storeSize.addAndGet(IndexItem.INDEX_SIZE);
128 }
129 return result;
130 }
131
132 public synchronized void close() throws IOException {
133 if (indexFile != null) {
134 indexFile.close();
135 indexFile = null;
136 }
137 }
138
139 public synchronized void force() throws IOException {
140 if (indexFile != null && dirty) {
141 indexFile.getFD().sync();
142 dirty = false;
143 }
144 }
145
146 public synchronized boolean delete() throws IOException {
147 firstFree = null;
148 lastFree = null;
149 if (indexFile != null) {
150 indexFile.close();
151 indexFile = null;
152 }
153 return file.delete();
154 }
155
156 private synchronized IndexItem getNextFreeIndex() throws IOException {
157 IndexItem result = null;
158 if (firstFree != null) {
159 if (firstFree.equals(lastFree)) {
160 result = firstFree;
161 firstFree = null;
162 lastFree = null;
163 } else {
164 result = firstFree;
165 firstFree = getIndex(firstFree.getNextItem());
166 if (firstFree == null) {
167 lastFree = null;
168 }
169 }
170 result.reset();
171 writer.updateIndexes(result);
172 freeSize--;
173 }
174 return result;
175 }
176
177 synchronized long getLength() {
178 return length;
179 }
180
181 public final long size() {
182 return length;
183 }
184
185 public synchronized void setLength(long value) {
186 this.length = value;
187 storeSize.addAndGet(length);
188 }
189
190 public synchronized FileLock getLock() throws IOException {
191 return indexFile.getChannel().tryLock(0, Math.max(1, indexFile.getChannel().size()), false);
192 }
193
194
195 public String toString() {
196 return "IndexManager:(" + NAME_PREFIX + name + ")";
197 }
198
199 protected void initialize() throws IOException {
200 file = new File(directory, NAME_PREFIX + IOHelper.toFileSystemSafeName(name) );
201 IOHelper.mkdirs(file.getParentFile());
202 indexFile = new RandomAccessFile(file, mode);
203 reader = new StoreIndexReader(indexFile);
204 writer = new StoreIndexWriter(indexFile, name, redoLog);
205 long offset = 0;
206 while ((offset + IndexItem.INDEX_SIZE) <= indexFile.length()) {
207 IndexItem index = reader.readItem(offset);
208 if (!index.isActive()) {
209 index.reset();
210 if (lastFree != null) {
211 lastFree.setNextItem(index.getOffset());
212 updateIndexes(lastFree);
213 lastFree = index;
214 } else {
215 lastFree = index;
216 firstFree = index;
217 }
218 freeSize++;
219 }
220 offset += IndexItem.INDEX_SIZE;
221 }
222 length = offset;
223 storeSize.addAndGet(length);
224 }
225 }