001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb.plist;
018
019 import java.io.DataInput;
020 import java.io.DataOutput;
021 import java.io.IOException;
022 import java.util.Iterator;
023 import java.util.Map;
024 import java.util.NoSuchElementException;
025 import java.util.Set;
026 import java.util.concurrent.atomic.AtomicBoolean;
027 import java.util.concurrent.atomic.AtomicReference;
028 import org.apache.kahadb.index.ListIndex;
029 import org.apache.kahadb.journal.Location;
030 import org.apache.kahadb.page.Transaction;
031 import org.apache.kahadb.util.ByteSequence;
032 import org.apache.kahadb.util.LocationMarshaller;
033 import org.apache.kahadb.util.StringMarshaller;
034 import org.slf4j.Logger;
035 import org.slf4j.LoggerFactory;
036
037 public class PList extends ListIndex<String, Location> {
038 static final Logger LOG = LoggerFactory.getLogger(PList.class);
039 final PListStore store;
040 private String name;
041 Object indexLock;
042
043 PList(PListStore store) {
044 this.store = store;
045 this.indexLock = store.getIndexLock();
046 setPageFile(store.getPageFile());
047 setKeyMarshaller(StringMarshaller.INSTANCE);
048 setValueMarshaller(LocationMarshaller.INSTANCE);
049 }
050
051 public void setName(String name) {
052 this.name = name;
053 }
054
055 public String getName() {
056 return this.name;
057 }
058
059 void read(DataInput in) throws IOException {
060 setHeadPageId(in.readLong());
061 }
062
063 public void write(DataOutput out) throws IOException {
064 out.writeLong(getHeadPageId());
065 }
066
067 public synchronized void destroy() throws IOException {
068 synchronized (indexLock) {
069 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
070 public void execute(Transaction tx) throws IOException {
071 clear(tx);
072 unload(tx);
073 }
074 });
075 }
076 }
077
078 public void addLast(final String id, final ByteSequence bs) throws IOException {
079 final Location location = this.store.write(bs, false);
080 synchronized (indexLock) {
081 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
082 public void execute(Transaction tx) throws IOException {
083 add(tx, id, location);
084 }
085 });
086 }
087 }
088
089 public void addFirst(final String id, final ByteSequence bs) throws IOException {
090 final Location location = this.store.write(bs, false);
091 synchronized (indexLock) {
092 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
093 public void execute(Transaction tx) throws IOException {
094 addFirst(tx, id, location);
095 }
096 });
097 }
098 }
099
100 public boolean remove(final String id) throws IOException {
101 final AtomicBoolean result = new AtomicBoolean();
102 synchronized (indexLock) {
103 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
104 public void execute(Transaction tx) throws IOException {
105 result.set(remove(tx, id) != null);
106 }
107 });
108 }
109 return result.get();
110 }
111
112 public boolean remove(final long position) throws IOException {
113 final AtomicBoolean result = new AtomicBoolean();
114 synchronized (indexLock) {
115 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
116 public void execute(Transaction tx) throws IOException {
117 Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
118 if (iterator.hasNext()) {
119 iterator.next();
120 iterator.remove();
121 result.set(true);
122 } else {
123 result.set(false);
124 }
125 }
126 });
127 }
128 return result.get();
129 }
130
131 public PListEntry get(final long position) throws IOException {
132 PListEntry result = null;
133 final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
134 synchronized (indexLock) {
135 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
136 public void execute(Transaction tx) throws IOException {
137 Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
138 ref.set(iterator.next());
139 }
140 });
141 }
142 if (ref.get() != null) {
143 ByteSequence bs = this.store.getPayload(ref.get().getValue());
144 result = new PListEntry(ref.get().getKey(), bs);
145 }
146 return result;
147 }
148
149 public PListEntry getFirst() throws IOException {
150 PListEntry result = null;
151 final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
152 synchronized (indexLock) {
153 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
154 public void execute(Transaction tx) throws IOException {
155 ref.set(getFirst(tx));
156 }
157 });
158 }
159 if (ref.get() != null) {
160 ByteSequence bs = this.store.getPayload(ref.get().getValue());
161 result = new PListEntry(ref.get().getKey(), bs);
162 }
163 return result;
164 }
165
166 public PListEntry getLast() throws IOException {
167 PListEntry result = null;
168 final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
169 synchronized (indexLock) {
170 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
171 public void execute(Transaction tx) throws IOException {
172 ref.set(getLast(tx));
173 }
174 });
175 }
176 if (ref.get() != null) {
177 ByteSequence bs = this.store.getPayload(ref.get().getValue());
178 result = new PListEntry(ref.get().getKey(), bs);
179 }
180 return result;
181 }
182
183 public boolean isEmpty() {
184 return size() == 0;
185 }
186
187 public PListIterator iterator() throws IOException {
188 return new PListIterator();
189 }
190
191 public final class PListIterator implements Iterator<PListEntry> {
192 final Iterator<Map.Entry<String, Location>> iterator;
193 final Transaction tx;
194
195 PListIterator() throws IOException {
196 tx = store.pageFile.tx();
197 synchronized (indexLock) {
198 this.iterator = iterator(tx);
199 }
200 }
201
202 @Override
203 public boolean hasNext() {
204 return iterator.hasNext();
205 }
206
207 @Override
208 public PListEntry next() {
209 Map.Entry<String, Location> entry = iterator.next();
210 ByteSequence bs = null;
211 try {
212 bs = store.getPayload(entry.getValue());
213 } catch (IOException unexpected) {
214 NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage());
215 e.initCause(unexpected);
216 throw e;
217 }
218 return new PListEntry(entry.getKey(), bs);
219 }
220
221 @Override
222 public void remove() {
223 try {
224 synchronized (indexLock) {
225 tx.execute(new Transaction.Closure<IOException>() {
226 @Override
227 public void execute(Transaction tx) throws IOException {
228 iterator.remove();
229 }
230 });
231 }
232 } catch (IOException unexpected) {
233 IllegalStateException e = new IllegalStateException(unexpected);
234 e.initCause(unexpected);
235 throw e;
236 }
237 }
238
239 public void release() {
240 try {
241 tx.rollback();
242 } catch (IOException unexpected) {
243 IllegalStateException e = new IllegalStateException(unexpected);
244 e.initCause(unexpected);
245 throw e;
246 }
247 }
248 }
249
250 public void claimFileLocations(final Set<Integer> candidates) throws IOException {
251 synchronized (indexLock) {
252 if (loaded.get()) {
253 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
254 public void execute(Transaction tx) throws IOException {
255 Iterator<Map.Entry<String,Location>> iterator = iterator(tx);
256 while (iterator.hasNext()) {
257 Location location = iterator.next().getValue();
258 candidates.remove(location.getDataFileId());
259 }
260 }
261 });
262 }
263 }
264 }
265
266 @Override
267 public String toString() {
268 return name + "[headPageId=" + getHeadPageId() + ",tailPageId=" + getTailPageId() + ", size=" + size() + "]";
269 }
270 }