001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region.policy;
018
019 import org.apache.activemq.broker.region.DurableTopicSubscription;
020 import org.apache.activemq.broker.region.Subscription;
021 import org.apache.activemq.command.ActiveMQDestination;
022 import org.apache.activemq.command.ActiveMQQueue;
023 import org.apache.activemq.command.ActiveMQTopic;
024 import org.apache.activemq.command.Message;
025
026 /**
027 * A {@link DeadLetterStrategy} where each destination has its own individual
028 * DLQ using the subject naming hierarchy.
029 *
030 * @org.apache.xbean.XBean
031 *
032 */
033 public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
034
035 private String topicPrefix = "ActiveMQ.DLQ.Topic.";
036 private String queuePrefix = "ActiveMQ.DLQ.Queue.";
037 private boolean useQueueForQueueMessages = true;
038 private boolean useQueueForTopicMessages = true;
039 private boolean destinationPerDurableSubscriber;
040
041 public ActiveMQDestination getDeadLetterQueueFor(Message message, Subscription subscription) {
042 if (message.getDestination().isQueue()) {
043 return createDestination(message, queuePrefix, useQueueForQueueMessages, subscription);
044 } else {
045 return createDestination(message, topicPrefix, useQueueForTopicMessages, subscription);
046 }
047 }
048
049 // Properties
050 // -------------------------------------------------------------------------
051
052 public String getQueuePrefix() {
053 return queuePrefix;
054 }
055
056 /**
057 * Sets the prefix to use for all dead letter queues for queue messages
058 */
059 public void setQueuePrefix(String queuePrefix) {
060 this.queuePrefix = queuePrefix;
061 }
062
063 public String getTopicPrefix() {
064 return topicPrefix;
065 }
066
067 /**
068 * Sets the prefix to use for all dead letter queues for topic messages
069 */
070 public void setTopicPrefix(String topicPrefix) {
071 this.topicPrefix = topicPrefix;
072 }
073
074 public boolean isUseQueueForQueueMessages() {
075 return useQueueForQueueMessages;
076 }
077
078 /**
079 * Sets whether a queue or topic should be used for queue messages sent to a
080 * DLQ. The default is to use a Queue
081 */
082 public void setUseQueueForQueueMessages(boolean useQueueForQueueMessages) {
083 this.useQueueForQueueMessages = useQueueForQueueMessages;
084 }
085
086 public boolean isUseQueueForTopicMessages() {
087 return useQueueForTopicMessages;
088 }
089
090 /**
091 * Sets whether a queue or topic should be used for topic messages sent to a
092 * DLQ. The default is to use a Queue
093 */
094 public void setUseQueueForTopicMessages(boolean useQueueForTopicMessages) {
095 this.useQueueForTopicMessages = useQueueForTopicMessages;
096 }
097
098 public boolean isDestinationPerDurableSubscriber() {
099 return destinationPerDurableSubscriber;
100 }
101
102 /**
103 * sets whether durable topic subscriptions are to get individual dead letter destinations.
104 * When true, the DLQ is of the form 'topicPrefix.clientId:subscriptionName'
105 * The default is false.
106 * @param destinationPerDurableSubscriber
107 */
108 public void setDestinationPerDurableSubscriber(boolean destinationPerDurableSubscriber) {
109 this.destinationPerDurableSubscriber = destinationPerDurableSubscriber;
110 }
111
112 // Implementation methods
113 // -------------------------------------------------------------------------
114 protected ActiveMQDestination createDestination(Message message,
115 String prefix,
116 boolean useQueue,
117 Subscription subscription ) {
118 String name = null;
119
120 if (message.getRegionDestination() != null
121 && message.getRegionDestination().getActiveMQDestination() != null
122 && message.getRegionDestination().getActiveMQDestination().getPhysicalName() != null
123 && !message.getRegionDestination().getActiveMQDestination().getPhysicalName().isEmpty()){
124 name = prefix + message.getRegionDestination().getActiveMQDestination().getPhysicalName();
125 } else {
126 name = prefix + message.getDestination().getPhysicalName();
127 }
128
129 if (destinationPerDurableSubscriber && subscription instanceof DurableTopicSubscription) {
130 name += "." + ((DurableTopicSubscription)subscription).getSubscriptionKey();
131 }
132 if (useQueue) {
133 return new ActiveMQQueue(name);
134 } else {
135 return new ActiveMQTopic(name);
136 }
137 }
138 }