001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadaptor;
018
019 import java.util.Iterator;
020
021 import org.apache.activemq.command.MessageId;
022 import org.apache.activemq.kaha.MapContainer;
023 import org.apache.activemq.kaha.StoreEntry;
024
025 /**
026 * Holds information for the subscriber
027 *
028 *
029 */
030 public class TopicSubContainer {
031 private transient MapContainer mapContainer;
032 private transient StoreEntry batchEntry;
033
034 public TopicSubContainer(MapContainer container) {
035 this.mapContainer = container;
036 }
037
038 /**
039 * @return the batchEntry
040 */
041 public StoreEntry getBatchEntry() {
042 return this.batchEntry;
043 }
044
045 /**
046 * @param id
047 * @param batchEntry the batchEntry to set
048 */
049 public void setBatchEntry(String id, StoreEntry batchEntry) {
050 this.batchEntry = batchEntry;
051 }
052
053 public void reset() {
054 batchEntry = null;
055 }
056
057 public boolean isEmpty() {
058 return mapContainer.isEmpty();
059 }
060
061 public StoreEntry add(ConsumerMessageRef ref) {
062 return mapContainer.place(ref.getMessageId(),ref);
063 }
064
065 public ConsumerMessageRef remove(MessageId id) {
066 ConsumerMessageRef result = null;
067 StoreEntry entry = mapContainer.getEntry(id);
068 if (entry != null) {
069 result = (ConsumerMessageRef) mapContainer.getValue(entry);
070 mapContainer.remove(entry);
071 if (batchEntry != null && batchEntry.equals(entry)) {
072 reset();
073 }
074 }
075 if(mapContainer.isEmpty()) {
076 reset();
077 }
078 return result;
079 }
080
081
082 public ConsumerMessageRef get(StoreEntry entry) {
083 return (ConsumerMessageRef)mapContainer.getValue(entry);
084 }
085
086 public StoreEntry getEntry() {
087 return mapContainer.getFirst();
088 }
089
090 public StoreEntry refreshEntry(StoreEntry entry) {
091 return mapContainer.refresh(entry);
092 }
093
094 public StoreEntry getNextEntry(StoreEntry entry) {
095 return mapContainer.getNext(entry);
096 }
097
098 public Iterator iterator() {
099 return mapContainer.values().iterator();
100 }
101
102 public int size() {
103 return mapContainer.size();
104 }
105
106 public void clear() {
107 reset();
108 mapContainer.clear();
109 }
110 }