001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadaptor;
018
019 import java.io.IOException;
020 import java.util.HashSet;
021 import java.util.Set;
022 import java.util.concurrent.locks.Lock;
023 import java.util.concurrent.locks.ReentrantLock;
024
025 import org.apache.activemq.ActiveMQMessageAudit;
026 import org.apache.activemq.broker.ConnectionContext;
027 import org.apache.activemq.command.ActiveMQDestination;
028 import org.apache.activemq.command.Message;
029 import org.apache.activemq.command.MessageAck;
030 import org.apache.activemq.command.MessageId;
031 import org.apache.activemq.kaha.MapContainer;
032 import org.apache.activemq.kaha.MessageAckWithLocation;
033 import org.apache.activemq.kaha.StoreEntry;
034 import org.apache.activemq.store.AbstractMessageStore;
035 import org.apache.activemq.store.MessageRecoveryListener;
036 import org.apache.activemq.store.ReferenceStore;
037 import org.slf4j.Logger;
038 import org.slf4j.LoggerFactory;
039
040 /**
041 * @author rajdavies
042 *
043 */
044 public class KahaReferenceStore extends AbstractMessageStore implements ReferenceStore {
045
046 private static final Logger LOG = LoggerFactory.getLogger(KahaReferenceStore.class);
047 protected final MapContainer<MessageId, ReferenceRecord> messageContainer;
048 protected KahaReferenceStoreAdapter adapter;
049 // keep track of dispatched messages so that duplicate sends that follow a successful
050 // dispatch can be suppressed.
051 protected ActiveMQMessageAudit dispatchAudit = new ActiveMQMessageAudit();
052 private StoreEntry batchEntry;
053 private String lastBatchId;
054 protected final Lock lock = new ReentrantLock();
055
056 public KahaReferenceStore(KahaReferenceStoreAdapter adapter, MapContainer<MessageId, ReferenceRecord> container,
057 ActiveMQDestination destination) throws IOException {
058 super(destination);
059 this.adapter = adapter;
060 this.messageContainer = container;
061 }
062
063 public Lock getStoreLock() {
064 return lock;
065 }
066
067 public void dispose(ConnectionContext context) {
068 super.dispose(context);
069 this.messageContainer.delete();
070 this.adapter.removeReferenceStore(this);
071 }
072
073 protected MessageId getMessageId(Object object) {
074 return new MessageId(((ReferenceRecord)object).getMessageId());
075 }
076
077 public void addMessage(ConnectionContext context, Message message) throws IOException {
078 throw new RuntimeException("Use addMessageReference instead");
079 }
080
081 public Message getMessage(MessageId identity) throws IOException {
082 throw new RuntimeException("Use addMessageReference instead");
083 }
084
085 protected final boolean recoverReference(MessageRecoveryListener listener,
086 ReferenceRecord record) throws Exception {
087 MessageId id = new MessageId(record.getMessageId());
088 if (listener.hasSpace()) {
089 return listener.recoverMessageReference(id);
090 }
091 return false;
092 }
093
094 public void recover(MessageRecoveryListener listener) throws Exception {
095 lock.lock();
096 try {
097 for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer
098 .getNext(entry)) {
099 ReferenceRecord record = messageContainer.getValue(entry);
100 if (!recoverReference(listener, record)) {
101 break;
102 }
103 }
104 }finally {
105 lock.unlock();
106 }
107 }
108
109 public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
110 throws Exception {
111 lock.lock();
112 try {
113 StoreEntry entry = batchEntry;
114 if (entry == null) {
115 entry = messageContainer.getFirst();
116 } else {
117 entry = messageContainer.refresh(entry);
118 if (entry != null) {
119 entry = messageContainer.getNext(entry);
120 }
121 }
122 if (entry != null) {
123 int count = 0;
124 do {
125 ReferenceRecord msg = messageContainer.getValue(entry);
126 if (msg != null ) {
127 if (recoverReference(listener, msg)) {
128 count++;
129 lastBatchId = msg.getMessageId();
130 } else if (!listener.isDuplicate(new MessageId(msg.getMessageId()))) {
131 if (LOG.isDebugEnabled()) {
132 LOG.debug(destination.getQualifiedName() + " did not recover (will retry) message: " + msg.getMessageId());
133 }
134 // give usage limits a chance to reclaim
135 break;
136 } else {
137 // skip duplicate and continue
138 if (LOG.isDebugEnabled()) {
139 LOG.debug(destination.getQualifiedName() + " skipping duplicate, " + msg.getMessageId());
140 }
141 }
142 } else {
143 lastBatchId = null;
144 }
145 batchEntry = entry;
146 entry = messageContainer.getNext(entry);
147 } while (entry != null && count < maxReturned && listener.hasSpace());
148 }
149 }finally {
150 lock.unlock();
151 }
152 }
153
154 public boolean addMessageReference(ConnectionContext context, MessageId messageId,
155 ReferenceData data) throws IOException {
156
157 boolean uniqueueReferenceAdded = false;
158 lock.lock();
159 try {
160 if (!isDuplicate(messageId)) {
161 ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
162 messageContainer.put(messageId, record);
163 uniqueueReferenceAdded = true;
164 addInterest(record);
165 if (LOG.isDebugEnabled()) {
166 LOG.debug(destination.getPhysicalName() + " add: " + messageId);
167 }
168 }
169 } finally {
170 lock.unlock();
171 }
172 return uniqueueReferenceAdded;
173 }
174
175 protected boolean isDuplicate(final MessageId messageId) {
176 boolean duplicate = messageContainer.containsKey(messageId);
177 if (!duplicate) {
178 duplicate = dispatchAudit.isDuplicate(messageId);
179 if (duplicate) {
180 if (LOG.isDebugEnabled()) {
181 LOG.debug(destination.getPhysicalName()
182 + " ignoring duplicated (add) message reference, already dispatched: "
183 + messageId);
184 }
185 }
186 } else if (LOG.isDebugEnabled()) {
187 LOG.debug(destination.getPhysicalName()
188 + " ignoring duplicated (add) message reference, already in store: " + messageId);
189 }
190 return duplicate;
191 }
192
193 public ReferenceData getMessageReference(MessageId identity) throws IOException {
194 lock.lock();
195 try {
196 ReferenceRecord result = messageContainer.get(identity);
197 if (result == null) {
198 return null;
199 }
200 return result.getData();
201 }finally {
202 lock.unlock();
203 }
204 }
205
206 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
207 lock.lock();
208 try {
209 MessageId msgId = ack.getLastMessageId();
210 StoreEntry entry = messageContainer.getEntry(msgId);
211 if (entry != null) {
212 ReferenceRecord rr = messageContainer.remove(msgId);
213 if (rr != null) {
214 removeInterest(rr);
215 if (ack instanceof MessageAckWithLocation) {
216 recordAckFileReferences((MessageAckWithLocation)ack, rr.getData().getFileId());
217 }
218 dispatchAudit.isDuplicate(msgId);
219 if (LOG.isDebugEnabled()) {
220 LOG.debug(destination.getPhysicalName() + " remove reference: " + msgId);
221 }
222 if (messageContainer.isEmpty()
223 || (lastBatchId != null && lastBatchId.equals(msgId.toString()))
224 || (batchEntry != null && batchEntry.equals(entry))) {
225 resetBatching();
226 }
227 }
228 }
229 }finally {
230 lock.unlock();
231 }
232 }
233
234 private void recordAckFileReferences(MessageAckWithLocation ack, int messageFileId) {
235 adapter.recordAckFileReferences(ack.location.getDataFileId(), messageFileId);
236 }
237
238 public void removeAllMessages(ConnectionContext context) throws IOException {
239 lock.lock();
240 try {
241 Set<MessageId> tmpSet = new HashSet<MessageId>(messageContainer.keySet());
242 MessageAck ack = new MessageAck();
243 for (MessageId id:tmpSet) {
244 ack.setLastMessageId(id);
245 removeMessage(null, ack);
246 }
247 resetBatching();
248 messageContainer.clear();
249 }finally {
250 lock.unlock();
251 }
252 }
253
254 public void delete() {
255 lock.lock();
256 try {
257 messageContainer.clear();
258 }finally {
259 lock.unlock();
260 }
261 }
262
263 public void resetBatching() {
264 lock.lock();
265 try {
266 batchEntry = null;
267 lastBatchId = null;
268 }finally {
269 lock.unlock();
270 }
271 }
272
273 public int getMessageCount() {
274 return messageContainer.size();
275 }
276
277 public boolean isSupportForCursors() {
278 return true;
279 }
280
281 public boolean supportsExternalBatchControl() {
282 return true;
283 }
284
285 void removeInterest(ReferenceRecord rr) {
286 adapter.removeInterestInRecordFile(rr.getData().getFileId());
287 }
288
289 void addInterest(ReferenceRecord rr) {
290 adapter.addInterestInRecordFile(rr.getData().getFileId());
291 }
292
293 /**
294 * @param startAfter
295 * @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId)
296 */
297 public void setBatch(MessageId startAfter) {
298 lock.lock();
299 try {
300 batchEntry = messageContainer.getEntry(startAfter);
301 if (LOG.isDebugEnabled()) {
302 LOG.debug("setBatch: " + startAfter);
303 }
304 } finally {
305 lock.unlock();
306 }
307 }
308 }