001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.reliable;
018
019 import java.io.IOException;
020 import java.util.SortedSet;
021 import java.util.TreeSet;
022
023 import org.apache.activemq.command.Command;
024 import org.apache.activemq.command.ReplayCommand;
025 import org.apache.activemq.command.Response;
026 import org.apache.activemq.openwire.CommandIdComparator;
027 import org.apache.activemq.transport.FutureResponse;
028 import org.apache.activemq.transport.ResponseCorrelator;
029 import org.apache.activemq.transport.Transport;
030 import org.apache.activemq.transport.udp.UdpTransport;
031 import org.slf4j.Logger;
032 import org.slf4j.LoggerFactory;
033
034 /**
035 * This interceptor deals with out of order commands together with being able to
036 * handle dropped commands and the re-requesting dropped commands.
037 *
038 *
039 */
040 public class ReliableTransport extends ResponseCorrelator {
041 private static final Logger LOG = LoggerFactory.getLogger(ReliableTransport.class);
042
043 private ReplayStrategy replayStrategy;
044 private SortedSet<Command> commands = new TreeSet<Command>(new CommandIdComparator());
045 private int expectedCounter = 1;
046 private int replayBufferCommandCount = 50;
047 private int requestTimeout = 2000;
048 private ReplayBuffer replayBuffer;
049 private Replayer replayer;
050 private UdpTransport udpTransport;
051
052 public ReliableTransport(Transport next, ReplayStrategy replayStrategy) {
053 super(next);
054 this.replayStrategy = replayStrategy;
055 }
056
057 public ReliableTransport(Transport next, UdpTransport udpTransport) throws IOException {
058 super(next, udpTransport.getSequenceGenerator());
059 this.udpTransport = udpTransport;
060 this.replayer = udpTransport.createReplayer();
061 }
062
063 /**
064 * Requests that a range of commands be replayed
065 */
066 public void requestReplay(int fromCommandId, int toCommandId) {
067 ReplayCommand replay = new ReplayCommand();
068 replay.setFirstNakNumber(fromCommandId);
069 replay.setLastNakNumber(toCommandId);
070 try {
071 oneway(replay);
072 } catch (IOException e) {
073 getTransportListener().onException(e);
074 }
075 }
076
077 public Object request(Object o) throws IOException {
078 final Command command = (Command)o;
079 FutureResponse response = asyncRequest(command, null);
080 while (true) {
081 Response result = response.getResult(requestTimeout);
082 if (result != null) {
083 return result;
084 }
085 onMissingResponse(command, response);
086 }
087 }
088
089 public Object request(Object o, int timeout) throws IOException {
090 final Command command = (Command)o;
091 FutureResponse response = asyncRequest(command, null);
092 while (timeout > 0) {
093 int time = timeout;
094 if (timeout > requestTimeout) {
095 time = requestTimeout;
096 }
097 Response result = response.getResult(time);
098 if (result != null) {
099 return result;
100 }
101 onMissingResponse(command, response);
102 timeout -= time;
103 }
104 return response.getResult(0);
105 }
106
107 public void onCommand(Object o) {
108 Command command = (Command)o;
109 // lets pass wireformat through
110 if (command.isWireFormatInfo()) {
111 super.onCommand(command);
112 return;
113 } else if (command.getDataStructureType() == ReplayCommand.DATA_STRUCTURE_TYPE) {
114 replayCommands((ReplayCommand)command);
115 return;
116 }
117
118 int actualCounter = command.getCommandId();
119 boolean valid = expectedCounter == actualCounter;
120
121 if (!valid) {
122 synchronized (commands) {
123 int nextCounter = actualCounter;
124 boolean empty = commands.isEmpty();
125 if (!empty) {
126 Command nextAvailable = commands.first();
127 nextCounter = nextAvailable.getCommandId();
128 }
129
130 try {
131 boolean keep = replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter, nextCounter);
132
133 if (keep) {
134 // lets add it to the list for later on
135 if (LOG.isDebugEnabled()) {
136 LOG.debug("Received out of order command which is being buffered for later: " + command);
137 }
138 commands.add(command);
139 }
140 } catch (IOException e) {
141 onException(e);
142 }
143
144 if (!empty) {
145 // lets see if the first item in the set is the next
146 // expected
147 command = commands.first();
148 valid = expectedCounter == command.getCommandId();
149 if (valid) {
150 commands.remove(command);
151 }
152 }
153 }
154 }
155
156 while (valid) {
157 // we've got a valid header so increment counter
158 replayStrategy.onReceivedPacket(this, expectedCounter);
159 expectedCounter++;
160 super.onCommand(command);
161
162 synchronized (commands) {
163 // we could have more commands left
164 valid = !commands.isEmpty();
165 if (valid) {
166 // lets see if the first item in the set is the next
167 // expected
168 command = commands.first();
169 valid = expectedCounter == command.getCommandId();
170 if (valid) {
171 commands.remove(command);
172 }
173 }
174 }
175 }
176 }
177
178 public int getBufferedCommandCount() {
179 synchronized (commands) {
180 return commands.size();
181 }
182 }
183
184 public int getExpectedCounter() {
185 return expectedCounter;
186 }
187
188 /**
189 * This property should never really be set - but is mutable primarily for
190 * test cases
191 */
192 public void setExpectedCounter(int expectedCounter) {
193 this.expectedCounter = expectedCounter;
194 }
195
196 public int getRequestTimeout() {
197 return requestTimeout;
198 }
199
200 /**
201 * Sets the default timeout of requests before starting to request commands
202 * are replayed
203 */
204 public void setRequestTimeout(int requestTimeout) {
205 this.requestTimeout = requestTimeout;
206 }
207
208 public ReplayStrategy getReplayStrategy() {
209 return replayStrategy;
210 }
211
212 public ReplayBuffer getReplayBuffer() {
213 if (replayBuffer == null) {
214 replayBuffer = createReplayBuffer();
215 }
216 return replayBuffer;
217 }
218
219 public void setReplayBuffer(ReplayBuffer replayBuffer) {
220 this.replayBuffer = replayBuffer;
221 }
222
223 public int getReplayBufferCommandCount() {
224 return replayBufferCommandCount;
225 }
226
227 /**
228 * Sets the default number of commands which are buffered
229 */
230 public void setReplayBufferCommandCount(int replayBufferSize) {
231 this.replayBufferCommandCount = replayBufferSize;
232 }
233
234 public void setReplayStrategy(ReplayStrategy replayStrategy) {
235 this.replayStrategy = replayStrategy;
236 }
237
238 public Replayer getReplayer() {
239 return replayer;
240 }
241
242 public void setReplayer(Replayer replayer) {
243 this.replayer = replayer;
244 }
245
246 public String toString() {
247 return next.toString();
248 }
249
250 public void start() throws Exception {
251 if (udpTransport != null) {
252 udpTransport.setReplayBuffer(getReplayBuffer());
253 }
254 if (replayStrategy == null) {
255 throw new IllegalArgumentException("Property replayStrategy not specified");
256 }
257 super.start();
258 }
259
260 /**
261 * Lets attempt to replay the request as a command may have disappeared
262 */
263 protected void onMissingResponse(Command command, FutureResponse response) {
264 LOG.debug("Still waiting for response on: " + this + " to command: " + command + " sending replay message");
265
266 int commandId = command.getCommandId();
267 requestReplay(commandId, commandId);
268 }
269
270 protected ReplayBuffer createReplayBuffer() {
271 return new DefaultReplayBuffer(getReplayBufferCommandCount());
272 }
273
274 protected void replayCommands(ReplayCommand command) {
275 try {
276 if (replayer == null) {
277 onException(new IOException("Cannot replay commands. No replayer property configured"));
278 }
279 if (LOG.isDebugEnabled()) {
280 LOG.debug("Processing replay command: " + command);
281 }
282 getReplayBuffer().replayMessages(command.getFirstNakNumber(), command.getLastNakNumber(), replayer);
283
284 // TODO we could proactively remove ack'd stuff from the replay
285 // buffer
286 // if we only have a single client talking to us
287 } catch (IOException e) {
288 onException(e);
289 }
290 }
291
292 }