001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport;
018
019 import java.io.IOException;
020 import java.net.URI;
021
022 /**
023 *
024 */
025 public class TransportFilter implements TransportListener, Transport {
026 protected final Transport next;
027 protected TransportListener transportListener;
028
029 public TransportFilter(Transport next) {
030 this.next = next;
031 }
032
033 public TransportListener getTransportListener() {
034 return transportListener;
035 }
036
037 public void setTransportListener(TransportListener channelListener) {
038 this.transportListener = channelListener;
039 if (channelListener == null) {
040 next.setTransportListener(null);
041 } else {
042 next.setTransportListener(this);
043 }
044 }
045
046 /**
047 * @see org.apache.activemq.Service#start()
048 * @throws IOException
049 * if the next channel has not been set.
050 */
051 public void start() throws Exception {
052 if (next == null) {
053 throw new IOException("The next channel has not been set.");
054 }
055 if (transportListener == null) {
056 throw new IOException("The command listener has not been set.");
057 }
058 next.start();
059 }
060
061 /**
062 * @see org.apache.activemq.Service#stop()
063 */
064 public void stop() throws Exception {
065 next.stop();
066 }
067
068 public void onCommand(Object command) {
069 transportListener.onCommand(command);
070 }
071
072 /**
073 * @return Returns the next.
074 */
075 public Transport getNext() {
076 return next;
077 }
078
079 @Override
080 public String toString() {
081 return next.toString();
082 }
083
084 public void oneway(Object command) throws IOException {
085 next.oneway(command);
086 }
087
088 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
089 return next.asyncRequest(command, null);
090 }
091
092 public Object request(Object command) throws IOException {
093 return next.request(command);
094 }
095
096 public Object request(Object command, int timeout) throws IOException {
097 return next.request(command, timeout);
098 }
099
100 public void onException(IOException error) {
101 transportListener.onException(error);
102 }
103
104 public void transportInterupted() {
105 transportListener.transportInterupted();
106 }
107
108 public void transportResumed() {
109 transportListener.transportResumed();
110 }
111
112 public <T> T narrow(Class<T> target) {
113 if (target.isAssignableFrom(getClass())) {
114 return target.cast(this);
115 }
116 return next.narrow(target);
117 }
118
119 public String getRemoteAddress() {
120 return next.getRemoteAddress();
121 }
122
123 /**
124 * @return
125 * @see org.apache.activemq.transport.Transport#isFaultTolerant()
126 */
127 public boolean isFaultTolerant() {
128 return next.isFaultTolerant();
129 }
130
131 public boolean isDisposed() {
132 return next.isDisposed();
133 }
134
135 public boolean isConnected() {
136 return next.isConnected();
137 }
138
139 public void reconnect(URI uri) throws IOException {
140 next.reconnect(uri);
141 }
142
143 public int getReceiveCounter() {
144 return next.getReceiveCounter();
145 }
146
147 public boolean isReconnectSupported() {
148 return next.isReconnectSupported();
149 }
150
151 public boolean isUpdateURIsSupported() {
152 return next.isUpdateURIsSupported();
153 }
154
155 public void updateURIs(boolean rebalance,URI[] uris) throws IOException {
156 next.updateURIs(rebalance,uris);
157 }
158 }