001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import org.apache.activemq.command.ConsumerId;
020 import org.apache.activemq.command.Message;
021 import org.apache.activemq.command.MessageId;
022
/**
 * Keeps track of a message that is flowing through the Broker. This object may
 * hold a hard reference to the message or only hold the id of the message if
 * the message has been persisted in a MessageStore.
 */
030 public class IndirectMessageReference implements QueueMessageReference {
031
032 /** The subscription that has locked the message */
033 private LockOwner lockOwner;
034 /** Has the message been dropped? */
035 private boolean dropped;
036 /** Has the message been acked? */
037 private boolean acked;
038 /** Direct reference to the message */
039 private final Message message;
040
041 /**
042 * @param message
043 */
044 public IndirectMessageReference(final Message message) {
045 this.message = message;
046 message.getMessageId();
047 message.getGroupID();
048 message.getGroupSequence();
049 }
050
051 public Message getMessageHardRef() {
052 return message;
053 }
054
055 public int getReferenceCount() {
056 return message.getReferenceCount();
057 }
058
059 public int incrementReferenceCount() {
060 return message.incrementReferenceCount();
061 }
062
063 public int decrementReferenceCount() {
064 return message.decrementReferenceCount();
065 }
066
067 public Message getMessage() {
068 return message;
069 }
070
071 public String toString() {
072 return "Message " + message.getMessageId() + " dropped=" + dropped + " acked=" + acked + " locked=" + (lockOwner != null);
073 }
074
075 public void incrementRedeliveryCounter() {
076 message.incrementRedeliveryCounter();
077 }
078
079 public synchronized boolean isDropped() {
080 return dropped;
081 }
082
083 public synchronized void drop() {
084 dropped = true;
085 lockOwner = null;
086 message.decrementReferenceCount();
087 }
088
089 public boolean lock(LockOwner subscription) {
090 synchronized (this) {
091 if (dropped || lockOwner != null) {
092 return false;
093 }
094 lockOwner = subscription;
095 return true;
096 }
097 }
098
099 public synchronized boolean unlock() {
100 boolean result = lockOwner != null;
101 lockOwner = null;
102 return result;
103 }
104
105 public synchronized LockOwner getLockOwner() {
106 return lockOwner;
107 }
108
109 public int getRedeliveryCounter() {
110 return message.getRedeliveryCounter();
111 }
112
113 public MessageId getMessageId() {
114 return message.getMessageId();
115 }
116
117 public Destination getRegionDestination() {
118 return message.getRegionDestination();
119 }
120
121 public boolean isPersistent() {
122 return message.isPersistent();
123 }
124
125 public synchronized boolean isLocked() {
126 return lockOwner != null;
127 }
128
129 public synchronized boolean isAcked() {
130 return acked;
131 }
132
133 public synchronized void setAcked(boolean b) {
134 acked = b;
135 }
136
137 public String getGroupID() {
138 return message.getGroupID();
139 }
140
141 public int getGroupSequence() {
142 return message.getGroupSequence();
143 }
144
145 public ConsumerId getTargetConsumerId() {
146 return message.getTargetConsumerId();
147 }
148
149 public long getExpiration() {
150 return message.getExpiration();
151 }
152
153 public boolean isExpired() {
154 return message.isExpired();
155 }
156
157 public synchronized int getSize() {
158 return message.getSize();
159 }
160
161 public boolean isAdvisory() {
162 return message.isAdvisory();
163 }
164 }