001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.io.Serializable;
020
021 import javax.jms.JMSException;
022 import javax.jms.Message;
023
024 import org.apache.activemq.broker.region.MessageReference;
025 import org.apache.activemq.command.MessageId;
026 import org.apache.activemq.command.ProducerId;
027 import org.apache.activemq.util.BitArrayBin;
028 import org.apache.activemq.util.IdGenerator;
029 import org.apache.activemq.util.LRUCache;
030
031 /**
032 * Provides basic audit functions for Messages without sync
033 *
034 *
035 */
036 public class ActiveMQMessageAuditNoSync implements Serializable {
037
038 private static final long serialVersionUID = 1L;
039
040 public static final int DEFAULT_WINDOW_SIZE = 2048;
041 public static final int MAXIMUM_PRODUCER_COUNT = 64;
042 private int auditDepth;
043 private int maximumNumberOfProducersToTrack;
044 private LRUCache<Object, BitArrayBin> map;
045
046 /**
047 * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack =
048 * 64
049 */
050 public ActiveMQMessageAuditNoSync() {
051 this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
052 }
053
054 /**
055 * Construct a MessageAudit
056 *
057 * @param auditDepth range of ids to track
058 * @param maximumNumberOfProducersToTrack number of producers expected in
059 * the system
060 */
061 public ActiveMQMessageAuditNoSync(int auditDepth, final int maximumNumberOfProducersToTrack) {
062 this.auditDepth = auditDepth;
063 this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
064 this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true);
065 }
066
067 /**
068 * @return the auditDepth
069 */
070 public int getAuditDepth() {
071 return auditDepth;
072 }
073
074 /**
075 * @param auditDepth the auditDepth to set
076 */
077 public void setAuditDepth(int auditDepth) {
078 this.auditDepth = auditDepth;
079 }
080
081 /**
082 * @return the maximumNumberOfProducersToTrack
083 */
084 public int getMaximumNumberOfProducersToTrack() {
085 return maximumNumberOfProducersToTrack;
086 }
087
088 /**
089 * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
090 */
091 public void setMaximumNumberOfProducersToTrack(
092 int maximumNumberOfProducersToTrack) {
093 this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
094 this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
095 }
096
097 /**
098 * Checks if this message has been seen before
099 *
100 * @param message
101 * @return true if the message is a duplicate
102 * @throws JMSException
103 */
104 public boolean isDuplicate(Message message) throws JMSException {
105 return isDuplicate(message.getJMSMessageID());
106 }
107
108 /**
109 * checks whether this messageId has been seen before and adds this
110 * messageId to the list
111 *
112 * @param id
113 * @return true if the message is a duplicate
114 */
115 public boolean isDuplicate(String id) {
116 boolean answer = false;
117 String seed = IdGenerator.getSeedFromId(id);
118 if (seed != null) {
119 BitArrayBin bab = map.get(seed);
120 if (bab == null) {
121 bab = new BitArrayBin(auditDepth);
122 map.put(seed, bab);
123 }
124 long index = IdGenerator.getSequenceFromId(id);
125 if (index >= 0) {
126 answer = bab.setBit(index, true);
127 }
128 }
129 return answer;
130 }
131
132 /**
133 * Checks if this message has been seen before
134 *
135 * @param message
136 * @return true if the message is a duplicate
137 */
138 public boolean isDuplicate(final MessageReference message) {
139 MessageId id = message.getMessageId();
140 return isDuplicate(id);
141 }
142
143 /**
144 * Checks if this messageId has been seen before
145 *
146 * @param id
147 * @return true if the message is a duplicate
148 */
149 public boolean isDuplicate(final MessageId id) {
150 boolean answer = false;
151
152 if (id != null) {
153 ProducerId pid = id.getProducerId();
154 if (pid != null) {
155 BitArrayBin bab = map.get(pid);
156 if (bab == null) {
157 bab = new BitArrayBin(auditDepth);
158 map.put(pid, bab);
159 }
160 answer = bab.setBit(id.getProducerSequenceId(), true);
161 }
162 }
163 return answer;
164 }
165
166 /**
167 * mark this message as being received
168 *
169 * @param message
170 */
171 public void rollback(final MessageReference message) {
172 MessageId id = message.getMessageId();
173 rollback(id);
174 }
175
176 /**
177 * mark this message as being received
178 *
179 * @param id
180 */
181 public void rollback(final MessageId id) {
182 if (id != null) {
183 ProducerId pid = id.getProducerId();
184 if (pid != null) {
185 BitArrayBin bab = map.get(pid);
186 if (bab != null) {
187 bab.setBit(id.getProducerSequenceId(), false);
188 }
189 }
190 }
191 }
192
193 public void rollback(final String id) {
194 String seed = IdGenerator.getSeedFromId(id);
195 if (seed != null) {
196 BitArrayBin bab = map.get(seed);
197 if (bab != null) {
198 long index = IdGenerator.getSequenceFromId(id);
199 bab.setBit(index, false);
200 }
201 }
202 }
203
204 /**
205 * Check the message is in order
206 * @param msg
207 * @return
208 * @throws JMSException
209 */
210 public boolean isInOrder(Message msg) throws JMSException {
211 return isInOrder(msg.getJMSMessageID());
212 }
213
214 /**
215 * Check the message id is in order
216 * @param id
217 * @return
218 */
219 public boolean isInOrder(final String id) {
220 boolean answer = true;
221
222 if (id != null) {
223 String seed = IdGenerator.getSeedFromId(id);
224 if (seed != null) {
225 BitArrayBin bab = map.get(seed);
226 if (bab != null) {
227 long index = IdGenerator.getSequenceFromId(id);
228 answer = bab.isInOrder(index);
229 }
230
231 }
232 }
233 return answer;
234 }
235
236 /**
237 * Check the MessageId is in order
238 * @param message
239 * @return
240 */
241 public boolean isInOrder(final MessageReference message) {
242 return isInOrder(message.getMessageId());
243 }
244
245 /**
246 * Check the MessageId is in order
247 * @param id
248 * @return
249 */
250 public boolean isInOrder(final MessageId id) {
251 boolean answer = false;
252
253 if (id != null) {
254 ProducerId pid = id.getProducerId();
255 if (pid != null) {
256 BitArrayBin bab = map.get(pid);
257 if (bab == null) {
258 bab = new BitArrayBin(auditDepth);
259 map.put(pid, bab);
260 }
261 answer = bab.isInOrder(id.getProducerSequenceId());
262
263 }
264 }
265 return answer;
266 }
267
268 public long getLastSeqId(ProducerId id) {
269 long result = -1;
270 BitArrayBin bab = map.get(id.toString());
271 if (bab != null) {
272 result = bab.getLastSetIndex();
273 }
274 return result;
275 }
276
277 public void clear() {
278 map.clear();
279 }
280 }