001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadaptor;
018
019 import java.io.IOException;
020 import java.util.Iterator;
021 import java.util.Map;
022 import java.util.Map.Entry;
023 import java.util.concurrent.ConcurrentHashMap;
024
025 import org.apache.activemq.broker.BrokerService;
026 import org.apache.activemq.broker.BrokerServiceAware;
027 import org.apache.activemq.broker.ConnectionContext;
028 import org.apache.activemq.command.Message;
029 import org.apache.activemq.command.MessageAck;
030 import org.apache.activemq.command.MessageId;
031 import org.apache.activemq.command.TransactionId;
032 import org.apache.activemq.command.XATransactionId;
033 import org.apache.activemq.kaha.RuntimeStoreException;
034 import org.apache.activemq.store.MessageStore;
035 import org.apache.activemq.store.ProxyMessageStore;
036 import org.apache.activemq.store.ProxyTopicMessageStore;
037 import org.apache.activemq.store.TopicMessageStore;
038 import org.apache.activemq.store.TransactionRecoveryListener;
039 import org.apache.activemq.store.TransactionStore;
040 import org.slf4j.Logger;
041 import org.slf4j.LoggerFactory;
042
043 /**
044 * Provides a TransactionStore implementation that can create transaction aware
045 * MessageStore objects from non transaction aware MessageStore objects.
046 *
047 *
048 */
049 public class KahaTransactionStore implements TransactionStore, BrokerServiceAware {
050 private static final Logger LOG = LoggerFactory.getLogger(KahaTransactionStore.class);
051
052 private final Map transactions = new ConcurrentHashMap();
053 private final Map prepared;
054 private final KahaPersistenceAdapter adaptor;
055
056 private BrokerService brokerService;
057
058 KahaTransactionStore(KahaPersistenceAdapter adaptor, Map preparedMap) {
059 this.adaptor = adaptor;
060 this.prepared = preparedMap;
061 }
062
063 public MessageStore proxy(MessageStore messageStore) {
064 return new ProxyMessageStore(messageStore) {
065 @Override
066 public void addMessage(ConnectionContext context, final Message send) throws IOException {
067 KahaTransactionStore.this.addMessage(getDelegate(), send);
068 }
069
070 @Override
071 public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
072 KahaTransactionStore.this.addMessage(getDelegate(), send);
073 }
074
075 @Override
076 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
077 KahaTransactionStore.this.removeMessage(getDelegate(), ack);
078 }
079 };
080 }
081
082 public TopicMessageStore proxy(TopicMessageStore messageStore) {
083 return new ProxyTopicMessageStore(messageStore) {
084 @Override
085 public void addMessage(ConnectionContext context, final Message send) throws IOException {
086 KahaTransactionStore.this.addMessage(getDelegate(), send);
087 }
088
089 @Override
090 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
091 KahaTransactionStore.this.removeMessage(getDelegate(), ack);
092 }
093
094 @Override
095 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
096 MessageId messageId, MessageAck ack) throws IOException {
097 KahaTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId, subscriptionName, messageId, ack);
098 }
099 };
100 }
101
102 /**
103 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
104 */
105 public void prepare(TransactionId txid) {
106 KahaTransaction tx = getTx(txid);
107 if (tx != null) {
108 tx.prepare();
109 prepared.put(txid, tx);
110 }
111 }
112
113 public void commit(TransactionId txid, boolean wasPrepared, Runnable before,Runnable after) throws IOException {
114 if(before != null) {
115 before.run();
116 }
117 KahaTransaction tx = getTx(txid);
118 if (tx != null) {
119 tx.commit(this);
120 removeTx(txid);
121 }
122 if (after != null) {
123 after.run();
124 }
125 }
126
127 /**
128 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
129 */
130 public void rollback(TransactionId txid) {
131 KahaTransaction tx = getTx(txid);
132 if (tx != null) {
133 tx.rollback();
134 removeTx(txid);
135 }
136 }
137
138 public void start() throws Exception {
139 }
140
141 public void stop() throws Exception {
142 }
143
144 public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
145 for (Iterator i = prepared.entrySet().iterator(); i.hasNext();) {
146 Map.Entry entry = (Entry)i.next();
147 XATransactionId xid = (XATransactionId)entry.getKey();
148 KahaTransaction kt = (KahaTransaction)entry.getValue();
149 listener.recover(xid, kt.getMessages(), kt.getAcks());
150 }
151 }
152
153 /**
154 * @param message
155 * @throws IOException
156 */
157 void addMessage(final MessageStore destination, final Message message) throws IOException {
158 try {
159 if (message.isInTransaction()) {
160 KahaTransaction tx = getOrCreateTx(message.getTransactionId());
161 tx.add((KahaMessageStore)destination, message);
162 } else {
163 destination.addMessage(null, message);
164 }
165 } catch (RuntimeStoreException rse) {
166 if (rse.getCause() instanceof IOException) {
167 brokerService.handleIOException((IOException)rse.getCause());
168 }
169 throw rse;
170 }
171 }
172
173 /**
174 * @param ack
175 * @throws IOException
176 */
177 final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
178 try {
179 if (ack.isInTransaction()) {
180 KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
181 tx.add((KahaMessageStore)destination, ack);
182 } else {
183 destination.removeMessage(null, ack);
184 }
185 } catch (RuntimeStoreException rse) {
186 if (rse.getCause() instanceof IOException) {
187 brokerService.handleIOException((IOException)rse.getCause());
188 }
189 throw rse;
190 }
191 }
192
193 final void acknowledge(final TopicMessageStore destination, String clientId,
194 String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
195 try {
196 if (ack.isInTransaction()) {
197 KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
198 tx.add((KahaMessageStore)destination, clientId, subscriptionName, messageId, ack);
199 } else {
200 destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
201 }
202 } catch (RuntimeStoreException rse) {
203 if (rse.getCause() instanceof IOException) {
204 brokerService.handleIOException((IOException)rse.getCause());
205 }
206 throw rse;
207 }
208 }
209
210 protected synchronized KahaTransaction getTx(TransactionId key) {
211 KahaTransaction result = (KahaTransaction)transactions.get(key);
212 if (result == null) {
213 result = (KahaTransaction)prepared.get(key);
214 }
215 return result;
216 }
217
218 protected synchronized KahaTransaction getOrCreateTx(TransactionId key) {
219 KahaTransaction result = (KahaTransaction)transactions.get(key);
220 if (result == null) {
221 result = new KahaTransaction();
222 transactions.put(key, result);
223 }
224 return result;
225 }
226
227 protected synchronized void removeTx(TransactionId key) {
228 transactions.remove(key);
229 prepared.remove(key);
230 }
231
232 public void delete() {
233 transactions.clear();
234 prepared.clear();
235 }
236
237 protected MessageStore getStoreById(Object id) {
238 return adaptor.retrieveMessageStore(id);
239 }
240
241 public void setBrokerService(BrokerService brokerService) {
242 this.brokerService = brokerService;
243 }
244 }