001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network;
018
019 import java.util.Set;
020 import java.util.concurrent.CopyOnWriteArraySet;
021 import java.util.concurrent.atomic.AtomicBoolean;
022 import java.util.concurrent.atomic.AtomicInteger;
023
024 import org.apache.activemq.command.ConsumerId;
025 import org.apache.activemq.command.ConsumerInfo;
026 import org.apache.activemq.command.NetworkBridgeFilter;
027 import org.slf4j.Logger;
028 import org.slf4j.LoggerFactory;
029
030 /**
031 * Represents a network bridge interface
032 *
033 *
034 */
035 public class DemandSubscription {
036 private static final Logger LOG = LoggerFactory.getLogger(DemandSubscription.class);
037
038 private final ConsumerInfo remoteInfo;
039 private final ConsumerInfo localInfo;
040 private Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>();
041
042 private AtomicInteger dispatched = new AtomicInteger(0);
043 private AtomicBoolean activeWaiter = new AtomicBoolean();
044 private NetworkBridgeFilter networkBridgeFilter;
045
046 DemandSubscription(ConsumerInfo info) {
047 remoteInfo = info;
048 localInfo = info.copy();
049 localInfo.setNetworkSubscription(true);
050 remoteSubsIds.add(info.getConsumerId());
051 }
052
053 /**
054 * Increment the consumers associated with this subscription
055 *
056 * @param id
057 * @return true if added
058 */
059 public boolean add(ConsumerId id) {
060 return remoteSubsIds.add(id);
061 }
062
063 /**
064 * Increment the consumers associated with this subscription
065 *
066 * @param id
067 * @return true if removed
068 */
069 public boolean remove(ConsumerId id) {
070 return remoteSubsIds.remove(id);
071 }
072
073 /**
074 * @return true if there are no interested consumers
075 */
076 public boolean isEmpty() {
077 return remoteSubsIds.isEmpty();
078 }
079
080 public int size() {
081 return remoteSubsIds.size();
082 }
083 /**
084 * @return Returns the localInfo.
085 */
086 public ConsumerInfo getLocalInfo() {
087 return localInfo;
088 }
089
090 /**
091 * @return Returns the remoteInfo.
092 */
093 public ConsumerInfo getRemoteInfo() {
094 return remoteInfo;
095 }
096
097 public void waitForCompletion() {
098 if (dispatched.get() > 0) {
099 if (LOG.isDebugEnabled()) {
100 LOG.debug("Waiting for completion for sub: " + localInfo.getConsumerId() + ", dispatched: " + this.dispatched.get());
101 }
102 activeWaiter.set(true);
103 if (dispatched.get() > 0) {
104 synchronized (activeWaiter) {
105 try {
106 activeWaiter.wait();
107 } catch (InterruptedException ignored) {
108 }
109 }
110 if (this.dispatched.get() > 0) {
111 LOG.warn("demand sub interrupted or timedout while waiting for outstanding responses, expect potentially " + this.dispatched.get() + " duplicate deliveried");
112 }
113 }
114 }
115 }
116
117 public void decrementOutstandingResponses() {
118 if (dispatched.decrementAndGet() == 0 && activeWaiter.get()) {
119 synchronized (activeWaiter) {
120 activeWaiter.notifyAll();
121 }
122 }
123 }
124
125 public boolean incrementOutstandingResponses() {
126 dispatched.incrementAndGet();
127 if (activeWaiter.get()) {
128 decrementOutstandingResponses();
129 return false;
130 }
131 return true;
132 }
133
134 public NetworkBridgeFilter getNetworkBridgeFilter() {
135 return networkBridgeFilter;
136 }
137
138 public void setNetworkBridgeFilter(NetworkBridgeFilter networkBridgeFilter) {
139 this.networkBridgeFilter = networkBridgeFilter;
140 }
141 }