001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.thread;
018
019 import java.util.concurrent.Executor;
020
021 /**
022 *
023 */
024 public class DeterministicTaskRunner implements TaskRunner {
025
026 private final Executor executor;
027 private final Task task;
028 private final Runnable runable;
029 private boolean shutdown;
030
031 /**Constructor
032 * @param executor
033 * @param task
034 */
035 public DeterministicTaskRunner(Executor executor, Task task) {
036 this.executor = executor;
037 this.task = task;
038 this.runable = new Runnable() {
039 public void run() {
040 Thread.currentThread();
041 runTask();
042 }
043 };
044 }
045
046 /**
047 * We Expect MANY wakeup calls on the same TaskRunner - but each
048 * needs to run
049 */
050 public void wakeup() throws InterruptedException {
051 synchronized (runable) {
052
053 if (shutdown) {
054 return;
055 }
056 executor.execute(runable);
057
058 }
059 }
060
061 /**
062 * shut down the task
063 *
064 * @throws InterruptedException
065 */
066 public void shutdown(long timeout) throws InterruptedException {
067 synchronized (runable) {
068 shutdown = true;
069 }
070 }
071
072 public void shutdown() throws InterruptedException {
073 shutdown(0);
074 }
075
076 final void runTask() {
077
078 synchronized (runable) {
079 if (shutdown) {
080 runable.notifyAll();
081 return;
082 }
083 }
084 task.iterate();
085 }
086 }