001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.usage;
018
019 import java.util.ArrayList;
020 import java.util.Iterator;
021 import java.util.LinkedList;
022 import java.util.List;
023 import java.util.concurrent.CopyOnWriteArrayList;
024 import java.util.concurrent.ThreadPoolExecutor;
025 import java.util.concurrent.atomic.AtomicBoolean;
026 import org.apache.activemq.Service;
027 import org.slf4j.Logger;
028 import org.slf4j.LoggerFactory;
029
030 /**
031 * Used to keep track of how much of something is being used so that a
032 * productive working set usage can be controlled. Main use case is manage
033 * memory usage.
034 *
035 * @org.apache.xbean.XBean
036 *
037 */
038 public abstract class Usage<T extends Usage> implements Service {
039
040 private static final Logger LOG = LoggerFactory.getLogger(Usage.class);
041 protected final Object usageMutex = new Object();
042 protected int percentUsage;
043 protected T parent;
044 private UsageCapacity limiter = new DefaultUsageCapacity();
045 private int percentUsageMinDelta = 1;
046 private final List<UsageListener> listeners = new CopyOnWriteArrayList<UsageListener>();
047 private final boolean debug = LOG.isDebugEnabled();
048 protected String name;
049 private float usagePortion = 1.0f;
050 private final List<T> children = new CopyOnWriteArrayList<T>();
051 private final List<Runnable> callbacks = new LinkedList<Runnable>();
052 private int pollingTime = 100;
053 private final AtomicBoolean started=new AtomicBoolean();
054 private ThreadPoolExecutor executor;
055 public Usage(T parent, String name, float portion) {
056 this.parent = parent;
057 this.usagePortion = portion;
058 if (parent != null) {
059 this.limiter.setLimit((long)(parent.getLimit() * portion));
060 name = parent.name + ":" + name;
061 }
062 this.name = name;
063 }
064
065 protected abstract long retrieveUsage();
066
067 /**
068 * @throws InterruptedException
069 */
070 public void waitForSpace() throws InterruptedException {
071 waitForSpace(0);
072 }
073
074 public boolean waitForSpace(long timeout) throws InterruptedException {
075 return waitForSpace(timeout, 100);
076 }
077
078 /**
079 * @param timeout
080 * @throws InterruptedException
081 * @return true if space
082 */
083 public boolean waitForSpace(long timeout, int highWaterMark) throws InterruptedException {
084 if (parent != null) {
085 if (!parent.waitForSpace(timeout, highWaterMark)) {
086 return false;
087 }
088 }
089 synchronized (usageMutex) {
090 percentUsage=caclPercentUsage();
091 if (percentUsage >= highWaterMark) {
092 long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
093 long timeleft = deadline;
094 while (timeleft > 0) {
095 percentUsage=caclPercentUsage();
096 if (percentUsage >= highWaterMark) {
097 usageMutex.wait(pollingTime);
098 timeleft = deadline - System.currentTimeMillis();
099 } else {
100 break;
101 }
102 }
103 }
104 return percentUsage < highWaterMark;
105 }
106 }
107
108 public boolean isFull() {
109 return isFull(100);
110 }
111
112 public boolean isFull(int highWaterMark) {
113 if (parent != null && parent.isFull(highWaterMark)) {
114 return true;
115 }
116 synchronized (usageMutex) {
117 percentUsage=caclPercentUsage();
118 return percentUsage >= highWaterMark;
119 }
120 }
121
122 public void addUsageListener(UsageListener listener) {
123 listeners.add(listener);
124 }
125
126 public void removeUsageListener(UsageListener listener) {
127 listeners.remove(listener);
128 }
129
130 public long getLimit() {
131 synchronized (usageMutex) {
132 return limiter.getLimit();
133 }
134 }
135
136 /**
137 * Sets the memory limit in bytes. Setting the limit in bytes will set the
138 * usagePortion to 0 since the UsageManager is not going to be portion based
139 * off the parent.
140 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
141 *
142 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
143 */
144 public void setLimit(long limit) {
145 if (percentUsageMinDelta < 0) {
146 throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
147 }
148 synchronized (usageMutex) {
149 this.limiter.setLimit(limit);
150 this.usagePortion = 0;
151 }
152 onLimitChange();
153 }
154
155 protected void onLimitChange() {
156 // We may need to calculate the limit
157 if (usagePortion > 0 && parent != null) {
158 synchronized (usageMutex) {
159 this.limiter.setLimit((long)(parent.getLimit() * usagePortion));
160 }
161 }
162 // Reset the percent currently being used.
163 int percentUsage;
164 synchronized (usageMutex) {
165 percentUsage = caclPercentUsage();
166 }
167 setPercentUsage(percentUsage);
168 // Let the children know that the limit has changed. They may need to
169 // set
170 // their limits based on ours.
171 for (T child : children) {
172 child.onLimitChange();
173 }
174 }
175
176 public float getUsagePortion() {
177 synchronized (usageMutex) {
178 return usagePortion;
179 }
180 }
181
182 public void setUsagePortion(float usagePortion) {
183 synchronized (usageMutex) {
184 this.usagePortion = usagePortion;
185 }
186 onLimitChange();
187 }
188
189 public int getPercentUsage() {
190 synchronized (usageMutex) {
191 return percentUsage;
192 }
193 }
194
195 public int getPercentUsageMinDelta() {
196 synchronized (usageMutex) {
197 return percentUsageMinDelta;
198 }
199 }
200
201 /**
202 * Sets the minimum number of percentage points the usage has to change
203 * before a UsageListener event is fired by the manager.
204 *
205 * @param percentUsageMinDelta
206 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
207 */
208 public void setPercentUsageMinDelta(int percentUsageMinDelta) {
209 if (percentUsageMinDelta < 1) {
210 throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0");
211 }
212 int percentUsage;
213 synchronized (usageMutex) {
214 this.percentUsageMinDelta = percentUsageMinDelta;
215 percentUsage = caclPercentUsage();
216 }
217 setPercentUsage(percentUsage);
218 }
219
220 public long getUsage() {
221 synchronized (usageMutex) {
222 return retrieveUsage();
223 }
224 }
225
226 protected void setPercentUsage(int value) {
227 synchronized (usageMutex) {
228 int oldValue = percentUsage;
229 percentUsage = value;
230 if (oldValue != value) {
231 fireEvent(oldValue, value);
232 }
233 }
234 }
235
236 protected int caclPercentUsage() {
237 if (limiter.getLimit() == 0) {
238 return 0;
239 }
240 return (int)((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta);
241 }
242
243 private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
244 if (debug) {
245 LOG.debug(getName() + ": usage change from: " + oldPercentUsage + "% of available memory, to: "
246 + newPercentUsage + "% of available memory");
247 }
248 if (started.get()) {
249 // Switching from being full to not being full..
250 if (oldPercentUsage >= 100 && newPercentUsage < 100) {
251 synchronized (usageMutex) {
252 usageMutex.notifyAll();
253 if (!callbacks.isEmpty()) {
254 for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator(); iter.hasNext();) {
255 Runnable callback = iter.next();
256 getExecutor().execute(callback);
257 }
258 callbacks.clear();
259 }
260 }
261 }
262 if (!listeners.isEmpty()) {
263 // Let the listeners know on a separate thread
264 Runnable listenerNotifier = new Runnable() {
265 public void run() {
266 for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();) {
267 UsageListener l = iter.next();
268 l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
269 }
270 }
271 };
272 if (started.get()) {
273 getExecutor().execute(listenerNotifier);
274 } else {
275 LOG.warn("Not notifying memory usage change to listeners on shutdown");
276 }
277 }
278 }
279 }
280
281 public String getName() {
282 return name;
283 }
284
285 @Override
286 public String toString() {
287 return "Usage(" + getName() + ") percentUsage=" + percentUsage
288 + "%, usage=" + retrieveUsage() + ", limit=" + limiter.getLimit()
289 + ", percentUsageMinDelta=" + percentUsageMinDelta + "%"
290 + (parent != null ? ";Parent:" + parent.toString() : "");
291 }
292
293 @SuppressWarnings("unchecked")
294 public void start() {
295 if (started.compareAndSet(false, true)){
296 if (parent != null) {
297 parent.addChild(this);
298 }
299 for (T t:children) {
300 t.start();
301 }
302 }
303 }
304
305 @SuppressWarnings("unchecked")
306 public void stop() {
307 if (started.compareAndSet(true, false)){
308 if (parent != null) {
309 parent.removeChild(this);
310 }
311
312 //clear down any callbacks
313 synchronized (usageMutex) {
314 usageMutex.notifyAll();
315 for (Iterator<Runnable> iter = new ArrayList<Runnable>(this.callbacks).iterator(); iter.hasNext();) {
316 Runnable callback = iter.next();
317 callback.run();
318 }
319 this.callbacks.clear();
320 }
321 for (T t:children) {
322 t.stop();
323 }
324 }
325 }
326
327 protected void addChild(T child) {
328 children.add(child);
329 if (started.get()) {
330 child.start();
331 }
332 }
333
334 protected void removeChild(T child) {
335 children.remove(child);
336 }
337
338 /**
339 * @param callback
340 * @return true if the UsageManager was full. The callback will only be
341 * called if this method returns true.
342 */
343 public boolean notifyCallbackWhenNotFull(final Runnable callback) {
344 if (parent != null) {
345 Runnable r = new Runnable() {
346
347 public void run() {
348 synchronized (usageMutex) {
349 if (percentUsage >= 100) {
350 callbacks.add(callback);
351 } else {
352 callback.run();
353 }
354 }
355 }
356 };
357 if (parent.notifyCallbackWhenNotFull(r)) {
358 return true;
359 }
360 }
361 synchronized (usageMutex) {
362 if (percentUsage >= 100) {
363 callbacks.add(callback);
364 return true;
365 } else {
366 return false;
367 }
368 }
369 }
370
371 /**
372 * @return the limiter
373 */
374 public UsageCapacity getLimiter() {
375 return this.limiter;
376 }
377
378 /**
379 * @param limiter the limiter to set
380 */
381 public void setLimiter(UsageCapacity limiter) {
382 this.limiter = limiter;
383 }
384
385 /**
386 * @return the pollingTime
387 */
388 public int getPollingTime() {
389 return this.pollingTime;
390 }
391
392 /**
393 * @param pollingTime the pollingTime to set
394 */
395 public void setPollingTime(int pollingTime) {
396 this.pollingTime = pollingTime;
397 }
398
399 public void setName(String name) {
400 this.name = name;
401 }
402
403 public T getParent() {
404 return parent;
405 }
406
407 public void setParent(T parent) {
408 this.parent = parent;
409 }
410
411 public void setExecutor (ThreadPoolExecutor executor) {
412 this.executor = executor;
413 }
414 public ThreadPoolExecutor getExecutor() {
415 return executor;
416 }
417 }