001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region;
018
019 import java.util.HashMap;
020 import java.util.HashSet;
021 import java.util.Map;
022 import java.util.Set;
023 import java.util.Timer;
024 import java.util.TimerTask;
025
026 import org.apache.activemq.broker.ConnectionContext;
027 import org.apache.activemq.command.ActiveMQDestination;
028 import org.apache.activemq.thread.TaskRunnerFactory;
029 import org.apache.activemq.usage.SystemUsage;
030 import org.slf4j.Logger;
031 import org.slf4j.LoggerFactory;
032
033 /**
034 *
035 */
036 public abstract class AbstractTempRegion extends AbstractRegion {
037 private static final Logger LOG = LoggerFactory.getLogger(TempQueueRegion.class);
038
039 private Map<CachedDestination, Destination> cachedDestinations = new HashMap<CachedDestination, Destination>();
040 private final boolean doCacheTempDestinations;
041 private final int purgeTime;
042 private Timer purgeTimer;
043 private TimerTask purgeTask;
044
045
046 /**
047 * @param broker
048 * @param destinationStatistics
049 * @param memoryManager
050 * @param taskRunnerFactory
051 * @param destinationFactory
052 */
053 public AbstractTempRegion(RegionBroker broker,
054 DestinationStatistics destinationStatistics,
055 SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
056 DestinationFactory destinationFactory) {
057 super(broker, destinationStatistics, memoryManager, taskRunnerFactory,
058 destinationFactory);
059 this.doCacheTempDestinations=broker.getBrokerService().isCacheTempDestinations();
060 this.purgeTime = broker.getBrokerService().getTimeBeforePurgeTempDestinations();
061 if (this.doCacheTempDestinations) {
062 this.purgeTimer = new Timer("ActiveMQ Temp destination purge timer", true);
063 this.purgeTask = new TimerTask() {
064 public void run() {
065 doPurge();
066 }
067
068 };
069 this.purgeTimer.schedule(purgeTask, purgeTime, purgeTime);
070 }
071
072 }
073
074 public void stop() throws Exception {
075 super.stop();
076 if (purgeTimer != null) {
077 purgeTimer.cancel();
078 }
079 }
080
081 protected abstract Destination doCreateDestination(
082 ConnectionContext context, ActiveMQDestination destination)
083 throws Exception;
084
085 protected synchronized Destination createDestination(
086 ConnectionContext context, ActiveMQDestination destination)
087 throws Exception {
088 Destination result = cachedDestinations.remove(new CachedDestination(
089 destination));
090 if (result == null) {
091 result = doCreateDestination(context, destination);
092 }
093 return result;
094 }
095
096 protected final synchronized void dispose(ConnectionContext context,
097 Destination dest) throws Exception {
098 // add to cache
099 if (this.doCacheTempDestinations) {
100 cachedDestinations.put(new CachedDestination(dest
101 .getActiveMQDestination()), dest);
102 }else {
103 try {
104 dest.dispose(context);
105 dest.stop();
106 } catch (Exception e) {
107 LOG.warn("Failed to dispose of " + dest, e);
108 }
109 }
110 }
111
112 private void doDispose(Destination dest) {
113 ConnectionContext context = new ConnectionContext();
114 try {
115 dest.dispose(context);
116 dest.stop();
117 } catch (Exception e) {
118 LOG.warn("Failed to dispose of " + dest, e);
119 }
120
121 }
122
123 private synchronized void doPurge() {
124 long currentTime = System.currentTimeMillis();
125 if (cachedDestinations.size() > 0) {
126 Set<CachedDestination> tmp = new HashSet<CachedDestination>(
127 cachedDestinations.keySet());
128 for (CachedDestination key : tmp) {
129 if ((key.timeStamp + purgeTime) < currentTime) {
130 Destination dest = cachedDestinations.remove(key);
131 if (dest != null) {
132 doDispose(dest);
133 }
134 }
135 }
136 }
137 }
138
139 static class CachedDestination {
140 long timeStamp;
141
142 ActiveMQDestination destination;
143
144 CachedDestination(ActiveMQDestination destination) {
145 this.destination = destination;
146 this.timeStamp = System.currentTimeMillis();
147 }
148
149 public int hashCode() {
150 return destination.hashCode();
151 }
152
153 public boolean equals(Object o) {
154 if (o instanceof CachedDestination) {
155 CachedDestination other = (CachedDestination) o;
156 return other.destination.equals(this.destination);
157 }
158 return false;
159 }
160
161 }
162
163 }