001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.thread;
018
019 import java.util.concurrent.Executor;
020 import java.util.concurrent.ExecutorService;
021 import java.util.concurrent.SynchronousQueue;
022 import java.util.concurrent.ThreadFactory;
023 import java.util.concurrent.ThreadPoolExecutor;
024 import java.util.concurrent.TimeUnit;
025 import java.util.concurrent.atomic.AtomicBoolean;
026 import java.util.concurrent.atomic.AtomicLong;
027
028 /**
029 * Manages the thread pool for long running tasks. Long running tasks are not
030 * always active but when they are active, they may need a few iterations of
031 * processing for them to become idle. The manager ensures that each task is
032 * processes but that no one task overtakes the system. This is kinda like
033 * cooperative multitasking.
034 *
035 * @org.apache.xbean.XBean
036 */
037 public class TaskRunnerFactory implements Executor {
038
039 private ExecutorService executor;
040 private int maxIterationsPerRun;
041 private String name;
042 private int priority;
043 private boolean daemon;
044 private AtomicLong id = new AtomicLong(0);
045 private boolean dedicatedTaskRunner;
046 private AtomicBoolean initDone = new AtomicBoolean(false);
047
048 public TaskRunnerFactory() {
049 this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000);
050 }
051
052 private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) {
053 this(name,priority,daemon,maxIterationsPerRun,false);
054 }
055
056 public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner) {
057 this.name = name;
058 this.priority = priority;
059 this.daemon = daemon;
060 this.maxIterationsPerRun = maxIterationsPerRun;
061 this.dedicatedTaskRunner = dedicatedTaskRunner;
062 }
063
064 public void init() {
065 if (initDone.compareAndSet(false, true)) {
066 // If your OS/JVM combination has a good thread model, you may want to
067 // avoid using a thread pool to run tasks and use a DedicatedTaskRunner instead.
068 if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) {
069 executor = null;
070 } else if (executor == null) {
071 executor = createDefaultExecutor();
072 }
073 }
074 }
075
076 public void shutdown() {
077 if (executor != null) {
078 executor.shutdownNow();
079 }
080 initDone.set(false);
081 }
082
083 public TaskRunner createTaskRunner(Task task, String name) {
084 init();
085 if (executor != null) {
086 return new PooledTaskRunner(executor, task, maxIterationsPerRun);
087 } else {
088 return new DedicatedTaskRunner(task, name, priority, daemon);
089 }
090 }
091
092 public void execute(Runnable runnable) {
093 execute(runnable, "ActiveMQ Task");
094 }
095
096 public void execute(Runnable runnable, String name) {
097 init();
098 if (executor != null) {
099 executor.execute(runnable);
100 } else {
101 new Thread(runnable, name + "-" + id.incrementAndGet()).start();
102 }
103 }
104
105 protected ExecutorService createDefaultExecutor() {
106 ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
107 public Thread newThread(Runnable runnable) {
108 Thread thread = new Thread(runnable, name + "-" + id.incrementAndGet());
109 thread.setDaemon(daemon);
110 thread.setPriority(priority);
111 return thread;
112 }
113 });
114 return rc;
115 }
116
117 public ExecutorService getExecutor() {
118 return executor;
119 }
120
121 public void setExecutor(ExecutorService executor) {
122 this.executor = executor;
123 }
124
125 public int getMaxIterationsPerRun() {
126 return maxIterationsPerRun;
127 }
128
129 public void setMaxIterationsPerRun(int maxIterationsPerRun) {
130 this.maxIterationsPerRun = maxIterationsPerRun;
131 }
132
133 public String getName() {
134 return name;
135 }
136
137 public void setName(String name) {
138 this.name = name;
139 }
140
141 public int getPriority() {
142 return priority;
143 }
144
145 public void setPriority(int priority) {
146 this.priority = priority;
147 }
148
149 public boolean isDaemon() {
150 return daemon;
151 }
152
153 public void setDaemon(boolean daemon) {
154 this.daemon = daemon;
155 }
156
157 public boolean isDedicatedTaskRunner() {
158 return dedicatedTaskRunner;
159 }
160
161 public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
162 this.dedicatedTaskRunner = dedicatedTaskRunner;
163 }
164 }