001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region.policy;
018
019 import java.util.concurrent.atomic.AtomicLong;
020 import javax.jms.JMSException;
021 import javax.jms.Message;
022 import javax.jms.MessageListener;
023 import org.apache.activemq.ActiveMQMessageTransformation;
024 import org.apache.activemq.broker.Broker;
025 import org.apache.activemq.broker.ConnectionContext;
026 import org.apache.activemq.broker.region.Destination;
027 import org.apache.activemq.broker.region.MessageReference;
028 import org.apache.activemq.broker.region.SubscriptionRecovery;
029 import org.apache.activemq.broker.region.Topic;
030 import org.apache.activemq.command.ActiveMQDestination;
031 import org.apache.activemq.command.ActiveMQMessage;
032 import org.apache.activemq.command.ConnectionId;
033 import org.apache.activemq.command.MessageId;
034 import org.apache.activemq.command.ProducerId;
035 import org.apache.activemq.command.SessionId;
036 import org.apache.activemq.util.IdGenerator;
037 import org.slf4j.Logger;
038 import org.slf4j.LoggerFactory;
039
040 /**
041 * This implementation of {@link SubscriptionRecoveryPolicy} will perform a user
042 * specific query mechanism to load any messages they may have missed.
043 *
044 * @org.apache.xbean.XBean
045 *
046 */
047 public class QueryBasedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
048
049 private static final Logger LOG = LoggerFactory.getLogger(QueryBasedSubscriptionRecoveryPolicy.class);
050
051 private MessageQuery query;
052 private final AtomicLong messageSequence = new AtomicLong(0);
053 private final IdGenerator idGenerator = new IdGenerator();
054 private final ProducerId producerId = createProducerId();
055
056 public SubscriptionRecoveryPolicy copy() {
057 QueryBasedSubscriptionRecoveryPolicy rc = new QueryBasedSubscriptionRecoveryPolicy();
058 rc.setQuery(query);
059 return rc;
060 }
061
062 public boolean add(ConnectionContext context, MessageReference message) throws Exception {
063 return query.validateUpdate(message.getMessage());
064 }
065
066 public void recover(final ConnectionContext context, final Topic topic, final SubscriptionRecovery sub) throws Exception {
067 if (query != null) {
068 ActiveMQDestination destination = sub.getActiveMQDestination();
069 query.execute(destination, new MessageListener() {
070
071 public void onMessage(Message message) {
072 dispatchInitialMessage(message, topic, context, sub);
073 }
074 });
075 }
076 }
077
078 public void start() throws Exception {
079 if (query == null) {
080 throw new IllegalArgumentException("No query property configured");
081 }
082 }
083
084 public void stop() throws Exception {
085 }
086
087 public MessageQuery getQuery() {
088 return query;
089 }
090
091 /**
092 * Sets the query strategy to load initial messages
093 */
094 public void setQuery(MessageQuery query) {
095 this.query = query;
096 }
097
098 public org.apache.activemq.command.Message[] browse(ActiveMQDestination dest) throws Exception {
099 return new org.apache.activemq.command.Message[0];
100 }
101
102 public void setBroker(Broker broker) {
103 }
104
105 protected void dispatchInitialMessage(Message message, Destination regionDestination, ConnectionContext context, SubscriptionRecovery sub) {
106 try {
107 ActiveMQMessage activeMessage = ActiveMQMessageTransformation.transformMessage(message, null);
108 ActiveMQDestination destination = activeMessage.getDestination();
109 if (destination == null) {
110 destination = sub.getActiveMQDestination();
111 activeMessage.setDestination(destination);
112 }
113 activeMessage.setRegionDestination(regionDestination);
114 configure(activeMessage);
115 sub.addRecoveredMessage(context, activeMessage);
116 } catch (Throwable e) {
117 LOG.warn("Failed to dispatch initial message: " + message + " into subscription. Reason: " + e, e);
118 }
119 }
120
121 protected void configure(ActiveMQMessage msg) throws JMSException {
122 long sequenceNumber = messageSequence.incrementAndGet();
123 msg.setMessageId(new MessageId(producerId, sequenceNumber));
124 msg.onSend();
125 msg.setProducerId(producerId);
126 }
127
128 protected ProducerId createProducerId() {
129 String id = idGenerator.generateId();
130 ConnectionId connectionId = new ConnectionId(id);
131 SessionId sessionId = new SessionId(connectionId, 1);
132 return new ProducerId(sessionId, 1);
133 }
134 }