001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.util;
018
019 import java.io.Serializable;
020 import java.util.LinkedList;
021
022 /**
023 * Holder for many bitArrays - used for message audit
024 *
025 *
026 */
027 public class BitArrayBin implements Serializable {
028
029 private static final long serialVersionUID = 1L;
030 private LinkedList<BitArray> list;
031 private int maxNumberOfArrays;
032 private int firstIndex = -1;
033 private long lastInOrderBit=-1;
034
035 /**
036 * Create a BitArrayBin to a certain window size (number of messages to
037 * keep)
038 *
039 * @param windowSize
040 */
041 public BitArrayBin(int windowSize) {
042 maxNumberOfArrays = ((windowSize + 1) / BitArray.LONG_SIZE) + 1;
043 maxNumberOfArrays = Math.max(maxNumberOfArrays, 1);
044 list = new LinkedList<BitArray>();
045 for (int i = 0; i < maxNumberOfArrays; i++) {
046 list.add(null);
047 }
048 }
049
050 /**
051 * Set a bit
052 *
053 * @param index
054 * @param value
055 * @return true if set
056 */
057 public boolean setBit(long index, boolean value) {
058 boolean answer = false;
059 BitArray ba = getBitArray(index);
060 if (ba != null) {
061 int offset = getOffset(index);
062 if (offset >= 0) {
063 answer = ba.set(offset, value);
064 }
065 }
066 return answer;
067 }
068
069 /**
070 * Test if in order
071 * @param index
072 * @return true if next message is in order
073 */
074 public boolean isInOrder(long index) {
075 boolean result = false;
076 if (lastInOrderBit == -1) {
077 result = true;
078 } else {
079 result = lastInOrderBit + 1 == index;
080 }
081 lastInOrderBit = index;
082 return result;
083
084 }
085
086 /**
087 * Get the boolean value at the index
088 *
089 * @param index
090 * @return true/false
091 */
092 public boolean getBit(long index) {
093 boolean answer = index >= firstIndex;
094 BitArray ba = getBitArray(index);
095 if (ba != null) {
096 int offset = getOffset(index);
097 if (offset >= 0) {
098 answer = ba.get(offset);
099 return answer;
100 }
101 } else {
102 // gone passed range for previous bins so assume set
103 answer = true;
104 }
105 return answer;
106 }
107
108 /**
109 * Get the BitArray for the index
110 *
111 * @param index
112 * @return BitArray
113 */
114 private BitArray getBitArray(long index) {
115 int bin = getBin(index);
116 BitArray answer = null;
117 if (bin >= 0) {
118 if (bin >= maxNumberOfArrays) {
119 int overShoot = bin - maxNumberOfArrays + 1;
120 while (overShoot > 0) {
121 list.removeFirst();
122 firstIndex += BitArray.LONG_SIZE;
123 list.add(new BitArray());
124 overShoot--;
125 }
126
127 bin = maxNumberOfArrays - 1;
128 }
129 answer = list.get(bin);
130 if (answer == null) {
131 answer = new BitArray();
132 list.set(bin, answer);
133 }
134 }
135 return answer;
136 }
137
138 /**
139 * Get the index of the bin from the total index
140 *
141 * @param index
142 * @return the index of the bin
143 */
144 private int getBin(long index) {
145 int answer = 0;
146 if (firstIndex < 0) {
147 firstIndex = (int) (index - (index % BitArray.LONG_SIZE));
148 } else if (firstIndex >= 0) {
149 answer = (int)((index - firstIndex) / BitArray.LONG_SIZE);
150 }
151 return answer;
152 }
153
154 /**
155 * Get the offset into a bin from the total index
156 *
157 * @param index
158 * @return the relative offset into a bin
159 */
160 private int getOffset(long index) {
161 int answer = 0;
162 if (firstIndex >= 0) {
163 answer = (int)((index - firstIndex) - (BitArray.LONG_SIZE * getBin(index)));
164 }
165 return answer;
166 }
167
168 public long getLastSetIndex() {
169 long result = -1;
170
171 if (firstIndex >=0) {
172 result = firstIndex;
173 BitArray last = null;
174 for (int lastBitArrayIndex = maxNumberOfArrays -1; lastBitArrayIndex >= 0; lastBitArrayIndex--) {
175 last = list.get(lastBitArrayIndex);
176 if (last != null) {
177 result += last.length() -1;
178 result += lastBitArrayIndex * BitArray.LONG_SIZE;
179 break;
180 }
181 }
182 }
183 return result;
184 }
185 }