001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region.group;
018
019 import org.apache.activemq.command.ConsumerId;
020
021 /**
022 * Uses hash-code buckets to associate consumers with sets of message group IDs.
023 *
024 *
025 */
026 public class MessageGroupHashBucket implements MessageGroupMap {
027
028 private final int bucketCount;
029 private final ConsumerId[] consumers;
030
031 public MessageGroupHashBucket(int bucketCount) {
032 this.bucketCount = bucketCount;
033 this.consumers = new ConsumerId[bucketCount];
034 }
035
036 public void put(String groupId, ConsumerId consumerId) {
037 int bucket = getBucketNumber(groupId);
038 consumers[bucket] = consumerId;
039 }
040
041 public ConsumerId get(String groupId) {
042 int bucket = getBucketNumber(groupId);
043 return consumers[bucket];
044 }
045
046 public ConsumerId removeGroup(String groupId) {
047 int bucket = getBucketNumber(groupId);
048 ConsumerId answer = consumers[bucket];
049 consumers[bucket] = null;
050 return answer;
051 }
052
053 public MessageGroupSet removeConsumer(ConsumerId consumerId) {
054 MessageGroupSet answer = null;
055 for (int i = 0; i < consumers.length; i++) {
056 ConsumerId owner = consumers[i];
057 if (owner != null && owner.equals(consumerId)) {
058 answer = createMessageGroupSet(i, answer);
059 consumers[i] = null;
060 }
061 }
062 if (answer == null) {
063 // make an empty set
064 answer = EmptyMessageGroupSet.INSTANCE;
065 }
066 return answer;
067 }
068
069 public String toString() {
070 int count = 0;
071 for (int i = 0; i < consumers.length; i++) {
072 if (consumers[i] != null) {
073 count++;
074 }
075 }
076 return "active message group buckets: " + count;
077 }
078
079 protected MessageGroupSet createMessageGroupSet(int bucketNumber, final MessageGroupSet parent) {
080 final MessageGroupSet answer = createMessageGroupSet(bucketNumber);
081 if (parent == null) {
082 return answer;
083 } else {
084 // union the two sets together
085 return new MessageGroupSet() {
086 public boolean contains(String groupID) {
087 return parent.contains(groupID) || answer.contains(groupID);
088 }
089 };
090 }
091 }
092
093 protected MessageGroupSet createMessageGroupSet(final int bucketNumber) {
094 return new MessageGroupSet() {
095 public boolean contains(String groupID) {
096 int bucket = getBucketNumber(groupID);
097 return bucket == bucketNumber;
098 }
099 };
100 }
101
102 protected int getBucketNumber(String groupId) {
103 int bucket = groupId.hashCode() % bucketCount;
104 // bucket could be negative
105 if (bucket < 0) {
106 bucket *= -1;
107 }
108 return bucket;
109 }
110 }