001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.amq;
018
019 import java.io.IOException;
020 import java.io.InterruptedIOException;
021 import java.util.ArrayList;
022 import java.util.Collections;
023 import java.util.HashSet;
024 import java.util.Iterator;
025 import java.util.LinkedHashMap;
026 import java.util.List;
027 import java.util.Map;
028 import java.util.Set;
029 import java.util.Map.Entry;
030 import java.util.concurrent.CountDownLatch;
031 import java.util.concurrent.atomic.AtomicReference;
032 import java.util.concurrent.locks.Lock;
033 import org.apache.activemq.broker.ConnectionContext;
034 import org.apache.activemq.command.ActiveMQDestination;
035 import org.apache.activemq.command.DataStructure;
036 import org.apache.activemq.command.JournalQueueAck;
037 import org.apache.activemq.command.Message;
038 import org.apache.activemq.command.MessageAck;
039 import org.apache.activemq.command.MessageId;
040 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
041 import org.apache.activemq.kaha.MessageAckWithLocation;
042 import org.apache.activemq.kaha.impl.async.Location;
043 import org.apache.activemq.store.AbstractMessageStore;
044 import org.apache.activemq.store.MessageRecoveryListener;
045 import org.apache.activemq.store.PersistenceAdapter;
046 import org.apache.activemq.store.ReferenceStore;
047 import org.apache.activemq.store.ReferenceStore.ReferenceData;
048 import org.apache.activemq.thread.Task;
049 import org.apache.activemq.thread.TaskRunner;
050 import org.apache.activemq.transaction.Synchronization;
051 import org.apache.activemq.usage.MemoryUsage;
052 import org.apache.activemq.util.Callback;
053 import org.apache.activemq.util.TransactionTemplate;
054 import org.slf4j.Logger;
055 import org.slf4j.LoggerFactory;
056
057 /**
058 * A MessageStore that uses a Journal to store it's messages.
059 *
060 *
061 */
062 public class AMQMessageStore extends AbstractMessageStore {
063 private static final Logger LOG = LoggerFactory.getLogger(AMQMessageStore.class);
064 protected final AMQPersistenceAdapter peristenceAdapter;
065 protected final AMQTransactionStore transactionStore;
066 protected final ReferenceStore referenceStore;
067 protected final TransactionTemplate transactionTemplate;
068 protected Location lastLocation;
069 protected Location lastWrittenLocation;
070 protected Set<Location> inFlightTxLocations = new HashSet<Location>();
071 protected final TaskRunner asyncWriteTask;
072 protected CountDownLatch flushLatch;
073 private Map<MessageId, ReferenceData> messages = new LinkedHashMap<MessageId, ReferenceData>();
074 private List<MessageAckWithLocation> messageAcks = new ArrayList<MessageAckWithLocation>();
075 /** A MessageStore that we can use to retrieve messages quickly. */
076 private Map<MessageId, ReferenceData> cpAddedMessageIds;
077 private final boolean debug = LOG.isDebugEnabled();
078 private final AtomicReference<Location> mark = new AtomicReference<Location>();
079 protected final Lock lock;
080
081 public AMQMessageStore(AMQPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) {
082 super(destination);
083 this.peristenceAdapter = adapter;
084 this.lock = referenceStore.getStoreLock();
085 this.transactionStore = adapter.getTransactionStore();
086 this.referenceStore = referenceStore;
087 this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(
088 new NonCachedMessageEvaluationContext()));
089 asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task() {
090 public boolean iterate() {
091 asyncWrite();
092 return false;
093 }
094 }, "Checkpoint: " + destination);
095 }
096
097 public void setMemoryUsage(MemoryUsage memoryUsage) {
098 referenceStore.setMemoryUsage(memoryUsage);
099 }
100
101 /**
102 * Not synchronize since the Journal has better throughput if you increase the number of concurrent writes that it
103 * is doing.
104 */
105 public final void addMessage(ConnectionContext context, final Message message) throws IOException {
106 final MessageId id = message.getMessageId();
107 final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
108 if (!context.isInTransaction()) {
109 if (debug) {
110 LOG.debug("Journalled message add for: " + id + ", at: " + location);
111 }
112 this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId());
113 addMessage(message, location);
114 } else {
115 if (debug) {
116 LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
117 }
118 lock.lock();
119 try {
120 inFlightTxLocations.add(location);
121 } finally {
122 lock.unlock();
123 }
124 transactionStore.addMessage(this, message, location);
125 context.getTransaction().addSynchronization(new Synchronization() {
126 public void afterCommit() throws Exception {
127 if (debug) {
128 LOG.debug("Transacted message add commit for: " + id + ", at: " + location);
129 }
130 lock.lock();
131 try {
132 inFlightTxLocations.remove(location);
133 } finally {
134 lock.unlock();
135 }
136 addMessage(message, location);
137 }
138
139 public void afterRollback() throws Exception {
140 if (debug) {
141 LOG.debug("Transacted message add rollback for: " + id + ", at: " + location);
142 }
143 lock.lock();
144 try {
145 inFlightTxLocations.remove(location);
146 } finally {
147 lock.unlock();
148 }
149 }
150 });
151 }
152 }
153
154 final void addMessage(final Message message, final Location location) throws InterruptedIOException {
155 ReferenceData data = new ReferenceData();
156 data.setExpiration(message.getExpiration());
157 data.setFileId(location.getDataFileId());
158 data.setOffset(location.getOffset());
159 lock.lock();
160 try {
161 lastLocation = location;
162 ReferenceData prev = messages.put(message.getMessageId(), data);
163 if (prev != null) {
164 AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, prev.getFileId());
165 }
166 } finally {
167 lock.unlock();
168 }
169 if (messages.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
170 flush();
171 } else {
172 try {
173 asyncWriteTask.wakeup();
174 } catch (InterruptedException e) {
175 throw new InterruptedIOException();
176 }
177 }
178 }
179
180 public boolean replayAddMessage(ConnectionContext context, Message message, Location location) {
181 MessageId id = message.getMessageId();
182 try {
183 // Only add the message if it has not already been added.
184 ReferenceData data = referenceStore.getMessageReference(id);
185 if (data == null) {
186 data = new ReferenceData();
187 data.setExpiration(message.getExpiration());
188 data.setFileId(location.getDataFileId());
189 data.setOffset(location.getOffset());
190 referenceStore.addMessageReference(context, id, data);
191 return true;
192 }
193 } catch (Throwable e) {
194 LOG.warn("Could not replay add for message '" + id + "'. Message may have already been added. reason: "
195 + e, e);
196 }
197 return false;
198 }
199
200 /**
201 */
202 public void removeMessage(final ConnectionContext context, final MessageAck ack) throws IOException {
203 JournalQueueAck remove = new JournalQueueAck();
204 remove.setDestination(destination);
205 remove.setMessageAck(ack);
206 final Location location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
207 if (!context.isInTransaction()) {
208 if (debug) {
209 LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
210 }
211 removeMessage(ack, location);
212 } else {
213 if (debug) {
214 LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
215 }
216 lock.lock();
217 try {
218 inFlightTxLocations.add(location);
219 } finally {
220 lock.unlock();
221 }
222 transactionStore.removeMessage(this, ack, location);
223 context.getTransaction().addSynchronization(new Synchronization() {
224 public void afterCommit() throws Exception {
225 if (debug) {
226 LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: "
227 + location);
228 }
229 lock.lock();
230 try {
231 inFlightTxLocations.remove(location);
232 } finally {
233 lock.unlock();
234 }
235 removeMessage(ack, location);
236 }
237
238 public void afterRollback() throws Exception {
239 if (debug) {
240 LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: "
241 + location);
242 }
243 lock.lock();
244 try {
245 inFlightTxLocations.remove(location);
246 } finally {
247 lock.unlock();
248 }
249 }
250 });
251 }
252 }
253
254 final void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException {
255 ReferenceData data;
256 lock.lock();
257 try {
258 lastLocation = location;
259 MessageId id = ack.getLastMessageId();
260 data = messages.remove(id);
261 if (data == null) {
262 messageAcks.add(new MessageAckWithLocation(ack, location));
263 } else {
264 // message never got written so datafileReference will still exist
265 AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, data.getFileId());
266 }
267 } finally {
268 lock.unlock();
269 }
270 if (messageAcks.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
271 flush();
272 } else if (data == null) {
273 try {
274 asyncWriteTask.wakeup();
275 } catch (InterruptedException e) {
276 throw new InterruptedIOException();
277 }
278 }
279 }
280
281 public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
282 try {
283 // Only remove the message if it has not already been removed.
284 ReferenceData t = referenceStore.getMessageReference(messageAck.getLastMessageId());
285 if (t != null) {
286 referenceStore.removeMessage(context, messageAck);
287 return true;
288 }
289 } catch (Throwable e) {
290 LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId()
291 + "'. Message may have already been acknowledged. reason: " + e);
292 }
293 return false;
294 }
295
296 /**
297 * Waits till the lastest data has landed on the referenceStore
298 *
299 * @throws InterruptedIOException
300 */
301 public void flush() throws InterruptedIOException {
302 if (LOG.isDebugEnabled()) {
303 LOG.debug("flush starting ...");
304 }
305 CountDownLatch countDown;
306 lock.lock();
307 try {
308 if (lastWrittenLocation == lastLocation) {
309 return;
310 }
311 if (flushLatch == null) {
312 flushLatch = new CountDownLatch(1);
313 }
314 countDown = flushLatch;
315 } finally {
316 lock.unlock();
317 }
318 try {
319 asyncWriteTask.wakeup();
320 countDown.await();
321 } catch (InterruptedException e) {
322 throw new InterruptedIOException();
323 }
324 if (LOG.isDebugEnabled()) {
325 LOG.debug("flush finished");
326 }
327 }
328
329 /**
330 * @return
331 * @throws IOException
332 */
333 synchronized void asyncWrite() {
334 try {
335 CountDownLatch countDown;
336 lock.lock();
337 try {
338 countDown = flushLatch;
339 flushLatch = null;
340 } finally {
341 lock.unlock();
342 }
343 mark.set(doAsyncWrite());
344 if (countDown != null) {
345 countDown.countDown();
346 }
347 } catch (IOException e) {
348 LOG.error("Checkpoint failed: " + e, e);
349 }
350 }
351
352 /**
353 * @return
354 * @throws IOException
355 */
356 protected Location doAsyncWrite() throws IOException {
357 final List<MessageAckWithLocation> cpRemovedMessageLocations;
358 final List<Location> cpActiveJournalLocations;
359 final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
360 final Location lastLocation;
361 // swap out the message hash maps..
362 lock.lock();
363 try {
364 cpAddedMessageIds = this.messages;
365 cpRemovedMessageLocations = this.messageAcks;
366 cpActiveJournalLocations = new ArrayList<Location>(inFlightTxLocations);
367 this.messages = new LinkedHashMap<MessageId, ReferenceData>();
368 this.messageAcks = new ArrayList<MessageAckWithLocation>();
369 lastLocation = this.lastLocation;
370 } finally {
371 lock.unlock();
372 }
373 if (LOG.isDebugEnabled()) {
374 LOG.debug("Doing batch update... adding: " + cpAddedMessageIds.size() + " removing: "
375 + cpRemovedMessageLocations.size() + " ");
376 }
377 transactionTemplate.run(new Callback() {
378 public void execute() throws Exception {
379 int size = 0;
380 PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
381 ConnectionContext context = transactionTemplate.getContext();
382 // Checkpoint the added messages.
383 Iterator<Entry<MessageId, ReferenceData>> iterator = cpAddedMessageIds.entrySet().iterator();
384 while (iterator.hasNext()) {
385 Entry<MessageId, ReferenceData> entry = iterator.next();
386 try {
387 if (referenceStore.addMessageReference(context, entry.getKey(), entry.getValue())) {
388 if (LOG.isDebugEnabled()) {
389 LOG.debug("adding message ref:" + entry.getKey());
390 }
391 size++;
392 } else {
393 if (LOG.isDebugEnabled()) {
394 LOG.debug("not adding duplicate reference: " + entry.getKey() + ", " + entry.getValue());
395 }
396 }
397 AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, entry
398 .getValue().getFileId());
399 } catch (Throwable e) {
400 LOG.warn("Message could not be added to long term store: " + e.getMessage(), e);
401 }
402
403 // Commit the batch if it's getting too big
404 if (size >= maxCheckpointMessageAddSize) {
405 persitanceAdapter.commitTransaction(context);
406 persitanceAdapter.beginTransaction(context);
407 size = 0;
408 }
409 }
410 persitanceAdapter.commitTransaction(context);
411 persitanceAdapter.beginTransaction(context);
412 // Checkpoint the removed messages.
413 for (MessageAckWithLocation ack : cpRemovedMessageLocations) {
414 try {
415 referenceStore.removeMessage(transactionTemplate.getContext(), ack);
416 } catch (Throwable e) {
417 LOG.warn("Message could not be removed from long term store: " + e.getMessage(), e);
418 }
419 }
420 }
421 });
422 LOG.debug("Batch update done. lastLocation:" + lastLocation);
423 lock.lock();
424 try {
425 cpAddedMessageIds = null;
426 lastWrittenLocation = lastLocation;
427 } finally {
428 lock.unlock();
429 }
430 if (cpActiveJournalLocations.size() > 0) {
431 Collections.sort(cpActiveJournalLocations);
432 return cpActiveJournalLocations.get(0);
433 } else {
434 return lastLocation;
435 }
436 }
437
438 /**
439 *
440 */
441 public Message getMessage(MessageId identity) throws IOException {
442 Location location = getLocation(identity);
443 if (location != null) {
444 DataStructure rc = peristenceAdapter.readCommand(location);
445 try {
446 return (Message) rc;
447 } catch (ClassCastException e) {
448 throw new IOException("Could not read message " + identity + " at location " + location
449 + ", expected a message, but got: " + rc);
450 }
451 }
452 return null;
453 }
454
455 protected Location getLocation(MessageId messageId) throws IOException {
456 ReferenceData data = null;
457 lock.lock();
458 try {
459 // Is it still in flight???
460 data = messages.get(messageId);
461 if (data == null && cpAddedMessageIds != null) {
462 data = cpAddedMessageIds.get(messageId);
463 }
464 } finally {
465 lock.unlock();
466 }
467 if (data == null) {
468 data = referenceStore.getMessageReference(messageId);
469 if (data == null) {
470 return null;
471 }
472 }
473 Location location = new Location();
474 location.setDataFileId(data.getFileId());
475 location.setOffset(data.getOffset());
476 return location;
477 }
478
479 /**
480 * Replays the referenceStore first as those messages are the oldest ones, then messages are replayed from the
481 * transaction log and then the cache is updated.
482 *
483 * @param listener
484 * @throws Exception
485 */
486 public void recover(final MessageRecoveryListener listener) throws Exception {
487 flush();
488 referenceStore.recover(new RecoveryListenerAdapter(this, listener));
489 }
490
491 public void start() throws Exception {
492 referenceStore.start();
493 }
494
495 public void stop() throws Exception {
496 flush();
497 asyncWriteTask.shutdown();
498 referenceStore.stop();
499 }
500
501 /**
502 * @return Returns the longTermStore.
503 */
504 public ReferenceStore getReferenceStore() {
505 return referenceStore;
506 }
507
508 /**
509 * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
510 */
511 public void removeAllMessages(ConnectionContext context) throws IOException {
512 flush();
513 referenceStore.removeAllMessages(context);
514 }
515
516 public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime,
517 String messageRef) throws IOException {
518 throw new IOException("The journal does not support message references.");
519 }
520
521 public String getMessageReference(MessageId identity) throws IOException {
522 throw new IOException("The journal does not support message references.");
523 }
524
525 /**
526 * @return
527 * @throws IOException
528 * @see org.apache.activemq.store.MessageStore#getMessageCount()
529 */
530 public int getMessageCount() throws IOException {
531 flush();
532 return referenceStore.getMessageCount();
533 }
534
535 public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
536 RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
537 referenceStore.recoverNextMessages(maxReturned, recoveryListener);
538 if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
539 flush();
540 referenceStore.recoverNextMessages(maxReturned, recoveryListener);
541 }
542 }
543
544 Message getMessage(ReferenceData data) throws IOException {
545 Location location = new Location();
546 location.setDataFileId(data.getFileId());
547 location.setOffset(data.getOffset());
548 DataStructure rc = peristenceAdapter.readCommand(location);
549 try {
550 return (Message) rc;
551 } catch (ClassCastException e) {
552 throw new IOException("Could not read message at location " + location + ", expected a message, but got: "
553 + rc);
554 }
555 }
556
557 public void resetBatching() {
558 referenceStore.resetBatching();
559 }
560
561 public Location getMark() {
562 return mark.get();
563 }
564
565 public void dispose(ConnectionContext context) {
566 try {
567 flush();
568 } catch (InterruptedIOException e) {
569 Thread.currentThread().interrupt();
570 }
571 referenceStore.dispose(context);
572 super.dispose(context);
573 }
574
575 public void setBatch(MessageId messageId) {
576 try {
577 flush();
578 } catch (InterruptedIOException e) {
579 LOG.debug("flush on setBatch resulted in exception", e);
580 }
581 getReferenceStore().setBatch(messageId);
582 }
583
584 }