001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.memory;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.Iterator;
022 import java.util.concurrent.ConcurrentHashMap;
023 import java.util.concurrent.Future;
024
025 import org.apache.activemq.broker.ConnectionContext;
026 import org.apache.activemq.command.Message;
027 import org.apache.activemq.command.MessageAck;
028 import org.apache.activemq.command.MessageId;
029 import org.apache.activemq.command.TransactionId;
030 import org.apache.activemq.command.XATransactionId;
031 import org.apache.activemq.store.AbstractMessageStore;
032 import org.apache.activemq.store.MessageStore;
033 import org.apache.activemq.store.PersistenceAdapter;
034 import org.apache.activemq.store.ProxyMessageStore;
035 import org.apache.activemq.store.ProxyTopicMessageStore;
036 import org.apache.activemq.store.TopicMessageStore;
037 import org.apache.activemq.store.TransactionRecoveryListener;
038 import org.apache.activemq.store.TransactionStore;
039
040 /**
041 * Provides a TransactionStore implementation that can create transaction aware
042 * MessageStore objects from non transaction aware MessageStore objects.
043 *
044 *
045 */
046 public class MemoryTransactionStore implements TransactionStore {
047
048 ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
049 ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>();
050 final PersistenceAdapter persistenceAdapter;
051
052 private boolean doingRecover;
053
054 public class Tx {
055 private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
056
057 private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
058
059 public void add(AddMessageCommand msg) {
060 messages.add(msg);
061 }
062
063 public void add(RemoveMessageCommand ack) {
064 acks.add(ack);
065 }
066
067 public Message[] getMessages() {
068 Message rc[] = new Message[messages.size()];
069 int count = 0;
070 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
071 AddMessageCommand cmd = iter.next();
072 rc[count++] = cmd.getMessage();
073 }
074 return rc;
075 }
076
077 public MessageAck[] getAcks() {
078 MessageAck rc[] = new MessageAck[acks.size()];
079 int count = 0;
080 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
081 RemoveMessageCommand cmd = iter.next();
082 rc[count++] = cmd.getMessageAck();
083 }
084 return rc;
085 }
086
087 /**
088 * @throws IOException
089 */
090 public void commit() throws IOException {
091 ConnectionContext ctx = new ConnectionContext();
092 persistenceAdapter.beginTransaction(ctx);
093 try {
094
095 // Do all the message adds.
096 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
097 AddMessageCommand cmd = iter.next();
098 cmd.run(ctx);
099 }
100 // And removes..
101 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
102 RemoveMessageCommand cmd = iter.next();
103 cmd.run(ctx);
104 }
105
106 } catch ( IOException e ) {
107 persistenceAdapter.rollbackTransaction(ctx);
108 throw e;
109 }
110 persistenceAdapter.commitTransaction(ctx);
111 }
112 }
113
114 public interface AddMessageCommand {
115 Message getMessage();
116
117 void run(ConnectionContext context) throws IOException;
118 }
119
120 public interface RemoveMessageCommand {
121 MessageAck getMessageAck();
122
123 void run(ConnectionContext context) throws IOException;
124 }
125
126 public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
127 this.persistenceAdapter=persistenceAdapter;
128 }
129
130 public MessageStore proxy(MessageStore messageStore) {
131 return new ProxyMessageStore(messageStore) {
132 @Override
133 public void addMessage(ConnectionContext context, final Message send) throws IOException {
134 MemoryTransactionStore.this.addMessage(getDelegate(), send);
135 }
136
137 @Override
138 public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
139 MemoryTransactionStore.this.addMessage(getDelegate(), send);
140 }
141
142 @Override
143 public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
144 MemoryTransactionStore.this.addMessage(getDelegate(), message);
145 return AbstractMessageStore.FUTURE;
146 }
147
148 @Override
149 public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException {
150 MemoryTransactionStore.this.addMessage(getDelegate(), message);
151 return AbstractMessageStore.FUTURE;
152 }
153
154 @Override
155 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
156 MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
157 }
158
159 @Override
160 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
161 MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
162 }
163 };
164 }
165
166 public TopicMessageStore proxy(TopicMessageStore messageStore) {
167 return new ProxyTopicMessageStore(messageStore) {
168 @Override
169 public void addMessage(ConnectionContext context, final Message send) throws IOException {
170 MemoryTransactionStore.this.addMessage(getDelegate(), send);
171 }
172
173 @Override
174 public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
175 MemoryTransactionStore.this.addMessage(getDelegate(), send);
176 }
177
178 @Override
179 public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
180 MemoryTransactionStore.this.addMessage(getDelegate(), message);
181 return AbstractMessageStore.FUTURE;
182 }
183
184 @Override
185 public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
186 MemoryTransactionStore.this.addMessage(getDelegate(), message);
187 return AbstractMessageStore.FUTURE;
188 }
189
190 @Override
191 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
192 MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
193 }
194
195 @Override
196 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
197 MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
198 }
199
200 @Override
201 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
202 MessageId messageId, MessageAck ack) throws IOException {
203 MemoryTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId,
204 subscriptionName, messageId, ack);
205 }
206 };
207 }
208
209 /**
210 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
211 */
212 public void prepare(TransactionId txid) {
213 Tx tx = inflightTransactions.remove(txid);
214 if (tx == null) {
215 return;
216 }
217 preparedTransactions.put(txid, tx);
218 }
219
220 public Tx getTx(Object txid) {
221 Tx tx = inflightTransactions.get(txid);
222 if (tx == null) {
223 tx = new Tx();
224 inflightTransactions.put(txid, tx);
225 }
226 return tx;
227 }
228
229 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
230 if (preCommit != null) {
231 preCommit.run();
232 }
233 Tx tx;
234 if (wasPrepared) {
235 tx = preparedTransactions.remove(txid);
236 } else {
237 tx = inflightTransactions.remove(txid);
238 }
239
240 if (tx == null) {
241 if (postCommit != null) {
242 postCommit.run();
243 }
244 return;
245 }
246 // ensure message order w.r.t to cursor and store for setBatch()
247 synchronized (this) {
248 tx.commit();
249 if (postCommit != null) {
250 postCommit.run();
251 }
252 }
253 }
254
255 /**
256 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
257 */
258 public void rollback(TransactionId txid) {
259 preparedTransactions.remove(txid);
260 inflightTransactions.remove(txid);
261 }
262
263 public void start() throws Exception {
264 }
265
266 public void stop() throws Exception {
267 }
268
269 public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
270 // All the inflight transactions get rolled back..
271 inflightTransactions.clear();
272 this.doingRecover = true;
273 try {
274 for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
275 Object txid = iter.next();
276 Tx tx = preparedTransactions.get(txid);
277 listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
278 }
279 } finally {
280 this.doingRecover = false;
281 }
282 }
283
284 /**
285 * @param message
286 * @throws IOException
287 */
288 void addMessage(final MessageStore destination, final Message message) throws IOException {
289
290 if (doingRecover) {
291 return;
292 }
293
294 if (message.getTransactionId() != null) {
295 Tx tx = getTx(message.getTransactionId());
296 tx.add(new AddMessageCommand() {
297 public Message getMessage() {
298 return message;
299 }
300
301 public void run(ConnectionContext ctx) throws IOException {
302 destination.addMessage(ctx, message);
303 }
304
305 });
306 } else {
307 destination.addMessage(null, message);
308 }
309 }
310
311 /**
312 * @param ack
313 * @throws IOException
314 */
315 final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
316 if (doingRecover) {
317 return;
318 }
319
320 if (ack.isInTransaction()) {
321 Tx tx = getTx(ack.getTransactionId());
322 tx.add(new RemoveMessageCommand() {
323 public MessageAck getMessageAck() {
324 return ack;
325 }
326
327 public void run(ConnectionContext ctx) throws IOException {
328 destination.removeMessage(ctx, ack);
329 }
330 });
331 } else {
332 destination.removeMessage(null, ack);
333 }
334 }
335
336 final void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName,
337 final MessageId messageId, final MessageAck ack) throws IOException {
338 if (doingRecover) {
339 return;
340 }
341
342 if (ack.isInTransaction()) {
343 Tx tx = getTx(ack.getTransactionId());
344 tx.add(new RemoveMessageCommand() {
345 public MessageAck getMessageAck() {
346 return ack;
347 }
348
349 public void run(ConnectionContext ctx) throws IOException {
350 destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
351 }
352 });
353 } else {
354 destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
355 }
356 }
357
358
359 public void delete() {
360 inflightTransactions.clear();
361 preparedTransactions.clear();
362 doingRecover = false;
363 }
364
365 }