001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.kahadb.util;
018
019 import java.io.DataInput;
020 import java.io.DataOutput;
021 import java.io.IOException;
022 import java.util.ArrayList;
023 import java.util.Iterator;
024 import java.util.List;
025 import java.util.NoSuchElementException;
026
027 /**
028 * Keeps track of a added long values. Collapses ranges of numbers using a
029 * Sequence representation. Use to keep track of received message ids to find
030 * out if a message is duplicate or if there are any missing messages.
031 *
032 * @author chirino
033 */
034 public class SequenceSet extends LinkedNodeList<Sequence> implements Iterable<Long> {
035
036 public static class Marshaller implements org.apache.kahadb.util.Marshaller<SequenceSet> {
037
038 public static final Marshaller INSTANCE = new Marshaller();
039
040 public SequenceSet readPayload(DataInput in) throws IOException {
041 SequenceSet value = new SequenceSet();
042 int count = in.readInt();
043 for (int i = 0; i < count; i++) {
044 if( in.readBoolean() ) {
045 Sequence sequence = new Sequence(in.readLong(), in.readLong());
046 value.addLast(sequence);
047 } else {
048 Sequence sequence = new Sequence(in.readLong());
049 value.addLast(sequence);
050 }
051 }
052 return value;
053 }
054
055 public void writePayload(SequenceSet value, DataOutput out) throws IOException {
056 out.writeInt(value.size());
057 Sequence sequence = value.getHead();
058 while (sequence != null ) {
059 if( sequence.range() > 1 ) {
060 out.writeBoolean(true);
061 out.writeLong(sequence.first);
062 out.writeLong(sequence.last);
063 } else {
064 out.writeBoolean(false);
065 out.writeLong(sequence.first);
066 }
067 sequence = sequence.getNext();
068 }
069 }
070
071 public int getFixedSize() {
072 return -1;
073 }
074
075 public SequenceSet deepCopy(SequenceSet value) {
076 SequenceSet rc = new SequenceSet();
077 Sequence sequence = value.getHead();
078 while (sequence != null ) {
079 rc.add(new Sequence(sequence.first, sequence.last));
080 sequence = sequence.getNext();
081 }
082 return rc;
083 }
084
085 public boolean isDeepCopySupported() {
086 return true;
087 }
088 }
089
090 public void add(Sequence value) {
091 // TODO we can probably optimize this a bit
092 for(long i=value.first; i<value.last+1; i++) {
093 add(i);
094 }
095 }
096
097 /**
098 *
099 * @param value
100 * the value to add to the list
101 * @return false if the value was a duplicate.
102 */
103 public boolean add(long value) {
104
105 if (isEmpty()) {
106 addFirst(new Sequence(value));
107 return true;
108 }
109
110 Sequence sequence = getHead();
111 while (sequence != null) {
112
113 if (sequence.isAdjacentToLast(value)) {
114 // grow the sequence...
115 sequence.last = value;
116 // it might connect us to the next sequence..
117 if (sequence.getNext() != null) {
118 Sequence next = sequence.getNext();
119 if (next.isAdjacentToFirst(value)) {
120 // Yep the sequence connected.. so join them.
121 sequence.last = next.last;
122 next.unlink();
123 }
124 }
125 return true;
126 }
127
128 if (sequence.isAdjacentToFirst(value)) {
129 // grow the sequence...
130 sequence.first = value;
131
132 // it might connect us to the previous
133 if (sequence.getPrevious() != null) {
134 Sequence prev = sequence.getPrevious();
135 if (prev.isAdjacentToLast(value)) {
136 // Yep the sequence connected.. so join them.
137 sequence.first = prev.first;
138 prev.unlink();
139 }
140 }
141 return true;
142 }
143
144 // Did that value land before this sequence?
145 if (value < sequence.first) {
146 // Then insert a new entry before this sequence item.
147 sequence.linkBefore(new Sequence(value));
148 return true;
149 }
150
151 // Did that value land within the sequence? The it's a duplicate.
152 if (sequence.contains(value)) {
153 return false;
154 }
155
156 sequence = sequence.getNext();
157 }
158
159 // Then the value is getting appended to the tail of the sequence.
160 addLast(new Sequence(value));
161 return true;
162 }
163
164 /**
165 * Removes the given value from the Sequence set, splitting a
166 * contained sequence if necessary.
167 *
168 * @param value
169 * The value that should be removed from the SequenceSet.
170 *
171 * @return true if the value was removed from the set, false if there
172 * was no sequence in the set that contained the given value.
173 */
174 public boolean remove(long value) {
175 Sequence sequence = getHead();
176 while (sequence != null ) {
177 if(sequence.contains(value)) {
178 if (sequence.range() == 1) {
179 sequence.unlink();
180 return true;
181 } else if (sequence.getFirst() == value) {
182 sequence.setFirst(value+1);
183 return true;
184 } else if (sequence.getLast() == value) {
185 sequence.setLast(value-1);
186 return true;
187 } else {
188 sequence.linkBefore(new Sequence(sequence.first, value-1));
189 sequence.linkAfter(new Sequence(value+1, sequence.last));
190 sequence.unlink();
191 return true;
192 }
193 }
194
195 sequence = sequence.getNext();
196 }
197
198 return false;
199 }
200
201 /**
202 * Removes and returns the first element from this list.
203 *
204 * @return the first element from this list.
205 * @throws NoSuchElementException if this list is empty.
206 */
207 public long removeFirst() {
208 if (isEmpty()) {
209 throw new NoSuchElementException();
210 }
211
212 Sequence rc = removeFirstSequence(1);
213 return rc.first;
214 }
215
216 /**
217 * Removes and returns the last sequence from this list.
218 *
219 * @return the last sequence from this list or null if the list is empty.
220 */
221 public Sequence removeLastSequence() {
222 if (isEmpty()) {
223 return null;
224 }
225
226 Sequence rc = getTail();
227 rc.unlink();
228 return rc;
229 }
230
231 /**
232 * Removes and returns the first sequence that is count range large.
233 *
234 * @return a sequence that is count range large, or null if no sequence is that large in the list.
235 */
236 public Sequence removeFirstSequence(long count) {
237 if (isEmpty()) {
238 return null;
239 }
240
241 Sequence sequence = getHead();
242 while (sequence != null ) {
243 if (sequence.range() == count ) {
244 sequence.unlink();
245 return sequence;
246 }
247 if (sequence.range() > count ) {
248 Sequence rc = new Sequence(sequence.first, sequence.first+count-1);
249 sequence.first+=count;
250 return rc;
251 }
252 sequence = sequence.getNext();
253 }
254 return null;
255 }
256
257 /**
258 * @return all the id Sequences that are missing from this set that are not
259 * in between the range provided.
260 */
261 public List<Sequence> getMissing(long first, long last) {
262 ArrayList<Sequence> rc = new ArrayList<Sequence>();
263 if (first > last) {
264 throw new IllegalArgumentException("First cannot be more than last");
265 }
266 if (isEmpty()) {
267 // We are missing all the messages.
268 rc.add(new Sequence(first, last));
269 return rc;
270 }
271
272 Sequence sequence = getHead();
273 while (sequence != null && first <= last) {
274 if (sequence.contains(first)) {
275 first = sequence.last + 1;
276 } else {
277 if (first < sequence.first) {
278 if (last < sequence.first) {
279 rc.add(new Sequence(first, last));
280 return rc;
281 } else {
282 rc.add(new Sequence(first, sequence.first - 1));
283 first = sequence.last + 1;
284 }
285 }
286 }
287 sequence = sequence.getNext();
288 }
289
290 if (first <= last) {
291 rc.add(new Sequence(first, last));
292 }
293 return rc;
294 }
295
296 /**
297 * @return all the Sequence that are in this list
298 */
299 public List<Sequence> getReceived() {
300 ArrayList<Sequence> rc = new ArrayList<Sequence>(size());
301 Sequence sequence = getHead();
302 while (sequence != null) {
303 rc.add(new Sequence(sequence.first, sequence.last));
304 sequence = sequence.getNext();
305 }
306 return rc;
307 }
308
309 /**
310 * Returns true if the value given is contained within one of the
311 * sequences held in this set.
312 *
313 * @param value
314 * The value to search for in the set.
315 *
316 * @return true if the value is contained in the set.
317 */
318 public boolean contains(long value) {
319 if (isEmpty()) {
320 return false;
321 }
322
323 Sequence sequence = getHead();
324 while (sequence != null) {
325 if (sequence.contains(value)) {
326 return true;
327 }
328 sequence = sequence.getNext();
329 }
330
331 return false;
332 }
333
334 public boolean contains(int first, int last) {
335 if (isEmpty()) {
336 return false;
337 }
338 Sequence sequence = getHead();
339 while (sequence != null) {
340 if (sequence.first <= first && first <= sequence.last ) {
341 return last <= sequence.last;
342 }
343 sequence = sequence.getNext();
344 }
345 return false;
346 }
347
348 /**
349 * Computes the size of this Sequence by summing the values of all
350 * the contained sequences.
351 *
352 * @return the total number of values contained in this set if it
353 * were to be iterated over like an array.
354 */
355 public long rangeSize() {
356 long result = 0;
357 Sequence sequence = getHead();
358 while (sequence != null) {
359 result += sequence.range();
360 sequence = sequence.getNext();
361 }
362 return result;
363 }
364
365 public Iterator<Long> iterator() {
366 return new SequenceIterator();
367 }
368
369 private class SequenceIterator implements Iterator<Long> {
370
371 private Sequence currentEntry;
372 private long lastReturned = -1;
373
374 public SequenceIterator() {
375 currentEntry = getHead();
376 if (currentEntry != null) {
377 lastReturned = currentEntry.first - 1;
378 }
379 }
380
381 public boolean hasNext() {
382 return currentEntry != null;
383 }
384
385 public Long next() {
386 if (currentEntry == null) {
387 throw new NoSuchElementException();
388 }
389
390 if(lastReturned < currentEntry.first) {
391 lastReturned = currentEntry.first;
392 if (currentEntry.range() == 1) {
393 currentEntry = currentEntry.getNext();
394 }
395 } else {
396 lastReturned++;
397 if (lastReturned == currentEntry.last) {
398 currentEntry = currentEntry.getNext();
399 }
400 }
401
402 return lastReturned;
403 }
404
405 public void remove() {
406 throw new UnsupportedOperationException();
407 }
408
409 }
410
411 }