001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadaptor;
018
019 import java.io.IOException;
020 import org.apache.activemq.broker.ConnectionContext;
021 import org.apache.activemq.command.ActiveMQDestination;
022 import org.apache.activemq.command.Message;
023 import org.apache.activemq.command.MessageAck;
024 import org.apache.activemq.command.MessageId;
025 import org.apache.activemq.kaha.MapContainer;
026 import org.apache.activemq.kaha.StoreEntry;
027 import org.apache.activemq.store.MessageRecoveryListener;
028 import org.apache.activemq.store.MessageStore;
029 import org.apache.activemq.store.AbstractMessageStore;
030 import org.apache.activemq.usage.MemoryUsage;
031 import org.apache.activemq.usage.SystemUsage;
032
033 /**
034 * An implementation of {@link org.apache.activemq.store.MessageStore} which
035 * uses a JPS Container
036 *
037 *
038 */
039 public class KahaMessageStore extends AbstractMessageStore {
040
041 protected final MapContainer<MessageId, Message> messageContainer;
042 protected StoreEntry batchEntry;
043
044 public KahaMessageStore(MapContainer<MessageId, Message> container, ActiveMQDestination destination)
045 throws IOException {
046 super(destination);
047 this.messageContainer = container;
048 }
049
050 protected MessageId getMessageId(Object object) {
051 return ((Message)object).getMessageId();
052 }
053
054 public Object getId() {
055 return messageContainer.getId();
056 }
057
058 public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
059 messageContainer.put(message.getMessageId(), message);
060 // TODO: we should do the following but it is not need if the message is
061 // being added within a persistence
062 // transaction
063 // but since I can't tell if one is running right now.. I'll leave this
064 // out for now.
065 // if( message.isResponseRequired() ) {
066 // messageContainer.force();
067 // }
068 }
069
070 public synchronized Message getMessage(MessageId identity) throws IOException {
071 Message result = messageContainer.get(identity);
072 return result;
073 }
074
075 protected boolean recoverMessage(MessageRecoveryListener listener, Message msg) throws Exception {
076 listener.recoverMessage(msg);
077 return listener.hasSpace();
078 }
079
080 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
081 removeMessage(ack.getLastMessageId());
082 }
083
084 public synchronized void removeMessage(MessageId msgId) throws IOException {
085 StoreEntry entry = messageContainer.getEntry(msgId);
086 if (entry != null) {
087 messageContainer.remove(entry);
088 if (messageContainer.isEmpty() || (batchEntry != null && batchEntry.equals(entry))) {
089 resetBatching();
090 }
091 }
092 }
093
094 public synchronized void recover(MessageRecoveryListener listener) throws Exception {
095 for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer
096 .getNext(entry)) {
097 Message msg = (Message)messageContainer.getValue(entry);
098 if (!recoverMessage(listener, msg)) {
099 break;
100 }
101 }
102 }
103
104 public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
105 messageContainer.clear();
106 }
107
108 public synchronized void delete() {
109 messageContainer.clear();
110 }
111
112 /**
113 * @return the number of messages held by this destination
114 * @see org.apache.activemq.store.MessageStore#getMessageCount()
115 */
116 public int getMessageCount() {
117 return messageContainer.size();
118 }
119
120 /**
121 * @param id
122 * @return null
123 * @throws Exception
124 * @see org.apache.activemq.store.MessageStore#getPreviousMessageIdToDeliver(org.apache.activemq.command.MessageId)
125 */
126 public MessageId getPreviousMessageIdToDeliver(MessageId id) throws Exception {
127 return null;
128 }
129
130 /**
131 * @param lastMessageId
132 * @param maxReturned
133 * @param listener
134 * @throws Exception
135 * @see org.apache.activemq.store.MessageStore#recoverNextMessages(org.apache.activemq.command.MessageId,
136 * int, org.apache.activemq.store.MessageRecoveryListener)
137 */
138 public synchronized void recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
139 throws Exception {
140 StoreEntry entry = batchEntry;
141 if (entry == null) {
142 entry = messageContainer.getFirst();
143 } else {
144 entry = messageContainer.refresh(entry);
145 entry = messageContainer.getNext(entry);
146 if (entry == null) {
147 batchEntry = null;
148 }
149 }
150 if (entry != null) {
151 int count = 0;
152 do {
153 Message msg = messageContainer.getValue(entry);
154 if (msg != null) {
155 recoverMessage(listener, msg);
156 count++;
157 }
158 batchEntry = entry;
159 entry = messageContainer.getNext(entry);
160 } while (entry != null && count < maxReturned && listener.hasSpace());
161 }
162 }
163
164 /**
165 * @param nextToDispatch
166 * @see org.apache.activemq.store.MessageStore#resetBatching(org.apache.activemq.command.MessageId)
167 */
168 public synchronized void resetBatching() {
169 batchEntry = null;
170 }
171
172 /**
173 * @return true if the store supports cursors
174 */
175 public boolean isSupportForCursors() {
176 return true;
177 }
178
179 @Override
180 public void setBatch(MessageId messageId) {
181 batchEntry = messageContainer.getEntry(messageId);
182 }
183
184 }