001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.vm;
018
019 import java.io.IOException;
020 import java.net.InetSocketAddress;
021 import java.net.URI;
022 import java.util.concurrent.atomic.AtomicInteger;
023
024 import org.apache.activemq.command.BrokerInfo;
025 import org.apache.activemq.transport.MutexTransport;
026 import org.apache.activemq.transport.ResponseCorrelator;
027 import org.apache.activemq.transport.Transport;
028 import org.apache.activemq.transport.TransportAcceptListener;
029 import org.apache.activemq.transport.TransportServer;
030
031 /**
032 * Broker side of the VMTransport
033 */
034 public class VMTransportServer implements TransportServer {
035
036 private TransportAcceptListener acceptListener;
037 private final URI location;
038 private boolean disposed;
039
040 private final AtomicInteger connectionCount = new AtomicInteger(0);
041 private final boolean disposeOnDisconnect;
042
043 /**
044 * @param location
045 * @param disposeOnDisconnect
046 */
047 public VMTransportServer(URI location, boolean disposeOnDisconnect) {
048 this.location = location;
049 this.disposeOnDisconnect = disposeOnDisconnect;
050 }
051
052 /**
053 * @return a pretty print of this
054 */
055 public String toString() {
056 return "VMTransportServer(" + location + ")";
057 }
058
059 /**
060 * @return new VMTransport
061 * @throws IOException
062 */
063 public VMTransport connect() throws IOException {
064 TransportAcceptListener al;
065 synchronized (this) {
066 if (disposed) {
067 throw new IOException("Server has been disposed.");
068 }
069 al = acceptListener;
070 }
071 if (al == null) {
072 throw new IOException("Server TransportAcceptListener is null.");
073 }
074
075 connectionCount.incrementAndGet();
076 VMTransport client = new VMTransport(location) {
077 public void stop() throws Exception {
078 if (!disposed.get()) {
079 super.stop();
080 if (connectionCount.decrementAndGet() == 0 && disposeOnDisconnect) {
081 VMTransportServer.this.stop();
082 }
083 }
084 };
085 };
086
087 VMTransport server = new VMTransport(location);
088 client.setPeer(server);
089 server.setPeer(client);
090 al.onAccept(configure(server));
091 return client;
092 }
093
094 /**
095 * Configure transport
096 *
097 * @param transport
098 * @return the Transport
099 */
100 public static Transport configure(Transport transport) {
101 transport = new MutexTransport(transport);
102 transport = new ResponseCorrelator(transport);
103 return transport;
104 }
105
106 /**
107 * Set the Transport accept listener for new Connections
108 *
109 * @param acceptListener
110 */
111 public synchronized void setAcceptListener(TransportAcceptListener acceptListener) {
112 this.acceptListener = acceptListener;
113 }
114
115 public void start() throws IOException {
116 }
117
118 public void stop() throws IOException {
119 VMTransportFactory.stopped(this);
120 }
121
122 public URI getConnectURI() {
123 return location;
124 }
125
126 public URI getBindURI() {
127 return location;
128 }
129
130 public void setBrokerInfo(BrokerInfo brokerInfo) {
131 }
132
133 public InetSocketAddress getSocketAddress() {
134 return null;
135 }
136
137 public int getConnectionCount() {
138 return connectionCount.intValue();
139 }
140 }