001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.scheduler;
018
019 import java.io.File;
020 import java.util.concurrent.atomic.AtomicBoolean;
021
022 import org.apache.activemq.ScheduledMessage;
023 import org.apache.activemq.advisory.AdvisorySupport;
024 import org.apache.activemq.broker.Broker;
025 import org.apache.activemq.broker.BrokerFilter;
026 import org.apache.activemq.broker.ConnectionContext;
027 import org.apache.activemq.broker.ProducerBrokerExchange;
028 import org.apache.activemq.command.ActiveMQDestination;
029 import org.apache.activemq.command.Message;
030 import org.apache.activemq.command.MessageId;
031 import org.apache.activemq.command.ProducerId;
032 import org.apache.activemq.command.ProducerInfo;
033 import org.apache.activemq.openwire.OpenWireFormat;
034 import org.apache.activemq.security.SecurityContext;
035 import org.apache.activemq.state.ProducerState;
036 import org.apache.activemq.util.IdGenerator;
037 import org.apache.activemq.util.LongSequenceGenerator;
038 import org.apache.activemq.util.TypeConversionSupport;
039 import org.apache.activemq.wireformat.WireFormat;
040 import org.slf4j.Logger;
041 import org.slf4j.LoggerFactory;
042 import org.apache.kahadb.util.ByteSequence;
043
044 public class SchedulerBroker extends BrokerFilter implements JobListener {
045 private static final Logger LOG = LoggerFactory.getLogger(SchedulerBroker.class);
046 private static final IdGenerator ID_GENERATOR = new IdGenerator();
047 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
048 private final AtomicBoolean started = new AtomicBoolean();
049 private final WireFormat wireFormat = new OpenWireFormat();
050 private final ConnectionContext context = new ConnectionContext();
051 private final ProducerId producerId = new ProducerId();
052 private File directory;
053
054 private JobSchedulerStore store;
055 private JobScheduler scheduler;
056
057 public SchedulerBroker(Broker next, File directory) throws Exception {
058 super(next);
059 this.directory = directory;
060 this.producerId.setConnectionId(ID_GENERATOR.generateId());
061 this.context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
062 context.setBroker(next);
063 LOG.info("Scheduler using directory: " + directory);
064
065 }
066
067 public synchronized JobScheduler getJobScheduler() throws Exception {
068 return new JobSchedulerFacade(this);
069 }
070
071 /**
072 * @return the directory
073 */
074 public File getDirectory() {
075 return this.directory;
076 }
077 /**
078 * @param directory
079 * the directory to set
080 */
081 public void setDirectory(File directory) {
082 this.directory = directory;
083 }
084
085 @Override
086 public void start() throws Exception {
087 this.started.set(true);
088 getInternalScheduler();
089 super.start();
090 }
091
092 @Override
093 public void stop() throws Exception {
094 if (this.started.compareAndSet(true, false)) {
095
096 if (this.store != null) {
097 this.store.stop();
098 }
099 if (this.scheduler != null) {
100 this.scheduler.removeListener(this);
101 this.scheduler = null;
102 }
103 }
104 super.stop();
105 }
106
107 @Override
108 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
109 long delay = 0;
110 long period = 0;
111 int repeat = 0;
112 String cronEntry = "";
113 String jobId = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
114 Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
115 Object periodValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
116 Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
117
118 String physicalName = messageSend.getDestination().getPhysicalName();
119 boolean schedularManage = physicalName.regionMatches(true, 0,
120 ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
121 ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length());
122
123 if (schedularManage == true) {
124
125 JobScheduler scheduler = getInternalScheduler();
126 ActiveMQDestination replyTo = messageSend.getReplyTo();
127
128 String action = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION);
129
130 if (action != null ) {
131
132 Object startTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
133 Object endTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);
134
135 if (replyTo != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE)) {
136
137 if( startTime != null && endTime != null ) {
138
139 long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
140 long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
141
142 for (Job job : scheduler.getAllJobs(start, finish)) {
143 sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
144 }
145 } else {
146 for (Job job : scheduler.getAllJobs()) {
147 sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
148 }
149 }
150 }
151 if (jobId != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE)) {
152 scheduler.remove(jobId);
153 } else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) {
154
155 if( startTime != null && endTime != null ) {
156
157 long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
158 long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
159
160 scheduler.removeAllJobs(start, finish);
161 } else {
162 scheduler.removeAllJobs();
163 }
164 }
165 }
166
167 } else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) {
168 //clear transaction context
169 Message msg = messageSend.copy();
170 msg.setTransactionId(null);
171 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(msg);
172 if (cronValue != null) {
173 cronEntry = cronValue.toString();
174 }
175 if (periodValue != null) {
176 period = (Long) TypeConversionSupport.convert(periodValue, Long.class);
177 }
178 if (delayValue != null) {
179 delay = (Long) TypeConversionSupport.convert(delayValue, Long.class);
180 }
181 Object repeatValue = msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
182 if (repeatValue != null) {
183 repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
184 }
185 getInternalScheduler().schedule(msg.getMessageId().toString(),
186 new ByteSequence(packet.data, packet.offset, packet.length),cronEntry, delay, period, repeat);
187
188 } else {
189 super.send(producerExchange, messageSend);
190 }
191 }
192
193 public void scheduledJob(String id, ByteSequence job) {
194 org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job
195 .getOffset(), job.getLength());
196 try {
197 Message messageSend = (Message) this.wireFormat.unmarshal(packet);
198 messageSend.setOriginalTransactionId(null);
199 Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
200 Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
201 String cronStr = cronValue != null ? cronValue.toString() : null;
202 int repeat = 0;
203 if (repeatValue != null) {
204 repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
205 }
206
207 if (repeat != 0 || cronStr != null && cronStr.length() > 0) {
208 // create a unique id - the original message could be sent
209 // lots of times
210 messageSend.setMessageId(
211 new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
212 }
213
214 // Add the jobId as a property
215 messageSend.setProperty("scheduledJobId", id);
216
217 // if this goes across a network - we don't want it rescheduled
218 messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
219 messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
220 messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
221 messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
222
223 if (messageSend.getTimestamp() > 0 && messageSend.getExpiration() > 0) {
224
225 long oldExpiration = messageSend.getExpiration();
226 long newTimeStamp = System.currentTimeMillis();
227 long timeToLive = 0;
228 long oldTimestamp = messageSend.getTimestamp();
229
230 if (oldExpiration > 0) {
231 timeToLive = oldExpiration - oldTimestamp;
232 }
233
234 long expiration = timeToLive + newTimeStamp;
235
236 if(expiration > oldExpiration) {
237 if (timeToLive > 0 && expiration > 0) {
238 messageSend.setExpiration(expiration);
239 }
240 messageSend.setTimestamp(newTimeStamp);
241 if (LOG.isDebugEnabled()) {
242 LOG.debug("Set message " + messageSend.getMessageId() + " timestamp from " + oldTimestamp + " to " + newTimeStamp);
243 }
244 }
245 }
246
247 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
248 producerExchange.setConnectionContext(context);
249 producerExchange.setMutable(true);
250 producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
251 super.send(producerExchange, messageSend);
252 } catch (Exception e) {
253 LOG.error("Failed to send scheduled message " + id, e);
254 }
255 }
256
257 protected synchronized JobScheduler getInternalScheduler() throws Exception {
258 if (this.started.get()) {
259 if (this.scheduler == null) {
260 this.scheduler = getStore().getJobScheduler("JMS");
261 this.scheduler.addListener(this);
262 }
263 return this.scheduler;
264 }
265 return null;
266 }
267
268 private JobSchedulerStore getStore() throws Exception {
269 if (started.get()) {
270 if (this.store == null) {
271 this.store = new JobSchedulerStore();
272 this.store.setDirectory(directory);
273 this.store.start();
274 }
275 return this.store;
276 }
277 return null;
278 }
279
280 protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo)
281 throws Exception {
282
283 org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getPayload());
284 try {
285 Message msg = (Message) this.wireFormat.unmarshal(packet);
286 msg.setOriginalTransactionId(null);
287 msg.setPersistent(false);
288 msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
289 msg.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
290 msg.setDestination(replyTo);
291 msg.setResponseRequired(false);
292 msg.setProducerId(this.producerId);
293
294 // Add the jobId as a property
295 msg.setProperty("scheduledJobId", job.getJobId());
296
297 final boolean originalFlowControl = context.isProducerFlowControl();
298 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
299 producerExchange.setConnectionContext(context);
300 producerExchange.setMutable(true);
301 producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
302 try {
303 context.setProducerFlowControl(false);
304 this.next.send(producerExchange, msg);
305 } finally {
306 context.setProducerFlowControl(originalFlowControl);
307 }
308 } catch (Exception e) {
309 LOG.error("Failed to send scheduled message " + job.getJobId(), e);
310 }
311
312 }
313 }