001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport;
018
019 import java.io.IOException;
020
021 import org.apache.activemq.command.WireFormatInfo;
022 import org.apache.activemq.wireformat.WireFormat;
023 import org.slf4j.Logger;
024 import org.slf4j.LoggerFactory;
025
026 /**
027 * Used to make sure that commands are arriving periodically from the peer of
028 * the transport.
029 */
030 public class InactivityMonitor extends AbstractInactivityMonitor {
031
032 private static final Logger LOG = LoggerFactory.getLogger(InactivityMonitor.class);
033
034 private WireFormatInfo localWireFormatInfo;
035 private WireFormatInfo remoteWireFormatInfo;
036
037 private boolean ignoreRemoteWireFormat = false;
038 private boolean ignoreAllWireFormatInfo = false;
039
040 public InactivityMonitor(Transport next, WireFormat wireFormat) {
041 super(next, wireFormat);
042 if (this.wireFormat == null) {
043 this.ignoreAllWireFormatInfo = true;
044 }
045 }
046
047 protected void processInboundWireFormatInfo(WireFormatInfo info) throws IOException {
048 IOException error = null;
049 remoteWireFormatInfo = info;
050 try {
051 startMonitorThreads();
052 } catch (IOException e) {
053 error = e;
054 }
055 if (error != null) {
056 onException(error);
057 }
058 }
059
060 protected void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException{
061 localWireFormatInfo = info;
062 startMonitorThreads();
063 }
064
065 @Override
066 protected synchronized void startMonitorThreads() throws IOException {
067 if (isMonitorStarted()) {
068 return;
069 }
070
071 long readCheckTime = getReadCheckTime();
072
073 if (readCheckTime > 0) {
074 setWriteCheckTime(writeCheckValueFromReadCheck(readCheckTime));
075 }
076
077 super.startMonitorThreads();
078 }
079
080 private long writeCheckValueFromReadCheck(long readCheckTime) {
081 return readCheckTime>3 ? readCheckTime/3 : readCheckTime;
082 }
083
084 @Override
085 protected boolean configuredOk() throws IOException {
086 boolean configured = false;
087 if (ignoreAllWireFormatInfo) {
088 configured = true;
089 } else if (localWireFormatInfo != null && remoteWireFormatInfo != null) {
090 if (!ignoreRemoteWireFormat) {
091 if (LOG.isDebugEnabled()) {
092 LOG.debug("Using min of local: " + localWireFormatInfo + " and remote: " + remoteWireFormatInfo);
093 }
094
095 long readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
096 long writeCheckTime = writeCheckValueFromReadCheck(readCheckTime);
097
098 setReadCheckTime(readCheckTime);
099 setInitialDelayTime(Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay()));
100 setWriteCheckTime(writeCheckTime);
101
102 } else {
103 if (LOG.isDebugEnabled()) {
104 LOG.debug("Using local: " + localWireFormatInfo);
105 }
106
107 long readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
108 long writeCheckTime = writeCheckValueFromReadCheck(readCheckTime);
109
110 setReadCheckTime(readCheckTime);
111 setInitialDelayTime(localWireFormatInfo.getMaxInactivityDurationInitalDelay());
112 setWriteCheckTime(writeCheckTime);
113 }
114 configured = true;
115 }
116
117 return configured;
118 }
119
120 public boolean isIgnoreAllWireFormatInfo() {
121 return ignoreAllWireFormatInfo;
122 }
123
124 public void setIgnoreAllWireFormatInfo(boolean ignoreAllWireFormatInfo) {
125 this.ignoreAllWireFormatInfo = ignoreAllWireFormatInfo;
126 }
127
128 public boolean isIgnoreRemoteWireFormat() {
129 return ignoreRemoteWireFormat;
130 }
131
132 public void setIgnoreRemoteWireFormat(boolean ignoreRemoteWireFormat) {
133 this.ignoreRemoteWireFormat = ignoreRemoteWireFormat;
134 }
135 }