001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.util;
018
019 import org.apache.activemq.broker.BrokerPluginSupport;
020 import org.apache.activemq.broker.ProducerBrokerExchange;
021 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
022 import org.apache.activemq.command.ActiveMQDestination;
023 import org.apache.activemq.command.ActiveMQMessage;
024 import org.apache.activemq.command.Message;
025 import org.slf4j.Logger;
026 import org.slf4j.LoggerFactory;
027
028 /**
029 * A Broker interceptor which updates a JMS Client's timestamp on the message
030 * with a broker timestamp. Useful when the clocks on client machines are known
031 * to not be correct and you can only trust the time set on the broker machines.
032 *
033 * Enabling this plugin will break JMS compliance since the timestamp that the
034 * producer sees on the message after a send() will be different from the
035 * timestamp the consumer will observe when it receives the message. This plugin
036 * is not enabled in the default ActiveMQ configuration.
037 *
038 * 2 new attributes have been added which will allow the administrator some override control
039 * over the expiration time for incoming messages:
040 *
041 * Attribute 'zeroExpirationOverride' can be used to apply an expiration
042 * time to incoming messages with no expiration defined (messages that would never expire)
043 *
044 * Attribute 'ttlCeiling' can be used to apply a limit to the expiration time
045 *
046 * @org.apache.xbean.XBean element="timeStampingBrokerPlugin"
047 *
048 *
049 */
050 public class TimeStampingBrokerPlugin extends BrokerPluginSupport {
051 private static final Logger LOG = LoggerFactory.getLogger(TimeStampingBrokerPlugin.class);
052 /**
053 * variable which (when non-zero) is used to override
054 * the expiration date for messages that arrive with
055 * no expiration date set (in Milliseconds).
056 */
057 long zeroExpirationOverride = 0;
058
059 /**
060 * variable which (when non-zero) is used to limit
061 * the expiration date (in Milliseconds).
062 */
063 long ttlCeiling = 0;
064
065 /**
066 * If true, the plugin will not update timestamp to past values
067 * False by default
068 */
069 boolean futureOnly = false;
070
071
072 /**
073 * if true, update timestamp even if message has passed through a network
074 * default false
075 */
076 boolean processNetworkMessages = false;
077
078 /**
079 * setter method for zeroExpirationOverride
080 */
081 public void setZeroExpirationOverride(long ttl)
082 {
083 this.zeroExpirationOverride = ttl;
084 }
085
086 /**
087 * setter method for ttlCeiling
088 */
089 public void setTtlCeiling(long ttlCeiling)
090 {
091 this.ttlCeiling = ttlCeiling;
092 }
093
094 public void setFutureOnly(boolean futureOnly) {
095 this.futureOnly = futureOnly;
096 }
097
098 public void setProcessNetworkMessages(Boolean processNetworkMessages) {
099 this.processNetworkMessages = processNetworkMessages;
100 }
101
102 @Override
103 public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
104
105 if (message.getTimestamp() > 0 && !isDestinationDLQ(message) &&
106 (processNetworkMessages || (message.getBrokerPath() == null || message.getBrokerPath().length == 0))) {
107 // timestamp not been disabled and has not passed through a network or processNetworkMessages=true
108
109 long oldExpiration = message.getExpiration();
110 long newTimeStamp = System.currentTimeMillis();
111 long timeToLive = zeroExpirationOverride;
112 long oldTimestamp = message.getTimestamp();
113 if (oldExpiration > 0) {
114 timeToLive = oldExpiration - oldTimestamp;
115 }
116 if (timeToLive > 0 && ttlCeiling > 0 && timeToLive > ttlCeiling) {
117 timeToLive = ttlCeiling;
118 }
119 long expiration = timeToLive + newTimeStamp;
120 // In the scenario that the Broker is behind the clients we never want to set the
121 // Timestamp and Expiration in the past
122 if(!futureOnly || (expiration > oldExpiration)) {
123 if (timeToLive > 0 && expiration > 0) {
124 message.setExpiration(expiration);
125 }
126 message.setTimestamp(newTimeStamp);
127 if (LOG.isDebugEnabled()) {
128 LOG.debug("Set message " + message.getMessageId() + " timestamp from " + oldTimestamp + " to " + newTimeStamp);
129 }
130 }
131 }
132 super.send(producerExchange, message);
133 }
134
135 private boolean isDestinationDLQ(Message message) {
136 DeadLetterStrategy deadLetterStrategy;
137 Message tmp;
138
139 if (message != null && message.getRegionDestination() != null) {
140 deadLetterStrategy = message.getRegionDestination().getDeadLetterStrategy();
141 if (deadLetterStrategy != null) {
142 // Cheap copy, since we only need two fields
143 tmp = new ActiveMQMessage();
144 tmp.setDestination(message.getOriginalDestination());
145 tmp.setRegionDestination(message.getRegionDestination());
146
147 // Determine if we are headed for a DLQ
148 ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(tmp, null);
149 if (deadLetterDestination.equals(message.getDestination())) {
150 return true;
151 }
152 }
153 }
154 return false;
155 }
156 }