001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.journal;
018
019 import java.io.IOException;
020 import java.util.HashMap;
021 import java.util.Iterator;
022
023 import org.apache.activeio.journal.RecordLocation;
024 import org.apache.activemq.broker.ConnectionContext;
025 import org.apache.activemq.command.ActiveMQTopic;
026 import org.apache.activemq.command.JournalTopicAck;
027 import org.apache.activemq.command.Message;
028 import org.apache.activemq.command.MessageAck;
029 import org.apache.activemq.command.MessageId;
030 import org.apache.activemq.command.SubscriptionInfo;
031 import org.apache.activemq.store.MessageRecoveryListener;
032 import org.apache.activemq.store.TopicMessageStore;
033 import org.apache.activemq.transaction.Synchronization;
034 import org.apache.activemq.util.Callback;
035 import org.apache.activemq.util.SubscriptionKey;
036 import org.slf4j.Logger;
037 import org.slf4j.LoggerFactory;
038
039 /**
040 * A MessageStore that uses a Journal to store it's messages.
041 *
042 *
043 */
044 public class JournalTopicMessageStore extends JournalMessageStore implements TopicMessageStore {
045
046 private static final Logger LOG = LoggerFactory.getLogger(JournalTopicMessageStore.class);
047
048 private TopicMessageStore longTermStore;
049 private HashMap<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
050
051 public JournalTopicMessageStore(JournalPersistenceAdapter adapter, TopicMessageStore checkpointStore,
052 ActiveMQTopic destinationName) {
053 super(adapter, checkpointStore, destinationName);
054 this.longTermStore = checkpointStore;
055 }
056
057 public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
058 throws Exception {
059 this.peristenceAdapter.checkpoint(true, true);
060 longTermStore.recoverSubscription(clientId, subscriptionName, listener);
061 }
062
063 public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
064 MessageRecoveryListener listener) throws Exception {
065 this.peristenceAdapter.checkpoint(true, true);
066 longTermStore.recoverNextMessages(clientId, subscriptionName, maxReturned, listener);
067
068 }
069
070 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
071 return longTermStore.lookupSubscription(clientId, subscriptionName);
072 }
073
074 public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
075 this.peristenceAdapter.checkpoint(true, true);
076 longTermStore.addSubsciption(subscriptionInfo, retroactive);
077 }
078
079 public void addMessage(ConnectionContext context, Message message) throws IOException {
080 super.addMessage(context, message);
081 }
082
083 /**
084 */
085 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
086 final MessageId messageId, MessageAck originalAck) throws IOException {
087 final boolean debug = LOG.isDebugEnabled();
088
089 JournalTopicAck ack = new JournalTopicAck();
090 ack.setDestination(destination);
091 ack.setMessageId(messageId);
092 ack.setMessageSequenceId(messageId.getBrokerSequenceId());
093 ack.setSubscritionName(subscriptionName);
094 ack.setClientId(clientId);
095 ack.setTransactionId(context.getTransaction() != null
096 ? context.getTransaction().getTransactionId() : null);
097 final RecordLocation location = peristenceAdapter.writeCommand(ack, false);
098
099 final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
100 if (!context.isInTransaction()) {
101 if (debug) {
102 LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location);
103 }
104 acknowledge(messageId, location, key);
105 } else {
106 if (debug) {
107 LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
108 }
109 synchronized (this) {
110 inFlightTxLocations.add(location);
111 }
112 transactionStore.acknowledge(this, ack, location);
113 context.getTransaction().addSynchronization(new Synchronization() {
114 public void afterCommit() throws Exception {
115 if (debug) {
116 LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
117 }
118 synchronized (JournalTopicMessageStore.this) {
119 inFlightTxLocations.remove(location);
120 acknowledge(messageId, location, key);
121 }
122 }
123
124 public void afterRollback() throws Exception {
125 if (debug) {
126 LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
127 }
128 synchronized (JournalTopicMessageStore.this) {
129 inFlightTxLocations.remove(location);
130 }
131 }
132 });
133 }
134
135 }
136
137 public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName,
138 MessageId messageId) {
139 try {
140 SubscriptionInfo sub = longTermStore.lookupSubscription(clientId, subscritionName);
141 if (sub != null) {
142 longTermStore.acknowledge(context, clientId, subscritionName, messageId, null);
143 }
144 } catch (Throwable e) {
145 LOG.debug("Could not replay acknowledge for message '" + messageId
146 + "'. Message may have already been acknowledged. reason: " + e);
147 }
148 }
149
150 /**
151 * @param messageId
152 * @param location
153 * @param key
154 */
155 protected void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) {
156 synchronized (this) {
157 lastLocation = location;
158 ackedLastAckLocations.put(key, messageId);
159 }
160 }
161
162 public RecordLocation checkpoint() throws IOException {
163
164 final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
165
166 // swap out the hash maps..
167 synchronized (this) {
168 cpAckedLastAckLocations = this.ackedLastAckLocations;
169 this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
170 }
171
172 return super.checkpoint(new Callback() {
173 public void execute() throws Exception {
174
175 // Checkpoint the acknowledged messages.
176 Iterator<SubscriptionKey> iterator = cpAckedLastAckLocations.keySet().iterator();
177 while (iterator.hasNext()) {
178 SubscriptionKey subscriptionKey = iterator.next();
179 MessageId identity = cpAckedLastAckLocations.get(subscriptionKey);
180 longTermStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId,
181 subscriptionKey.subscriptionName, identity, null);
182 }
183
184 }
185 });
186
187 }
188
189 /**
190 * @return Returns the longTermStore.
191 */
192 public TopicMessageStore getLongTermTopicMessageStore() {
193 return longTermStore;
194 }
195
196 public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
197 longTermStore.deleteSubscription(clientId, subscriptionName);
198 }
199
200 public SubscriptionInfo[] getAllSubscriptions() throws IOException {
201 return longTermStore.getAllSubscriptions();
202 }
203
204 public int getMessageCount(String clientId, String subscriberName) throws IOException {
205 this.peristenceAdapter.checkpoint(true, true);
206 return longTermStore.getMessageCount(clientId, subscriberName);
207 }
208
209 public void resetBatching(String clientId, String subscriptionName) {
210 longTermStore.resetBatching(clientId, subscriptionName);
211 }
212
213 }