001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport;
018
019 import java.io.IOException;
020 import java.io.InterruptedIOException;
021 import java.net.Socket;
022 import java.util.concurrent.CountDownLatch;
023 import java.util.concurrent.TimeUnit;
024 import java.util.concurrent.atomic.AtomicBoolean;
025
026 import org.apache.activemq.command.Command;
027 import org.apache.activemq.command.WireFormatInfo;
028 import org.apache.activemq.openwire.OpenWireFormat;
029 import org.apache.activemq.util.IOExceptionSupport;
030 import org.slf4j.Logger;
031 import org.slf4j.LoggerFactory;
032
033 /**
034 * Negotiates the wire format with a new connection
035 */
036 public class WireFormatNegotiator extends TransportFilter {
037
038 private static final Logger LOG = LoggerFactory.getLogger(WireFormatNegotiator.class);
039
040 private OpenWireFormat wireFormat;
041 private final int minimumVersion;
042 private long negotiateTimeout = 15000L;
043
044 private final AtomicBoolean firstStart = new AtomicBoolean(true);
045 private final CountDownLatch readyCountDownLatch = new CountDownLatch(1);
046 private final CountDownLatch wireInfoSentDownLatch = new CountDownLatch(1);
047
048 /**
049 * Negotiator
050 *
051 * @param next
052 */
053 public WireFormatNegotiator(Transport next, OpenWireFormat wireFormat, int minimumVersion) {
054 super(next);
055 this.wireFormat = wireFormat;
056 if (minimumVersion <= 0) {
057 minimumVersion = 1;
058 }
059 this.minimumVersion = minimumVersion;
060
061 // Setup the initial negociation timeout to be the same as the inital max inactivity delay specified on the wireformat
062 // Does not make sense for us to take longer.
063 try {
064 if( wireFormat.getPreferedWireFormatInfo() !=null ) {
065 setNegotiateTimeout(wireFormat.getPreferedWireFormatInfo().getMaxInactivityDurationInitalDelay());
066 }
067 } catch (IOException e) {
068 }
069 }
070
071 public void start() throws Exception {
072 super.start();
073 if (firstStart.compareAndSet(true, false)) {
074 sendWireFormat();
075 }
076 }
077
078 public void sendWireFormat() throws IOException {
079 try {
080 WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
081 if (LOG.isDebugEnabled()) {
082 LOG.debug("Sending: " + info);
083 }
084 sendWireFormat(info);
085 } finally {
086 wireInfoSentDownLatch.countDown();
087 }
088 }
089
090 public void stop() throws Exception {
091 super.stop();
092 readyCountDownLatch.countDown();
093 }
094
095 public void oneway(Object command) throws IOException {
096 try {
097 if (!readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS)) {
098 throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");
099 }
100 } catch (InterruptedException e) {
101 Thread.currentThread().interrupt();
102 throw new InterruptedIOException();
103 }
104 super.oneway(command);
105 }
106
107 public void onCommand(Object o) {
108 Command command = (Command)o;
109 if (command.isWireFormatInfo()) {
110 WireFormatInfo info = (WireFormatInfo)command;
111 negociate(info);
112 }
113 getTransportListener().onCommand(command);
114 }
115
116 public void negociate(WireFormatInfo info) {
117 if (LOG.isDebugEnabled()) {
118 LOG.debug("Received WireFormat: " + info);
119 }
120
121 try {
122 wireInfoSentDownLatch.await();
123
124 if (LOG.isDebugEnabled()) {
125 LOG.debug(this + " before negotiation: " + wireFormat);
126 }
127 if (!info.isValid()) {
128 onException(new IOException("Remote wire format magic is invalid"));
129 } else if (info.getVersion() < minimumVersion) {
130 onException(new IOException("Remote wire format (" + info.getVersion() + ") is lower the minimum version required (" + minimumVersion + ")"));
131 }
132
133 wireFormat.renegotiateWireFormat(info);
134 Socket socket = next.narrow(Socket.class);
135 if (socket != null) {
136 socket.setTcpNoDelay(wireFormat.isTcpNoDelayEnabled());
137 }
138
139 if (LOG.isDebugEnabled()) {
140 LOG.debug(this + " after negotiation: " + wireFormat);
141 }
142
143 } catch (IOException e) {
144 onException(e);
145 } catch (InterruptedException e) {
146 onException((IOException)new InterruptedIOException().initCause(e));
147 } catch (Exception e) {
148 onException(IOExceptionSupport.create(e));
149 }
150 readyCountDownLatch.countDown();
151 onWireFormatNegotiated(info);
152 }
153
154 public void onException(IOException error) {
155 readyCountDownLatch.countDown();
156 /*
157 * try { super.oneway(new ExceptionResponse(error)); } catch
158 * (IOException e) { // ignore as we are already throwing an exception }
159 */
160 super.onException(error);
161 }
162
163 public String toString() {
164 return next.toString();
165 }
166
167 protected void sendWireFormat(WireFormatInfo info) throws IOException {
168 next.oneway(info);
169 }
170
171 protected void onWireFormatNegotiated(WireFormatInfo info) {
172 }
173
174 public long getNegotiateTimeout() {
175 return negotiateTimeout;
176 }
177
178 public void setNegotiateTimeout(long negotiateTimeout) {
179 this.negotiateTimeout = negotiateTimeout;
180 }
181 }