001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.HashMap;
022 import java.util.Iterator;
023 import java.util.Map;
024
025 import org.apache.activemq.command.Command;
026 import org.apache.activemq.command.ExceptionResponse;
027 import org.apache.activemq.command.Response;
028 import org.apache.activemq.util.IntSequenceGenerator;
029 import org.slf4j.Logger;
030 import org.slf4j.LoggerFactory;
031
032 /**
033 * Adds the incrementing sequence number to commands along with performing the
034 * corelation of responses to requests to create a blocking request-response
035 * semantics.
036 *
037 *
038 */
039 public class ResponseCorrelator extends TransportFilter {
040
041 private static final Logger LOG = LoggerFactory.getLogger(ResponseCorrelator.class);
042 private final Map<Integer, FutureResponse> requestMap = new HashMap<Integer, FutureResponse>();
043 private IntSequenceGenerator sequenceGenerator;
044 private final boolean debug = LOG.isDebugEnabled();
045 private IOException error;
046
047 public ResponseCorrelator(Transport next) {
048 this(next, new IntSequenceGenerator());
049 }
050
051 public ResponseCorrelator(Transport next, IntSequenceGenerator sequenceGenerator) {
052 super(next);
053 this.sequenceGenerator = sequenceGenerator;
054 }
055
056 public void oneway(Object o) throws IOException {
057 Command command = (Command)o;
058 command.setCommandId(sequenceGenerator.getNextSequenceId());
059 command.setResponseRequired(false);
060 next.oneway(command);
061 }
062
063 public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {
064 Command command = (Command) o;
065 command.setCommandId(sequenceGenerator.getNextSequenceId());
066 command.setResponseRequired(true);
067 FutureResponse future = new FutureResponse(responseCallback);
068 IOException priorError = null;
069 synchronized (requestMap) {
070 priorError = this.error;
071 if (priorError == null) {
072 requestMap.put(new Integer(command.getCommandId()), future);
073 }
074 }
075
076 if (priorError != null) {
077 future.set(new ExceptionResponse(priorError));
078 throw priorError;
079 }
080
081 next.oneway(command);
082 return future;
083 }
084
085 public Object request(Object command) throws IOException {
086 FutureResponse response = asyncRequest(command, null);
087 return response.getResult();
088 }
089
090 public Object request(Object command, int timeout) throws IOException {
091 FutureResponse response = asyncRequest(command, null);
092 return response.getResult(timeout);
093 }
094
095 public void onCommand(Object o) {
096 Command command = null;
097 if (o instanceof Command) {
098 command = (Command)o;
099 } else {
100 throw new ClassCastException("Object cannot be converted to a Command, Object: " + o);
101 }
102 if (command.isResponse()) {
103 Response response = (Response)command;
104 FutureResponse future = null;
105 synchronized (requestMap) {
106 future = requestMap.remove(Integer.valueOf(response.getCorrelationId()));
107 }
108 if (future != null) {
109 future.set(response);
110 } else {
111 if (debug) {
112 LOG.debug("Received unexpected response: {" + command + "}for command id: " + response.getCorrelationId());
113 }
114 }
115 } else {
116 getTransportListener().onCommand(command);
117 }
118 }
119
120 /**
121 * If an async exception occurs, then assume no responses will arrive for
122 * any of current requests. Lets let them know of the problem.
123 */
124 public void onException(IOException error) {
125 dispose(error);
126 super.onException(error);
127 }
128
129 @Override
130 public void stop() throws Exception {
131 dispose(new IOException("Stopped."));
132 super.stop();
133 }
134
135 private void dispose(IOException error) {
136 ArrayList<FutureResponse> requests=null;
137 synchronized(requestMap) {
138 if( this.error==null) {
139 this.error = error;
140 requests = new ArrayList<FutureResponse>(requestMap.values());
141 requestMap.clear();
142 }
143 }
144 if( requests!=null ) {
145 for (Iterator<FutureResponse> iter = requests.iterator(); iter.hasNext();) {
146 FutureResponse fr = iter.next();
147 fr.set(new ExceptionResponse(error));
148 }
149 }
150 }
151
152 public IntSequenceGenerator getSequenceGenerator() {
153 return sequenceGenerator;
154 }
155
156 public String toString() {
157 return next.toString();
158 }
159 }