001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker;
018
019 import org.apache.activemq.broker.region.Destination;
020 import org.apache.activemq.broker.region.Region;
021 import org.apache.activemq.command.Message;
022 import org.apache.activemq.command.MessageId;
023 import org.apache.activemq.state.ProducerState;
024 import org.slf4j.Logger;
025 import org.slf4j.LoggerFactory;
026
027 import java.io.IOException;
028 import java.util.concurrent.atomic.AtomicLong;
029
030 /**
031 * Holds internal state in the broker for a MessageProducer
032 *
033 *
034 */
035 public class ProducerBrokerExchange {
036
037 private static final Logger LOG = LoggerFactory.getLogger(ProducerBrokerExchange.class);
038 private ConnectionContext connectionContext;
039 private Destination regionDestination;
040 private Region region;
041 private ProducerState producerState;
042 private boolean mutable = true;
043 private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
044 private boolean auditProducerSequenceIds;
045 private boolean isNetworkProducer;
046 private BrokerService brokerService;
047
048 public ProducerBrokerExchange() {
049 }
050
051 public ProducerBrokerExchange copy() {
052 ProducerBrokerExchange rc = new ProducerBrokerExchange();
053 rc.connectionContext = connectionContext.copy();
054 rc.regionDestination = regionDestination;
055 rc.region = region;
056 rc.producerState = producerState;
057 rc.mutable = mutable;
058 return rc;
059 }
060
061
062 /**
063 * @return the connectionContext
064 */
065 public ConnectionContext getConnectionContext() {
066 return this.connectionContext;
067 }
068
069 /**
070 * @param connectionContext the connectionContext to set
071 */
072 public void setConnectionContext(ConnectionContext connectionContext) {
073 this.connectionContext = connectionContext;
074 }
075
076 /**
077 * @return the mutable
078 */
079 public boolean isMutable() {
080 return this.mutable;
081 }
082
083 /**
084 * @param mutable the mutable to set
085 */
086 public void setMutable(boolean mutable) {
087 this.mutable = mutable;
088 }
089
090 /**
091 * @return the regionDestination
092 */
093 public Destination getRegionDestination() {
094 return this.regionDestination;
095 }
096
097 /**
098 * @param regionDestination the regionDestination to set
099 */
100 public void setRegionDestination(Destination regionDestination) {
101 this.regionDestination = regionDestination;
102 }
103
104 /**
105 * @return the region
106 */
107 public Region getRegion() {
108 return this.region;
109 }
110
111 /**
112 * @param region the region to set
113 */
114 public void setRegion(Region region) {
115 this.region = region;
116 }
117
118 /**
119 * @return the producerState
120 */
121 public ProducerState getProducerState() {
122 return this.producerState;
123 }
124
125 /**
126 * @param producerState the producerState to set
127 */
128 public void setProducerState(ProducerState producerState) {
129 this.producerState = producerState;
130 }
131
132 /**
133 * Enforce duplicate suppression using info from persistence adapter
134 * @param messageSend
135 * @return false if message should be ignored as a duplicate
136 */
137 public boolean canDispatch(Message messageSend) {
138 boolean canDispatch = true;
139 if (auditProducerSequenceIds && messageSend.isPersistent()) {
140 final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId();
141 if (isNetworkProducer) {
142 // messages are multiplexed on this producer so we need to query the persistenceAdapter
143 long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId());
144 if (producerSequenceId <= lastStoredForMessageProducer) {
145 canDispatch = false;
146 if (LOG.isDebugEnabled()) {
147 LOG.debug("suppressing duplicate message send [" + (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()) + "] from network producer with producerSequenceId ["
148 + producerSequenceId + "] less than last stored: " + lastStoredForMessageProducer);
149 }
150 }
151 } else if (producerSequenceId <= lastSendSequenceNumber.get()) {
152 canDispatch = false;
153 if (LOG.isDebugEnabled()) {
154 LOG.debug("suppressing duplicate message send [" + (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()) + "] with producerSequenceId ["
155 + producerSequenceId + "] less than last stored: " + lastSendSequenceNumber);
156 }
157 } else {
158 // track current so we can suppress duplicates later in the stream
159 lastSendSequenceNumber.set(producerSequenceId);
160 }
161 }
162 return canDispatch;
163 }
164
165 private long getStoredSequenceIdForMessage(MessageId messageId) {
166 try {
167 return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
168 } catch (IOException ignored) {
169 LOG.debug("Failed to determine last producer sequence id for: " +messageId, ignored);
170 }
171 return -1;
172 }
173
174 public void setLastStoredSequenceId(long l) {
175 auditProducerSequenceIds = true;
176 if (connectionContext.isNetworkConnection()) {
177 brokerService = connectionContext.getBroker().getBrokerService();
178 isNetworkProducer = true;
179 }
180 lastSendSequenceNumber.set(l);
181 LOG.debug("last stored sequence id set: " + l);
182 }
183 }