001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.journal;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.Collections;
022 import java.util.HashSet;
023 import java.util.Iterator;
024 import java.util.LinkedHashMap;
025 import java.util.List;
026 import java.util.Map;
027 import java.util.Set;
028
029 import org.apache.activeio.journal.RecordLocation;
030 import org.apache.activemq.broker.ConnectionContext;
031 import org.apache.activemq.command.ActiveMQDestination;
032 import org.apache.activemq.command.JournalQueueAck;
033 import org.apache.activemq.command.Message;
034 import org.apache.activemq.command.MessageAck;
035 import org.apache.activemq.command.MessageId;
036 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
037 import org.apache.activemq.store.MessageRecoveryListener;
038 import org.apache.activemq.store.MessageStore;
039 import org.apache.activemq.store.PersistenceAdapter;
040 import org.apache.activemq.store.AbstractMessageStore;
041 import org.apache.activemq.transaction.Synchronization;
042 import org.apache.activemq.usage.MemoryUsage;
043 import org.apache.activemq.usage.SystemUsage;
044 import org.apache.activemq.util.Callback;
045 import org.apache.activemq.util.TransactionTemplate;
046 import org.slf4j.Logger;
047 import org.slf4j.LoggerFactory;
048
049 /**
050 * A MessageStore that uses a Journal to store it's messages.
051 *
052 *
053 */
054 public class JournalMessageStore extends AbstractMessageStore {
055
056 private static final Logger LOG = LoggerFactory.getLogger(JournalMessageStore.class);
057
058 protected final JournalPersistenceAdapter peristenceAdapter;
059 protected final JournalTransactionStore transactionStore;
060 protected final MessageStore longTermStore;
061 protected final TransactionTemplate transactionTemplate;
062 protected RecordLocation lastLocation;
063 protected Set<RecordLocation> inFlightTxLocations = new HashSet<RecordLocation>();
064
065 private Map<MessageId, Message> messages = new LinkedHashMap<MessageId, Message>();
066 private List<MessageAck> messageAcks = new ArrayList<MessageAck>();
067
068 /** A MessageStore that we can use to retrieve messages quickly. */
069 private Map<MessageId, Message> cpAddedMessageIds;
070
071
072 private MemoryUsage memoryUsage;
073
074 public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
075 super(destination);
076 this.peristenceAdapter = adapter;
077 this.transactionStore = adapter.getTransactionStore();
078 this.longTermStore = checkpointStore;
079 this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
080 }
081
082
083 public void setMemoryUsage(MemoryUsage memoryUsage) {
084 this.memoryUsage=memoryUsage;
085 longTermStore.setMemoryUsage(memoryUsage);
086 }
087
088 /**
089 * Not synchronized since the Journal has better throughput if you increase
090 * the number of concurrent writes that it is doing.
091 */
092 public void addMessage(ConnectionContext context, final Message message) throws IOException {
093
094 final MessageId id = message.getMessageId();
095
096 final boolean debug = LOG.isDebugEnabled();
097 message.incrementReferenceCount();
098
099 final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
100 if (!context.isInTransaction()) {
101 if (debug) {
102 LOG.debug("Journalled message add for: " + id + ", at: " + location);
103 }
104 addMessage(message, location);
105 } else {
106 if (debug) {
107 LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
108 }
109 synchronized (this) {
110 inFlightTxLocations.add(location);
111 }
112 transactionStore.addMessage(this, message, location);
113 context.getTransaction().addSynchronization(new Synchronization() {
114 public void afterCommit() throws Exception {
115 if (debug) {
116 LOG.debug("Transacted message add commit for: " + id + ", at: " + location);
117 }
118 synchronized (JournalMessageStore.this) {
119 inFlightTxLocations.remove(location);
120 addMessage(message, location);
121 }
122 }
123
124 public void afterRollback() throws Exception {
125 if (debug) {
126 LOG.debug("Transacted message add rollback for: " + id + ", at: " + location);
127 }
128 synchronized (JournalMessageStore.this) {
129 inFlightTxLocations.remove(location);
130 }
131 message.decrementReferenceCount();
132 }
133 });
134 }
135 }
136
137 void addMessage(final Message message, final RecordLocation location) {
138 synchronized (this) {
139 lastLocation = location;
140 MessageId id = message.getMessageId();
141 messages.put(id, message);
142 }
143 }
144
145 public void replayAddMessage(ConnectionContext context, Message message) {
146 try {
147 // Only add the message if it has not already been added.
148 Message t = longTermStore.getMessage(message.getMessageId());
149 if (t == null) {
150 longTermStore.addMessage(context, message);
151 }
152 } catch (Throwable e) {
153 LOG.warn("Could not replay add for message '" + message.getMessageId() + "'. Message may have already been added. reason: " + e);
154 }
155 }
156
157 /**
158 */
159 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
160 final boolean debug = LOG.isDebugEnabled();
161 JournalQueueAck remove = new JournalQueueAck();
162 remove.setDestination(destination);
163 remove.setMessageAck(ack);
164
165 final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
166 if (!context.isInTransaction()) {
167 if (debug) {
168 LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
169 }
170 removeMessage(ack, location);
171 } else {
172 if (debug) {
173 LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
174 }
175 synchronized (this) {
176 inFlightTxLocations.add(location);
177 }
178 transactionStore.removeMessage(this, ack, location);
179 context.getTransaction().addSynchronization(new Synchronization() {
180 public void afterCommit() throws Exception {
181 if (debug) {
182 LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location);
183 }
184 synchronized (JournalMessageStore.this) {
185 inFlightTxLocations.remove(location);
186 removeMessage(ack, location);
187 }
188 }
189
190 public void afterRollback() throws Exception {
191 if (debug) {
192 LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location);
193 }
194 synchronized (JournalMessageStore.this) {
195 inFlightTxLocations.remove(location);
196 }
197 }
198 });
199
200 }
201 }
202
203 final void removeMessage(final MessageAck ack, final RecordLocation location) {
204 synchronized (this) {
205 lastLocation = location;
206 MessageId id = ack.getLastMessageId();
207 Message message = messages.remove(id);
208 if (message == null) {
209 messageAcks.add(ack);
210 } else {
211 message.decrementReferenceCount();
212 }
213 }
214 }
215
216 public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
217 try {
218 // Only remove the message if it has not already been removed.
219 Message t = longTermStore.getMessage(messageAck.getLastMessageId());
220 if (t != null) {
221 longTermStore.removeMessage(context, messageAck);
222 }
223 } catch (Throwable e) {
224 LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e);
225 }
226 }
227
228 /**
229 * @return
230 * @throws IOException
231 */
232 public RecordLocation checkpoint() throws IOException {
233 return checkpoint(null);
234 }
235
236 /**
237 * @return
238 * @throws IOException
239 */
240 @SuppressWarnings("unchecked")
241 public RecordLocation checkpoint(final Callback postCheckpointTest) throws IOException {
242
243 final List<MessageAck> cpRemovedMessageLocations;
244 final List<RecordLocation> cpActiveJournalLocations;
245 final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
246
247 // swap out the message hash maps..
248 synchronized (this) {
249 cpAddedMessageIds = this.messages;
250 cpRemovedMessageLocations = this.messageAcks;
251
252 cpActiveJournalLocations = new ArrayList<RecordLocation>(inFlightTxLocations);
253
254 this.messages = new LinkedHashMap<MessageId, Message>();
255 this.messageAcks = new ArrayList<MessageAck>();
256 }
257
258 transactionTemplate.run(new Callback() {
259 public void execute() throws Exception {
260
261 int size = 0;
262
263 PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
264 ConnectionContext context = transactionTemplate.getContext();
265
266 // Checkpoint the added messages.
267 synchronized (JournalMessageStore.this) {
268 Iterator<Message> iterator = cpAddedMessageIds.values().iterator();
269 while (iterator.hasNext()) {
270 Message message = iterator.next();
271 try {
272 longTermStore.addMessage(context, message);
273 } catch (Throwable e) {
274 LOG.warn("Message could not be added to long term store: " + e.getMessage(), e);
275 }
276 size += message.getSize();
277 message.decrementReferenceCount();
278 // Commit the batch if it's getting too big
279 if (size >= maxCheckpointMessageAddSize) {
280 persitanceAdapter.commitTransaction(context);
281 persitanceAdapter.beginTransaction(context);
282 size = 0;
283 }
284 }
285 }
286
287 persitanceAdapter.commitTransaction(context);
288 persitanceAdapter.beginTransaction(context);
289
290 // Checkpoint the removed messages.
291 Iterator<MessageAck> iterator = cpRemovedMessageLocations.iterator();
292 while (iterator.hasNext()) {
293 try {
294 MessageAck ack = iterator.next();
295 longTermStore.removeMessage(transactionTemplate.getContext(), ack);
296 } catch (Throwable e) {
297 LOG.debug("Message could not be removed from long term store: " + e.getMessage(), e);
298 }
299 }
300
301 if (postCheckpointTest != null) {
302 postCheckpointTest.execute();
303 }
304 }
305
306 });
307
308 synchronized (this) {
309 cpAddedMessageIds = null;
310 }
311
312 if (cpActiveJournalLocations.size() > 0) {
313 Collections.sort(cpActiveJournalLocations);
314 return cpActiveJournalLocations.get(0);
315 }
316 synchronized (this) {
317 return lastLocation;
318 }
319 }
320
321 /**
322 *
323 */
324 public Message getMessage(MessageId identity) throws IOException {
325 Message answer = null;
326
327 synchronized (this) {
328 // Do we have a still have it in the journal?
329 answer = messages.get(identity);
330 if (answer == null && cpAddedMessageIds != null) {
331 answer = cpAddedMessageIds.get(identity);
332 }
333 }
334
335 if (answer != null) {
336 return answer;
337 }
338
339 // If all else fails try the long term message store.
340 return longTermStore.getMessage(identity);
341 }
342
343 /**
344 * Replays the checkpointStore first as those messages are the oldest ones,
345 * then messages are replayed from the transaction log and then the cache is
346 * updated.
347 *
348 * @param listener
349 * @throws Exception
350 */
351 public void recover(final MessageRecoveryListener listener) throws Exception {
352 peristenceAdapter.checkpoint(true, true);
353 longTermStore.recover(listener);
354 }
355
356 public void start() throws Exception {
357 if (this.memoryUsage != null) {
358 this.memoryUsage.addUsageListener(peristenceAdapter);
359 }
360 longTermStore.start();
361 }
362
363 public void stop() throws Exception {
364 longTermStore.stop();
365 if (this.memoryUsage != null) {
366 this.memoryUsage.removeUsageListener(peristenceAdapter);
367 }
368 }
369
370 /**
371 * @return Returns the longTermStore.
372 */
373 public MessageStore getLongTermMessageStore() {
374 return longTermStore;
375 }
376
377 /**
378 * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
379 */
380 public void removeAllMessages(ConnectionContext context) throws IOException {
381 peristenceAdapter.checkpoint(true, true);
382 longTermStore.removeAllMessages(context);
383 }
384
385 public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
386 throw new IOException("The journal does not support message references.");
387 }
388
389 public String getMessageReference(MessageId identity) throws IOException {
390 throw new IOException("The journal does not support message references.");
391 }
392
393 /**
394 * @return
395 * @throws IOException
396 * @see org.apache.activemq.store.MessageStore#getMessageCount()
397 */
398 public int getMessageCount() throws IOException {
399 peristenceAdapter.checkpoint(true, true);
400 return longTermStore.getMessageCount();
401 }
402
403 public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
404 peristenceAdapter.checkpoint(true, true);
405 longTermStore.recoverNextMessages(maxReturned, listener);
406
407 }
408
409 public void resetBatching() {
410 longTermStore.resetBatching();
411
412 }
413
414 @Override
415 public void setBatch(MessageId messageId) throws Exception {
416 peristenceAdapter.checkpoint(true, true);
417 longTermStore.setBatch(messageId);
418 }
419
420 }