001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb;
018
019 import java.io.DataInputStream;
020 import java.io.IOException;
021 import java.util.ArrayList;
022 import java.util.HashSet;
023 import java.util.Iterator;
024 import java.util.Map;
025 import java.util.Set;
026 import java.util.Map.Entry;
027 import org.apache.activemq.broker.ConnectionContext;
028 import org.apache.activemq.command.ActiveMQDestination;
029 import org.apache.activemq.command.ActiveMQQueue;
030 import org.apache.activemq.command.ActiveMQTempQueue;
031 import org.apache.activemq.command.ActiveMQTempTopic;
032 import org.apache.activemq.command.ActiveMQTopic;
033 import org.apache.activemq.command.Message;
034 import org.apache.activemq.command.MessageAck;
035 import org.apache.activemq.command.MessageId;
036 import org.apache.activemq.command.ProducerId;
037 import org.apache.activemq.command.SubscriptionInfo;
038 import org.apache.activemq.command.TransactionId;
039 import org.apache.activemq.command.XATransactionId;
040 import org.apache.activemq.openwire.OpenWireFormat;
041 import org.apache.activemq.protobuf.Buffer;
042 import org.apache.activemq.store.AbstractMessageStore;
043 import org.apache.activemq.store.MessageRecoveryListener;
044 import org.apache.activemq.store.MessageStore;
045 import org.apache.activemq.store.PersistenceAdapter;
046 import org.apache.activemq.store.TopicMessageStore;
047 import org.apache.activemq.store.TransactionRecoveryListener;
048 import org.apache.activemq.store.TransactionStore;
049 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
050 import org.apache.activemq.store.kahadb.data.KahaDestination;
051 import org.apache.activemq.store.kahadb.data.KahaLocation;
052 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
053 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
054 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
055 import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
056 import org.apache.activemq.usage.MemoryUsage;
057 import org.apache.activemq.usage.SystemUsage;
058 import org.apache.activemq.util.ByteSequence;
059 import org.apache.activemq.wireformat.WireFormat;
060 import org.apache.kahadb.journal.Location;
061 import org.apache.kahadb.page.Transaction;
062
063 public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter {
064
065 private final WireFormat wireFormat = new OpenWireFormat();
066
067 public void setBrokerName(String brokerName) {
068 }
069 public void setUsageManager(SystemUsage usageManager) {
070 }
071
072 public TransactionStore createTransactionStore() throws IOException {
073 return new TransactionStore(){
074
075 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
076 if (preCommit != null) {
077 preCommit.run();
078 }
079 processCommit(txid);
080 if (postCommit != null) {
081 postCommit.run();
082 }
083 }
084 public void prepare(TransactionId txid) throws IOException {
085 processPrepare(txid);
086 }
087 public void rollback(TransactionId txid) throws IOException {
088 processRollback(txid);
089 }
090 public void recover(TransactionRecoveryListener listener) throws IOException {
091 for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
092 XATransactionId xid = (XATransactionId)entry.getKey();
093 ArrayList<Message> messageList = new ArrayList<Message>();
094 ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
095
096 for (Operation op : entry.getValue()) {
097 if( op.getClass() == AddOpperation.class ) {
098 AddOpperation addOp = (AddOpperation)op;
099 Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addOp.getCommand().getMessage().newInput()) );
100 messageList.add(msg);
101 } else {
102 RemoveOpperation rmOp = (RemoveOpperation)op;
103 MessageAck ack = (MessageAck)wireFormat.unmarshal( new DataInputStream(rmOp.getCommand().getAck().newInput()) );
104 ackList.add(ack);
105 }
106 }
107
108 Message[] addedMessages = new Message[messageList.size()];
109 MessageAck[] acks = new MessageAck[ackList.size()];
110 messageList.toArray(addedMessages);
111 ackList.toArray(acks);
112 listener.recover(xid, addedMessages, acks);
113 }
114 }
115 public void start() throws Exception {
116 }
117 public void stop() throws Exception {
118 }
119 };
120 }
121
122 public class KahaDBMessageStore extends AbstractMessageStore {
123 protected KahaDestination dest;
124
125 public KahaDBMessageStore(ActiveMQDestination destination) {
126 super(destination);
127 this.dest = convert( destination );
128 }
129
130 @Override
131 public ActiveMQDestination getDestination() {
132 return destination;
133 }
134
135 public void addMessage(ConnectionContext context, Message message) throws IOException {
136 KahaAddMessageCommand command = new KahaAddMessageCommand();
137 command.setDestination(dest);
138 command.setMessageId(message.getMessageId().toString());
139 processAdd(command, message.getTransactionId(), wireFormat.marshal(message));
140 }
141
142 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
143 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
144 command.setDestination(dest);
145 command.setMessageId(ack.getLastMessageId().toString());
146 processRemove(command, ack.getTransactionId());
147 }
148
149 public void removeAllMessages(ConnectionContext context) throws IOException {
150 KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
151 command.setDestination(dest);
152 process(command);
153 }
154
155 public Message getMessage(MessageId identity) throws IOException {
156 final String key = identity.toString();
157
158 // Hopefully one day the page file supports concurrent read operations... but for now we must
159 // externally synchronize...
160 ByteSequence data;
161 synchronized(indexMutex) {
162 data = pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>(){
163 public ByteSequence execute(Transaction tx) throws IOException {
164 StoredDestination sd = getStoredDestination(dest, tx);
165 Long sequence = sd.messageIdIndex.get(tx, key);
166 if( sequence ==null ) {
167 return null;
168 }
169 return sd.orderIndex.get(tx, sequence).data;
170 }
171 });
172 }
173 if( data == null ) {
174 return null;
175 }
176
177 Message msg = (Message)wireFormat.unmarshal( data );
178 return msg;
179 }
180
181 public int getMessageCount() throws IOException {
182 synchronized(indexMutex) {
183 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
184 public Integer execute(Transaction tx) throws IOException {
185 // Iterate through all index entries to get a count of messages in the destination.
186 StoredDestination sd = getStoredDestination(dest, tx);
187 int rc=0;
188 for (Iterator<Entry<String, Long>> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) {
189 iterator.next();
190 rc++;
191 }
192 return rc;
193 }
194 });
195 }
196 }
197
198 public void recover(final MessageRecoveryListener listener) throws Exception {
199 synchronized(indexMutex) {
200 pageFile.tx().execute(new Transaction.Closure<Exception>(){
201 public void execute(Transaction tx) throws Exception {
202 StoredDestination sd = getStoredDestination(dest, tx);
203 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
204 Entry<Long, MessageRecord> entry = iterator.next();
205 listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data) );
206 }
207 }
208 });
209 }
210 }
211
212 long cursorPos=0;
213
214 public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
215 synchronized(indexMutex) {
216 pageFile.tx().execute(new Transaction.Closure<Exception>(){
217 public void execute(Transaction tx) throws Exception {
218 StoredDestination sd = getStoredDestination(dest, tx);
219 Entry<Long, MessageRecord> entry=null;
220 int counter = 0;
221 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
222 entry = iterator.next();
223 listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
224 counter++;
225 if( counter >= maxReturned ) {
226 break;
227 }
228 }
229 if( entry!=null ) {
230 cursorPos = entry.getKey()+1;
231 }
232 }
233 });
234 }
235 }
236
237 public void resetBatching() {
238 cursorPos=0;
239 }
240
241
242 @Override
243 public void setBatch(MessageId identity) throws IOException {
244 final String key = identity.toString();
245
246 // Hopefully one day the page file supports concurrent read operations... but for now we must
247 // externally synchronize...
248 Long location;
249 synchronized(indexMutex) {
250 location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){
251 public Long execute(Transaction tx) throws IOException {
252 StoredDestination sd = getStoredDestination(dest, tx);
253 return sd.messageIdIndex.get(tx, key);
254 }
255 });
256 }
257 if( location!=null ) {
258 cursorPos=location+1;
259 }
260
261 }
262
263 @Override
264 public void setMemoryUsage(MemoryUsage memoeyUSage) {
265 }
266 @Override
267 public void start() throws Exception {
268 }
269 @Override
270 public void stop() throws Exception {
271 }
272
273 }
274
275 class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
276 public KahaDBTopicMessageStore(ActiveMQTopic destination) {
277 super(destination);
278 }
279
280 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
281 MessageId messageId, MessageAck ack) throws IOException {
282 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
283 command.setDestination(dest);
284 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
285 command.setMessageId(messageId.toString());
286 // We are not passed a transaction info.. so we can't participate in a transaction.
287 // Looks like a design issue with the TopicMessageStore interface. Also we can't recover the original ack
288 // to pass back to the XA recover method.
289 // command.setTransactionInfo();
290 processRemove(command, null);
291 }
292
293 public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
294 String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
295 KahaSubscriptionCommand command = new KahaSubscriptionCommand();
296 command.setDestination(dest);
297 command.setSubscriptionKey(subscriptionKey);
298 command.setRetroactive(retroactive);
299 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
300 command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
301 process(command);
302 }
303
304 public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
305 KahaSubscriptionCommand command = new KahaSubscriptionCommand();
306 command.setDestination(dest);
307 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
308 process(command);
309 }
310
311 public SubscriptionInfo[] getAllSubscriptions() throws IOException {
312
313 final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
314 synchronized(indexMutex) {
315 pageFile.tx().execute(new Transaction.Closure<IOException>(){
316 public void execute(Transaction tx) throws IOException {
317 StoredDestination sd = getStoredDestination(dest, tx);
318 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) {
319 Entry<String, KahaSubscriptionCommand> entry = iterator.next();
320 SubscriptionInfo info = (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()) );
321 subscriptions.add(info);
322
323 }
324 }
325 });
326 }
327
328 SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()];
329 subscriptions.toArray(rc);
330 return rc;
331 }
332
333 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
334 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
335 synchronized(indexMutex) {
336 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){
337 public SubscriptionInfo execute(Transaction tx) throws IOException {
338 StoredDestination sd = getStoredDestination(dest, tx);
339 KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
340 if( command ==null ) {
341 return null;
342 }
343 return (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(command.getSubscriptionInfo().newInput()) );
344 }
345 });
346 }
347 }
348
349 public int getMessageCount(String clientId, String subscriptionName) throws IOException {
350 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
351 synchronized(indexMutex) {
352 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
353 public Integer execute(Transaction tx) throws IOException {
354 StoredDestination sd = getStoredDestination(dest, tx);
355 Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
356 if ( cursorPos==null ) {
357 // The subscription might not exist.
358 return 0;
359 }
360 cursorPos += 1;
361
362 int counter = 0;
363 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
364 iterator.next();
365 counter++;
366 }
367 return counter;
368 }
369 });
370 }
371 }
372
373 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
374 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
375 synchronized(indexMutex) {
376 pageFile.tx().execute(new Transaction.Closure<Exception>(){
377 public void execute(Transaction tx) throws Exception {
378 StoredDestination sd = getStoredDestination(dest, tx);
379 Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
380 cursorPos += 1;
381
382 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
383 Entry<Long, MessageRecord> entry = iterator.next();
384 listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
385 }
386 }
387 });
388 }
389 }
390
391 public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
392 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
393 synchronized(indexMutex) {
394 pageFile.tx().execute(new Transaction.Closure<Exception>(){
395 public void execute(Transaction tx) throws Exception {
396 StoredDestination sd = getStoredDestination(dest, tx);
397 Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
398 if( cursorPos == null ) {
399 cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
400 cursorPos += 1;
401 }
402
403 Entry<Long, MessageRecord> entry=null;
404 int counter = 0;
405 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
406 entry = iterator.next();
407 listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
408 counter++;
409 if( counter >= maxReturned ) {
410 break;
411 }
412 }
413 if( entry!=null ) {
414 sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1);
415 }
416 }
417 });
418 }
419 }
420
421 public void resetBatching(String clientId, String subscriptionName) {
422 try {
423 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
424 synchronized(indexMutex) {
425 pageFile.tx().execute(new Transaction.Closure<IOException>(){
426 public void execute(Transaction tx) throws IOException {
427 StoredDestination sd = getStoredDestination(dest, tx);
428 sd.subscriptionCursors.remove(subscriptionKey);
429 }
430 });
431 }
432 } catch (IOException e) {
433 throw new RuntimeException(e);
434 }
435 }
436 }
437
438 String subscriptionKey(String clientId, String subscriptionName){
439 return clientId+":"+subscriptionName;
440 }
441
442 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
443 return new KahaDBMessageStore(destination);
444 }
445
446 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
447 return new KahaDBTopicMessageStore(destination);
448 }
449
450 /**
451 * Cleanup method to remove any state associated with the given destination.
452 * This method does not stop the message store (it might not be cached).
453 *
454 * @param destination Destination to forget
455 */
456 public void removeQueueMessageStore(ActiveMQQueue destination) {
457 }
458
459 /**
460 * Cleanup method to remove any state associated with the given destination
461 * This method does not stop the message store (it might not be cached).
462 *
463 * @param destination Destination to forget
464 */
465 public void removeTopicMessageStore(ActiveMQTopic destination) {
466 }
467
468 public void deleteAllMessages() throws IOException {
469 }
470
471
472 public Set<ActiveMQDestination> getDestinations() {
473 try {
474 final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
475 synchronized(indexMutex) {
476 pageFile.tx().execute(new Transaction.Closure<IOException>(){
477 public void execute(Transaction tx) throws IOException {
478 for (Iterator<Entry<String, StoredDestination>> iterator = destinations.iterator(tx); iterator.hasNext();) {
479 Entry<String, StoredDestination> entry = iterator.next();
480 rc.add(convert(entry.getKey()));
481 }
482 }
483 });
484 }
485 return rc;
486 } catch (IOException e) {
487 throw new RuntimeException(e);
488 }
489 }
490
491 public long getLastMessageBrokerSequenceId() throws IOException {
492 return 0;
493 }
494
495 public long size() {
496 if ( !started.get() ) {
497 return 0;
498 }
499 try {
500 return pageFile.getDiskSize();
501 } catch (IOException e) {
502 throw new RuntimeException(e);
503 }
504 }
505
506 public void beginTransaction(ConnectionContext context) throws IOException {
507 throw new IOException("Not yet implemented.");
508 }
509 public void commitTransaction(ConnectionContext context) throws IOException {
510 throw new IOException("Not yet implemented.");
511 }
512 public void rollbackTransaction(ConnectionContext context) throws IOException {
513 throw new IOException("Not yet implemented.");
514 }
515
516 public void checkpoint(boolean sync) throws IOException {
517 }
518
519 ///////////////////////////////////////////////////////////////////
520 // Internal conversion methods.
521 ///////////////////////////////////////////////////////////////////
522
523
524
525 KahaLocation convert(Location location) {
526 KahaLocation rc = new KahaLocation();
527 rc.setLogId(location.getDataFileId());
528 rc.setOffset(location.getOffset());
529 return rc;
530 }
531
532 KahaDestination convert(ActiveMQDestination dest) {
533 KahaDestination rc = new KahaDestination();
534 rc.setName(dest.getPhysicalName());
535 switch( dest.getDestinationType() ) {
536 case ActiveMQDestination.QUEUE_TYPE:
537 rc.setType(DestinationType.QUEUE);
538 return rc;
539 case ActiveMQDestination.TOPIC_TYPE:
540 rc.setType(DestinationType.TOPIC);
541 return rc;
542 case ActiveMQDestination.TEMP_QUEUE_TYPE:
543 rc.setType(DestinationType.TEMP_QUEUE);
544 return rc;
545 case ActiveMQDestination.TEMP_TOPIC_TYPE:
546 rc.setType(DestinationType.TEMP_TOPIC);
547 return rc;
548 default:
549 return null;
550 }
551 }
552
553 ActiveMQDestination convert(String dest) {
554 int p = dest.indexOf(":");
555 if( p<0 ) {
556 throw new IllegalArgumentException("Not in the valid destination format");
557 }
558 int type = Integer.parseInt(dest.substring(0, p));
559 String name = dest.substring(p+1);
560
561 switch( KahaDestination.DestinationType.valueOf(type) ) {
562 case QUEUE:
563 return new ActiveMQQueue(name);
564 case TOPIC:
565 return new ActiveMQTopic(name);
566 case TEMP_QUEUE:
567 return new ActiveMQTempQueue(name);
568 case TEMP_TOPIC:
569 return new ActiveMQTempTopic(name);
570 default:
571 throw new IllegalArgumentException("Not in the valid destination format");
572 }
573 }
574
575 public long getLastProducerSequenceId(ProducerId id) {
576 return -1;
577 }
578
579 }