001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region.policy;
018
019 import java.util.Iterator;
020 import java.util.List;
021 import org.apache.activemq.broker.Broker;
022 import org.apache.activemq.broker.ConnectionContext;
023 import org.apache.activemq.broker.region.MessageReference;
024 import org.apache.activemq.broker.region.SubscriptionRecovery;
025 import org.apache.activemq.broker.region.Topic;
026 import org.apache.activemq.command.ActiveMQDestination;
027 import org.apache.activemq.command.Message;
028 import org.apache.activemq.memory.list.DestinationBasedMessageList;
029 import org.apache.activemq.memory.list.MessageList;
030 import org.apache.activemq.memory.list.SimpleMessageList;
031
032 /**
033 * This implementation of {@link SubscriptionRecoveryPolicy} will keep a fixed
034 * amount of memory available in RAM for message history which is evicted in
035 * time order.
036 *
037 * @org.apache.xbean.XBean
038 *
039 */
040 public class FixedSizedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
041
042 private MessageList buffer;
043 private int maximumSize = 64 * 1024;
044 private boolean useSharedBuffer = true;
045
046 public FixedSizedSubscriptionRecoveryPolicy() {
047
048 }
049
050 public SubscriptionRecoveryPolicy copy() {
051 FixedSizedSubscriptionRecoveryPolicy rc = new FixedSizedSubscriptionRecoveryPolicy();
052 rc.setMaximumSize(maximumSize);
053 rc.setUseSharedBuffer(useSharedBuffer);
054 return rc;
055 }
056
057 public boolean add(ConnectionContext context, MessageReference message) throws Exception {
058 buffer.add(message);
059 return true;
060 }
061
062 public void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception {
063 // Re-dispatch the messages from the buffer.
064 List copy = buffer.getMessages(sub.getActiveMQDestination());
065 if (!copy.isEmpty()) {
066 for (Iterator iter = copy.iterator(); iter.hasNext();) {
067 MessageReference node = (MessageReference)iter.next();
068 sub.addRecoveredMessage(context, node);
069 }
070 }
071 }
072
073 public void start() throws Exception {
074 buffer = createMessageList();
075 }
076
077 public void stop() throws Exception {
078 buffer.clear();
079 }
080
081 // Properties
082 // -------------------------------------------------------------------------
083 public MessageList getBuffer() {
084 return buffer;
085 }
086
087 public void setBuffer(MessageList buffer) {
088 this.buffer = buffer;
089 }
090
091 public int getMaximumSize() {
092 return maximumSize;
093 }
094
095 /**
096 * Sets the maximum amount of RAM in bytes that this buffer can hold in RAM
097 */
098 public void setMaximumSize(int maximumSize) {
099 this.maximumSize = maximumSize;
100 }
101
102 public boolean isUseSharedBuffer() {
103 return useSharedBuffer;
104 }
105
106 public void setUseSharedBuffer(boolean useSharedBuffer) {
107 this.useSharedBuffer = useSharedBuffer;
108 }
109
110 public Message[] browse(ActiveMQDestination destination) throws Exception {
111 return buffer.browse(destination);
112 }
113
114 public void setBroker(Broker broker) {
115 }
116
117 // Implementation methods
118
119 // -------------------------------------------------------------------------
120 protected MessageList createMessageList() {
121 if (useSharedBuffer) {
122 return new SimpleMessageList(maximumSize);
123 } else {
124 return new DestinationBasedMessageList(maximumSize);
125 }
126 }
127
128 }