001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.jdbc;
018
019 import java.io.IOException;
020 import java.sql.SQLException;
021 import java.util.concurrent.atomic.AtomicLong;
022
023 import org.apache.activemq.ActiveMQMessageAudit;
024 import org.apache.activemq.broker.ConnectionContext;
025 import org.apache.activemq.command.ActiveMQDestination;
026 import org.apache.activemq.command.Message;
027 import org.apache.activemq.command.MessageAck;
028 import org.apache.activemq.command.MessageId;
029 import org.apache.activemq.store.AbstractMessageStore;
030 import org.apache.activemq.store.MessageRecoveryListener;
031 import org.apache.activemq.util.ByteSequence;
032 import org.apache.activemq.util.ByteSequenceData;
033 import org.apache.activemq.util.IOExceptionSupport;
034 import org.apache.activemq.wireformat.WireFormat;
035 import org.slf4j.Logger;
036 import org.slf4j.LoggerFactory;
037
038 /**
039 *
040 */
041 public class JDBCMessageStore extends AbstractMessageStore {
042
043 class Duration {
044 static final int LIMIT = 100;
045 final long start = System.currentTimeMillis();
046 final String name;
047
048 Duration(String name) {
049 this.name = name;
050 }
051 void end() {
052 end(null);
053 }
054 void end(Object o) {
055 long duration = System.currentTimeMillis() - start;
056
057 if (duration > LIMIT) {
058 System.err.println(name + " took a long time: " + duration + "ms " + o);
059 }
060 }
061 }
062 private static final Logger LOG = LoggerFactory.getLogger(JDBCMessageStore.class);
063 protected final WireFormat wireFormat;
064 protected final JDBCAdapter adapter;
065 protected final JDBCPersistenceAdapter persistenceAdapter;
066 protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1);
067 protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1);
068
069 protected ActiveMQMessageAudit audit;
070
071 public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException {
072 super(destination);
073 this.persistenceAdapter = persistenceAdapter;
074 this.adapter = adapter;
075 this.wireFormat = wireFormat;
076 this.audit = audit;
077
078 if (destination.isQueue() && persistenceAdapter.getBrokerService().shouldRecordVirtualDestination(destination)) {
079 recordDestinationCreation(destination);
080 }
081 }
082
083 private void recordDestinationCreation(ActiveMQDestination destination) throws IOException {
084 TransactionContext c = persistenceAdapter.getTransactionContext();
085 try {
086 c = persistenceAdapter.getTransactionContext();
087 if (adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, destination.getQualifiedName(), destination.getQualifiedName()) < 0) {
088 adapter.doRecordDestination(c, destination);
089 }
090 } catch (SQLException e) {
091 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
092 throw IOExceptionSupport.create("Failed to record destination: " + destination + ". Reason: " + e, e);
093 } finally {
094 c.close();
095 }
096 }
097
098 public void addMessage(ConnectionContext context, Message message) throws IOException {
099 MessageId messageId = message.getMessageId();
100 if (audit != null && audit.isDuplicate(message)) {
101 if (LOG.isDebugEnabled()) {
102 LOG.debug(destination.getPhysicalName()
103 + " ignoring duplicated (add) message, already stored: "
104 + messageId);
105 }
106 return;
107 }
108
109 long sequenceId = persistenceAdapter.getNextSequenceId();
110
111 // Serialize the Message..
112 byte data[];
113 try {
114 ByteSequence packet = wireFormat.marshal(message);
115 data = ByteSequenceData.toByteArray(packet);
116 } catch (IOException e) {
117 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
118 }
119
120 // Get a connection and insert the message into the DB.
121 TransactionContext c = persistenceAdapter.getTransactionContext(context);
122 try {
123 adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration(),
124 this.isPrioritizedMessages() ? message.getPriority() : 0);
125 } catch (SQLException e) {
126 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
127 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
128 } finally {
129 c.close();
130 }
131 onAdd(messageId, sequenceId, message.getPriority());
132 }
133
134 protected void onAdd(MessageId messageId, long sequenceId, byte priority) {
135 }
136
137 public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
138 // Get a connection and insert the message into the DB.
139 TransactionContext c = persistenceAdapter.getTransactionContext(context);
140 try {
141 adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId, destination, expirationTime, messageRef);
142 } catch (SQLException e) {
143 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
144 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
145 } finally {
146 c.close();
147 }
148 }
149
150 public Message getMessage(MessageId messageId) throws IOException {
151 // Get a connection and pull the message out of the DB
152 TransactionContext c = persistenceAdapter.getTransactionContext();
153 try {
154 byte data[] = adapter.doGetMessage(c, messageId);
155 if (data == null) {
156 return null;
157 }
158
159 Message answer = (Message)wireFormat.unmarshal(new ByteSequence(data));
160 return answer;
161 } catch (IOException e) {
162 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
163 } catch (SQLException e) {
164 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
165 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
166 } finally {
167 c.close();
168 }
169 }
170
171 public String getMessageReference(MessageId messageId) throws IOException {
172 long id = messageId.getBrokerSequenceId();
173
174 // Get a connection and pull the message out of the DB
175 TransactionContext c = persistenceAdapter.getTransactionContext();
176 try {
177 return adapter.doGetMessageReference(c, id);
178 } catch (IOException e) {
179 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
180 } catch (SQLException e) {
181 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
182 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
183 } finally {
184 c.close();
185 }
186 }
187
188 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
189
190 long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId())[0];
191
192 // Get a connection and remove the message from the DB
193 TransactionContext c = persistenceAdapter.getTransactionContext(context);
194 try {
195 adapter.doRemoveMessage(c, seq);
196 } catch (SQLException e) {
197 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
198 throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e);
199 } finally {
200 c.close();
201 }
202 }
203
204 public void recover(final MessageRecoveryListener listener) throws Exception {
205
206 // Get all the Message ids out of the database.
207 TransactionContext c = persistenceAdapter.getTransactionContext();
208 try {
209 c = persistenceAdapter.getTransactionContext();
210 adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
211 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
212 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
213 msg.getMessageId().setBrokerSequenceId(sequenceId);
214 return listener.recoverMessage(msg);
215 }
216
217 public boolean recoverMessageReference(String reference) throws Exception {
218 return listener.recoverMessageReference(new MessageId(reference));
219 }
220 });
221 } catch (SQLException e) {
222 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
223 throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e);
224 } finally {
225 c.close();
226 }
227 }
228
229 /**
230 * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
231 */
232 public void removeAllMessages(ConnectionContext context) throws IOException {
233 // Get a connection and remove the message from the DB
234 TransactionContext c = persistenceAdapter.getTransactionContext(context);
235 try {
236 adapter.doRemoveAllMessages(c, destination);
237 } catch (SQLException e) {
238 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
239 throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, e);
240 } finally {
241 c.close();
242 }
243 }
244
245 public int getMessageCount() throws IOException {
246 int result = 0;
247 TransactionContext c = persistenceAdapter.getTransactionContext();
248 try {
249
250 result = adapter.doGetMessageCount(c, destination);
251
252 } catch (SQLException e) {
253 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
254 throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e);
255 } finally {
256 c.close();
257 }
258 return result;
259 }
260
261 /**
262 * @param maxReturned
263 * @param listener
264 * @throws Exception
265 * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
266 * org.apache.activemq.store.MessageRecoveryListener)
267 */
268 public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
269 TransactionContext c = persistenceAdapter.getTransactionContext();
270 try {
271 adapter.doRecoverNextMessages(c, destination, lastRecoveredSequenceId.get(), lastRecoveredPriority.get(),
272 maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
273
274 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
275 if (listener.hasSpace()) {
276 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
277 msg.getMessageId().setBrokerSequenceId(sequenceId);
278 listener.recoverMessage(msg);
279 lastRecoveredSequenceId.set(sequenceId);
280 lastRecoveredPriority.set(msg.getPriority());
281 return true;
282 }
283 return false;
284 }
285
286 public boolean recoverMessageReference(String reference) throws Exception {
287 if (listener.hasSpace()) {
288 listener.recoverMessageReference(new MessageId(reference));
289 return true;
290 }
291 return false;
292 }
293
294 });
295 } catch (SQLException e) {
296 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
297 } finally {
298 c.close();
299 }
300
301 }
302
303 /**
304 * @see org.apache.activemq.store.MessageStore#resetBatching()
305 */
306 public void resetBatching() {
307 if (LOG.isTraceEnabled()) {
308 LOG.trace(destination.getPhysicalName() + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get());
309 }
310 lastRecoveredSequenceId.set(-1);
311 lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
312
313 }
314
315 @Override
316 public void setBatch(MessageId messageId) {
317 try {
318 long[] storedValues = getStoreSequenceIdForMessageId(messageId);
319 lastRecoveredSequenceId.set(storedValues[0]);
320 lastRecoveredPriority.set(storedValues[1]);
321 } catch (IOException ignoredAsAlreadyLogged) {
322 lastRecoveredSequenceId.set(-1);
323 lastRecoveredPriority.set(Byte.MAX_VALUE -1);
324 }
325 if (LOG.isTraceEnabled()) {
326 LOG.trace(destination.getPhysicalName() + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get()
327 + ", priority: " + lastRecoveredPriority.get());
328 }
329 }
330
331 private long[] getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
332 long[] result = new long[]{-1, Byte.MAX_VALUE -1};
333 TransactionContext c = persistenceAdapter.getTransactionContext();
334 try {
335 result = adapter.getStoreSequenceId(c, destination, messageId);
336 } catch (SQLException e) {
337 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
338 throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
339 } finally {
340 c.close();
341 }
342 return result;
343 }
344
345 public void setPrioritizedMessages(boolean prioritizedMessages) {
346 super.setPrioritizedMessages(prioritizedMessages);
347 }
348 }