001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.memory;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.util.HashSet;
022 import java.util.Iterator;
023 import java.util.Set;
024 import java.util.concurrent.ConcurrentHashMap;
025
026 import org.apache.activemq.broker.ConnectionContext;
027 import org.apache.activemq.command.ActiveMQDestination;
028 import org.apache.activemq.command.ActiveMQQueue;
029 import org.apache.activemq.command.ActiveMQTopic;
030 import org.apache.activemq.command.ProducerId;
031 import org.apache.activemq.store.MessageStore;
032 import org.apache.activemq.store.PersistenceAdapter;
033 import org.apache.activemq.store.ProxyMessageStore;
034 import org.apache.activemq.store.TopicMessageStore;
035 import org.apache.activemq.store.TransactionStore;
036 import org.apache.activemq.usage.SystemUsage;
037 import org.slf4j.Logger;
038 import org.slf4j.LoggerFactory;
039
040 /**
041 * @org.apache.xbean.XBean
042 *
043 */
044 public class MemoryPersistenceAdapter implements PersistenceAdapter {
045 private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class);
046
047 MemoryTransactionStore transactionStore;
048 ConcurrentHashMap<ActiveMQDestination, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQDestination, TopicMessageStore>();
049 ConcurrentHashMap<ActiveMQDestination, MessageStore> queues = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
050 private boolean useExternalMessageReferences;
051
052 public Set<ActiveMQDestination> getDestinations() {
053 Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(queues.size() + topics.size());
054 for (Iterator<ActiveMQDestination> iter = queues.keySet().iterator(); iter.hasNext();) {
055 rc.add(iter.next());
056 }
057 for (Iterator<ActiveMQDestination> iter = topics.keySet().iterator(); iter.hasNext();) {
058 rc.add(iter.next());
059 }
060 return rc;
061 }
062
063 public static MemoryPersistenceAdapter newInstance(File file) {
064 return new MemoryPersistenceAdapter();
065 }
066
067 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
068 MessageStore rc = queues.get(destination);
069 if (rc == null) {
070 rc = new MemoryMessageStore(destination);
071 if (transactionStore != null) {
072 rc = transactionStore.proxy(rc);
073 }
074 queues.put(destination, rc);
075 }
076 return rc;
077 }
078
079 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
080 TopicMessageStore rc = topics.get(destination);
081 if (rc == null) {
082 rc = new MemoryTopicMessageStore(destination);
083 if (transactionStore != null) {
084 rc = transactionStore.proxy(rc);
085 }
086 topics.put(destination, rc);
087 }
088 return rc;
089 }
090
091 /**
092 * Cleanup method to remove any state associated with the given destination
093 *
094 * @param destination Destination to forget
095 */
096 public void removeQueueMessageStore(ActiveMQQueue destination) {
097 queues.remove(destination);
098 }
099
100 /**
101 * Cleanup method to remove any state associated with the given destination
102 *
103 * @param destination Destination to forget
104 */
105 public void removeTopicMessageStore(ActiveMQTopic destination) {
106 topics.remove(destination);
107 }
108
109 public TransactionStore createTransactionStore() throws IOException {
110 if (transactionStore == null) {
111 transactionStore = new MemoryTransactionStore(this);
112 }
113 return transactionStore;
114 }
115
116 public void beginTransaction(ConnectionContext context) {
117 }
118
119 public void commitTransaction(ConnectionContext context) {
120 }
121
122 public void rollbackTransaction(ConnectionContext context) {
123 }
124
125 public void start() throws Exception {
126 }
127
128 public void stop() throws Exception {
129 }
130
131 public long getLastMessageBrokerSequenceId() throws IOException {
132 return 0;
133 }
134
135 public void deleteAllMessages() throws IOException {
136 for (Iterator<TopicMessageStore> iter = topics.values().iterator(); iter.hasNext();) {
137 MemoryMessageStore store = asMemoryMessageStore(iter.next());
138 if (store != null) {
139 store.delete();
140 }
141 }
142 for (Iterator<MessageStore> iter = queues.values().iterator(); iter.hasNext();) {
143 MemoryMessageStore store = asMemoryMessageStore(iter.next());
144 if (store != null) {
145 store.delete();
146 }
147 }
148
149 if (transactionStore != null) {
150 transactionStore.delete();
151 }
152 }
153
154 public boolean isUseExternalMessageReferences() {
155 return useExternalMessageReferences;
156 }
157
158 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
159 this.useExternalMessageReferences = useExternalMessageReferences;
160 }
161
162 protected MemoryMessageStore asMemoryMessageStore(Object value) {
163 if (value instanceof MemoryMessageStore) {
164 return (MemoryMessageStore)value;
165 }
166 if (value instanceof ProxyMessageStore) {
167 MessageStore delegate = ((ProxyMessageStore)value).getDelegate();
168 if (delegate instanceof MemoryMessageStore) {
169 return (MemoryMessageStore) delegate;
170 }
171 }
172 LOG.warn("Expected an instance of MemoryMessageStore but was: " + value);
173 return null;
174 }
175
176 /**
177 * @param usageManager The UsageManager that is controlling the broker's
178 * memory usage.
179 */
180 public void setUsageManager(SystemUsage usageManager) {
181 }
182
183 public String toString() {
184 return "MemoryPersistenceAdapter";
185 }
186
187 public void setBrokerName(String brokerName) {
188 }
189
190 public void setDirectory(File dir) {
191 }
192
193 public File getDirectory(){
194 return null;
195 }
196
197 public void checkpoint(boolean sync) throws IOException {
198 }
199
200 public long size(){
201 return 0;
202 }
203
204 public void setCreateTransactionStore(boolean create) throws IOException {
205 if (create) {
206 createTransactionStore();
207 }
208 }
209
210 public long getLastProducerSequenceId(ProducerId id) {
211 // memory map does duplicate suppression
212 return -1;
213 }
214 }