001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.memory;
018
019 import java.io.IOException;
020 import java.util.Collections;
021 import java.util.Iterator;
022 import java.util.LinkedHashMap;
023 import java.util.Map;
024 import java.util.Map.Entry;
025
026 import org.apache.activemq.broker.ConnectionContext;
027 import org.apache.activemq.command.ActiveMQDestination;
028 import org.apache.activemq.command.Message;
029 import org.apache.activemq.command.MessageAck;
030 import org.apache.activemq.command.MessageId;
031 import org.apache.activemq.store.MessageRecoveryListener;
032 import org.apache.activemq.store.MessageStore;
033 import org.apache.activemq.store.AbstractMessageStore;
034 import org.apache.activemq.usage.MemoryUsage;
035 import org.apache.activemq.usage.SystemUsage;
036
037 /**
038 * An implementation of {@link org.apache.activemq.store.MessageStore} which
039 * uses a
040 *
041 *
042 */
043 public class MemoryMessageStore extends AbstractMessageStore {
044
045 protected final Map<MessageId, Message> messageTable;
046 protected MessageId lastBatchId;
047
048 public MemoryMessageStore(ActiveMQDestination destination) {
049 this(destination, new LinkedHashMap<MessageId, Message>());
050 }
051
052 public MemoryMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable) {
053 super(destination);
054 this.messageTable = Collections.synchronizedMap(messageTable);
055 }
056
057 public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
058 synchronized (messageTable) {
059 messageTable.put(message.getMessageId(), message);
060 }
061 message.incrementReferenceCount();
062 }
063
064 // public void addMessageReference(ConnectionContext context,MessageId
065 // messageId,long expirationTime,String messageRef)
066 // throws IOException{
067 // synchronized(messageTable){
068 // messageTable.put(messageId,messageRef);
069 // }
070 // }
071
072 public Message getMessage(MessageId identity) throws IOException {
073 return messageTable.get(identity);
074 }
075
076 // public String getMessageReference(MessageId identity) throws IOException{
077 // return (String)messageTable.get(identity);
078 // }
079
080 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
081 removeMessage(ack.getLastMessageId());
082 }
083
084 public void removeMessage(MessageId msgId) throws IOException {
085 synchronized (messageTable) {
086 Message removed = messageTable.remove(msgId);
087 if( removed !=null ) {
088 removed.decrementReferenceCount();
089 }
090 if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
091 lastBatchId = null;
092 }
093 }
094 }
095
096 public void recover(MessageRecoveryListener listener) throws Exception {
097 // the message table is a synchronizedMap - so just have to synchronize
098 // here
099 synchronized (messageTable) {
100 for (Iterator<Message> iter = messageTable.values().iterator(); iter.hasNext();) {
101 Object msg = iter.next();
102 if (msg.getClass() == MessageId.class) {
103 listener.recoverMessageReference((MessageId)msg);
104 } else {
105 listener.recoverMessage((Message)msg);
106 }
107 }
108 }
109 }
110
111 public void removeAllMessages(ConnectionContext context) throws IOException {
112 synchronized (messageTable) {
113 messageTable.clear();
114 }
115 }
116
117 public void delete() {
118 synchronized (messageTable) {
119 messageTable.clear();
120 }
121 }
122
123
124 public int getMessageCount() {
125 return messageTable.size();
126 }
127
128 public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
129 synchronized (messageTable) {
130 boolean pastLackBatch = lastBatchId == null;
131 int count = 0;
132 for (Iterator iter = messageTable.entrySet().iterator(); iter.hasNext();) {
133 Map.Entry entry = (Entry)iter.next();
134 if (pastLackBatch) {
135 count++;
136 Object msg = entry.getValue();
137 lastBatchId = (MessageId)entry.getKey();
138 if (msg.getClass() == MessageId.class) {
139 listener.recoverMessageReference((MessageId)msg);
140 } else {
141 listener.recoverMessage((Message)msg);
142 }
143 } else {
144 pastLackBatch = entry.getKey().equals(lastBatchId);
145 }
146 }
147 }
148 }
149
150 public void resetBatching() {
151 lastBatchId = null;
152 }
153
154 @Override
155 public void setBatch(MessageId messageId) {
156 lastBatchId = messageId;
157 }
158
159 }