001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.kahadb.journal;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.HashMap;
022 import java.util.Iterator;
023 import java.util.List;
024 import java.util.Map;
025
026 /**
027 * Used to pool DataFileAccessors.
028 *
029 * @author chirino
030 */
031 public class DataFileAccessorPool {
032
033 private final Journal journal;
034 private final Map<Integer, Pool> pools = new HashMap<Integer, Pool>();
035 private boolean closed;
036 private int maxOpenReadersPerFile = 5;
037
038 class Pool {
039
040 private final DataFile file;
041 private final List<DataFileAccessor> pool = new ArrayList<DataFileAccessor>();
042 private boolean used;
043 private int openCounter;
044 private boolean disposed;
045
046 public Pool(DataFile file) {
047 this.file = file;
048 }
049
050 public DataFileAccessor openDataFileReader() throws IOException {
051 DataFileAccessor rc = null;
052 if (pool.isEmpty()) {
053 rc = new DataFileAccessor(journal, file);
054 } else {
055 rc = pool.remove(pool.size() - 1);
056 }
057 used = true;
058 openCounter++;
059 return rc;
060 }
061
062 public synchronized void closeDataFileReader(DataFileAccessor reader) {
063 openCounter--;
064 if (pool.size() >= maxOpenReadersPerFile || disposed) {
065 reader.dispose();
066 } else {
067 pool.add(reader);
068 }
069 }
070
071 public synchronized void clearUsedMark() {
072 used = false;
073 }
074
075 public synchronized boolean isUsed() {
076 return used;
077 }
078
079 public synchronized void dispose() {
080 for (DataFileAccessor reader : pool) {
081 reader.dispose();
082 }
083 pool.clear();
084 disposed = true;
085 }
086
087 public synchronized int getOpenCounter() {
088 return openCounter;
089 }
090
091 }
092
093 public DataFileAccessorPool(Journal dataManager) {
094 this.journal = dataManager;
095 }
096
097 synchronized void clearUsedMark() {
098 for (Pool pool : pools.values()) {
099 pool.clearUsedMark();
100 }
101 }
102
103 synchronized void disposeUnused() {
104 for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
105 Pool pool = iter.next();
106 if (!pool.isUsed()) {
107 pool.dispose();
108 iter.remove();
109 }
110 }
111 }
112
113 synchronized void disposeDataFileAccessors(DataFile dataFile) {
114 if (closed) {
115 throw new IllegalStateException("Closed.");
116 }
117 Pool pool = pools.get(dataFile.getDataFileId());
118 if (pool != null) {
119 if (pool.getOpenCounter() == 0) {
120 pool.dispose();
121 pools.remove(dataFile.getDataFileId());
122 } else {
123 throw new IllegalStateException("The data file is still in use: " + dataFile + ", use count: " + pool.getOpenCounter());
124 }
125 }
126 }
127
128 synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException {
129 if (closed) {
130 throw new IOException("Closed.");
131 }
132
133 Pool pool = pools.get(dataFile.getDataFileId());
134 if (pool == null) {
135 pool = new Pool(dataFile);
136 pools.put(dataFile.getDataFileId(), pool);
137 }
138 return pool.openDataFileReader();
139 }
140
141 synchronized void closeDataFileAccessor(DataFileAccessor reader) {
142 Pool pool = pools.get(reader.getDataFile().getDataFileId());
143 if (pool == null || closed) {
144 reader.dispose();
145 } else {
146 pool.closeDataFileReader(reader);
147 }
148 }
149
150 public synchronized void close() {
151 if (closed) {
152 return;
153 }
154 closed = true;
155 for (Pool pool : pools.values()) {
156 pool.dispose();
157 }
158 pools.clear();
159 }
160
161 }