001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.plugin;
018
019 import java.io.File;
020 import java.io.FileInputStream;
021 import java.io.FileOutputStream;
022 import java.io.IOException;
023 import java.io.ObjectInputStream;
024 import java.io.ObjectOutputStream;
025 import java.util.concurrent.ConcurrentHashMap;
026
027 import org.apache.activemq.broker.Broker;
028 import org.apache.activemq.broker.BrokerFilter;
029 import org.apache.activemq.broker.ConnectionContext;
030 import org.apache.activemq.broker.region.Subscription;
031 import org.apache.activemq.command.ConsumerInfo;
032 import org.slf4j.Logger;
033 import org.slf4j.LoggerFactory;
034
035 /**
036 * A plugin which allows the caching of the selector from a subscription queue.
037 * <p/>
038 * This stops the build-up of unwanted messages, especially when consumers may
039 * disconnect from time to time when using virtual destinations.
040 * <p/>
041 * This is influenced by code snippets developed by Maciej Rakowicz
042 *
043 * @author Roelof Naude roelof(dot)naude(at)gmail.com
044 * @see https://issues.apache.org/activemq/browse/AMQ-3004
045 * @see http://mail-archives.apache.org/mod_mbox/activemq-users/201011.mbox/%3C8A013711-2613-450A-A487-379E784AF1D6@homeaway.co.uk%3E
046 */
047 public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnable {
048 private static final Logger LOG = LoggerFactory.getLogger(SubQueueSelectorCacheBroker.class);
049
050 /**
051 * The subscription's selector cache. We cache compiled expressions keyed
052 * by the target destination.
053 */
054 private ConcurrentHashMap<String, String> subSelectorCache = new ConcurrentHashMap<String, String>();
055
056 private final File persistFile;
057
058 private boolean running = true;
059 private Thread persistThread;
060 private static final long MAX_PERSIST_INTERVAL = 600000;
061 private static final String SELECTOR_CACHE_PERSIST_THREAD_NAME = "SelectorCachePersistThread";
062
063 /**
064 * Constructor
065 */
066 public SubQueueSelectorCacheBroker(Broker next, final File persistFile) {
067 super(next);
068 this.persistFile = persistFile;
069 LOG.info("Using persisted selector cache from[" + persistFile + "]");
070
071 readCache();
072
073 persistThread = new Thread(this, SELECTOR_CACHE_PERSIST_THREAD_NAME);
074 persistThread.start();
075 }
076
077 @Override
078 public void stop() throws Exception {
079 running = false;
080 if (persistThread != null) {
081 persistThread.interrupt();
082 persistThread.join();
083 } //if
084 }
085
086 @Override
087 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
088 LOG.debug("Caching consumer selector [" + info.getSelector() + "] on a " + info.getDestination().getQualifiedName());
089 if (info.getSelector() != null) {
090 subSelectorCache.put(info.getDestination().getQualifiedName(), info.getSelector());
091 } //if
092 return super.addConsumer(context, info);
093 }
094
095 private void readCache() {
096 if (persistFile != null && persistFile.exists()) {
097 try {
098 FileInputStream fis = new FileInputStream(persistFile);
099 try {
100 ObjectInputStream in = new ObjectInputStream(fis);
101 try {
102 subSelectorCache = (ConcurrentHashMap<String, String>) in.readObject();
103 } catch (ClassNotFoundException ex) {
104 LOG.error("Invalid selector cache data found. Please remove file.", ex);
105 } finally {
106 in.close();
107 } //try
108 } finally {
109 fis.close();
110 } //try
111 } catch (IOException ex) {
112 LOG.error("Unable to read persisted selector cache...it will be ignored!", ex);
113 } //try
114 } //if
115 }
116
117 /**
118 * Persist the selector cache.
119 */
120 private void persistCache() {
121 LOG.debug("Persisting selector cache....");
122 try {
123 FileOutputStream fos = new FileOutputStream(persistFile);
124 try {
125 ObjectOutputStream out = new ObjectOutputStream(fos);
126 try {
127 out.writeObject(subSelectorCache);
128 } finally {
129 out.flush();
130 out.close();
131 } //try
132 } catch (IOException ex) {
133 LOG.error("Unable to persist selector cache", ex);
134 } finally {
135 fos.close();
136 } //try
137 } catch (IOException ex) {
138 LOG.error("Unable to access file[" + persistFile + "]", ex);
139 } //try
140 }
141
142 /**
143 * @return The JMS selector for the specified {@code destination}
144 */
145 public String getSelector(final String destination) {
146 return subSelectorCache.get(destination);
147 }
148
149 /**
150 * Persist the selector cache every {@code MAX_PERSIST_INTERVAL}ms.
151 *
152 * @see java.lang.Runnable#run()
153 */
154 public void run() {
155 while (running) {
156 try {
157 Thread.sleep(MAX_PERSIST_INTERVAL);
158 } catch (InterruptedException ex) {
159 } //try
160
161 persistCache();
162 }
163 }
164 }
165