001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.vm;
018
019 import java.io.IOException;
020 import java.io.InterruptedIOException;
021 import java.net.URI;
022 import java.util.concurrent.BlockingQueue;
023 import java.util.concurrent.LinkedBlockingQueue;
024 import java.util.concurrent.TimeUnit;
025 import java.util.concurrent.atomic.AtomicBoolean;
026 import java.util.concurrent.atomic.AtomicLong;
027
028 import org.apache.activemq.command.ShutdownInfo;
029 import org.apache.activemq.thread.DefaultThreadPools;
030 import org.apache.activemq.thread.Task;
031 import org.apache.activemq.thread.TaskRunner;
032 import org.apache.activemq.transport.FutureResponse;
033 import org.apache.activemq.transport.ResponseCallback;
034 import org.apache.activemq.transport.Transport;
035 import org.apache.activemq.transport.TransportDisposedIOException;
036 import org.apache.activemq.transport.TransportListener;
037
038 /**
039 * A Transport implementation that uses direct method invocations.
040 */
041 public class VMTransport implements Transport, Task {
042
043 private static final Object DISCONNECT = new Object();
044 private static final AtomicLong NEXT_ID = new AtomicLong(0);
045
046 // Transport Configuration
047 protected VMTransport peer;
048 protected TransportListener transportListener;
049 protected boolean marshal;
050 protected boolean network;
051 protected boolean async = true;
052 protected int asyncQueueDepth = 2000;
053 protected final URI location;
054 protected final long id;
055
056 // Implementation
057 private LinkedBlockingQueue<Object> messageQueue;
058 private TaskRunner taskRunner;
059
060 // Transport State
061 protected final AtomicBoolean started = new AtomicBoolean();
062 protected final AtomicBoolean disposed = new AtomicBoolean();
063
064 private volatile int receiveCounter;
065
066 public VMTransport(URI location) {
067 this.location = location;
068 this.id = NEXT_ID.getAndIncrement();
069 }
070
071 public void setPeer(VMTransport peer) {
072 this.peer = peer;
073 }
074
075 public void oneway(Object command) throws IOException {
076
077 if (disposed.get()) {
078 throw new TransportDisposedIOException("Transport disposed.");
079 }
080
081 if (peer == null) {
082 throw new IOException("Peer not connected.");
083 }
084
085 try {
086
087 if (peer.disposed.get()) {
088 throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
089 }
090
091 if (peer.async || !peer.started.get()) {
092 peer.getMessageQueue().put(command);
093 peer.wakeup();
094 return;
095 }
096
097 } catch (InterruptedException e) {
098 InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
099 iioe.initCause(e);
100 throw iioe;
101 }
102
103 dispatch(peer, peer.messageQueue, command);
104 }
105
106 public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Object command) {
107 TransportListener transportListener = transport.getTransportListener();
108 if (transportListener != null) {
109 synchronized (started) {
110
111 // Ensure that no additional commands entered the queue in the small time window
112 // before the start method locks the dispatch lock and the oneway method was in
113 // an put operation.
114 while(pending != null && !pending.isEmpty() && !transport.isDisposed()) {
115 doDispatch(transport, transportListener, pending.poll());
116 }
117
118 // We are now in sync mode and won't enqueue any more commands to the target
119 // transport so lets clean up its resources.
120 transport.messageQueue = null;
121
122 // Don't dispatch if either end was disposed already.
123 if (command != null && !this.disposed.get() && !transport.isDisposed()) {
124 doDispatch(transport, transportListener, command);
125 }
126 }
127 }
128 }
129
130 public void doDispatch(VMTransport transport, TransportListener transportListener, Object command) {
131 if (command == DISCONNECT) {
132 transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
133 } else {
134 transport.receiveCounter++;
135 transportListener.onCommand(command);
136 }
137 }
138
139 public void start() throws Exception {
140
141 if (transportListener == null) {
142 throw new IOException("TransportListener not set.");
143 }
144
145 // If we are not in async mode we lock the dispatch lock here and then start to
146 // prevent any sync dispatches from occurring until we dispatch the pending messages
147 // to maintain delivery order. When async this happens automatically so just set
148 // started and wakeup the task runner.
149 if (!async) {
150 synchronized (started) {
151 if (started.compareAndSet(false, true)) {
152 LinkedBlockingQueue<Object> mq = getMessageQueue();
153 Object command;
154 while ((command = mq.poll()) != null && !disposed.get() ) {
155 receiveCounter++;
156 doDispatch(this, transportListener, command);
157 }
158 }
159 }
160 } else {
161 if (started.compareAndSet(false, true)) {
162 wakeup();
163 }
164 }
165 }
166
167 public void stop() throws Exception {
168 // Only need to do this once, all future oneway calls will now
169 // fail as will any asnyc jobs in the task runner.
170 if (disposed.compareAndSet(false, true)) {
171
172 TaskRunner tr = taskRunner;
173 LinkedBlockingQueue<Object> mq = this.messageQueue;
174
175 taskRunner = null;
176 messageQueue = null;
177
178 if (mq != null) {
179 mq.clear();
180 }
181
182 // Allow pending deliveries to finish up, but don't wait
183 // forever in case of an stalled onCommand.
184 if (tr != null) {
185 try {
186 tr.shutdown(TimeUnit.SECONDS.toMillis(1));
187 } catch(Exception e) {
188 }
189 }
190
191 // let the peer know that we are disconnecting after attempting
192 // to cleanly shutdown the async tasks so that this is the last
193 // command it see's.
194 try {
195 peer.transportListener.onCommand(new ShutdownInfo());
196 } catch (Exception ignore) {
197 }
198 }
199 }
200
201 protected void wakeup() {
202 if (async && started.get()) {
203 try {
204 getTaskRunner().wakeup();
205 } catch (InterruptedException e) {
206 Thread.currentThread().interrupt();
207 } catch (TransportDisposedIOException e) {
208 }
209 }
210 }
211
212 /**
213 * @see org.apache.activemq.thread.Task#iterate()
214 */
215 public boolean iterate() {
216
217 final TransportListener tl = transportListener;
218
219 LinkedBlockingQueue<Object> mq;
220 try {
221 mq = getMessageQueue();
222 } catch (TransportDisposedIOException e) {
223 return false;
224 }
225
226 Object command = mq.poll();
227 if (command != null && !disposed.get()) {
228 if( command == DISCONNECT ) {
229 tl.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
230 } else {
231 tl.onCommand(command);
232 }
233 return !mq.isEmpty() && !disposed.get();
234 } else {
235 if(disposed.get()) {
236 mq.clear();
237 }
238 return false;
239 }
240 }
241
242 public void setTransportListener(TransportListener commandListener) {
243 this.transportListener = commandListener;
244 }
245
246 public void setMessageQueue(LinkedBlockingQueue<Object> asyncQueue) {
247 synchronized (this) {
248 if (messageQueue == null) {
249 messageQueue = asyncQueue;
250 }
251 }
252 }
253
254 public LinkedBlockingQueue<Object> getMessageQueue() throws TransportDisposedIOException {
255 LinkedBlockingQueue<Object> result = messageQueue;
256 if (result == null) {
257 synchronized (this) {
258 result = messageQueue;
259 if (result == null) {
260 if (disposed.get()) {
261 throw new TransportDisposedIOException("The Transport has been disposed");
262 }
263
264 messageQueue = result = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
265 }
266 }
267 }
268 return result;
269 }
270
271 protected TaskRunner getTaskRunner() throws TransportDisposedIOException {
272 TaskRunner result = taskRunner;
273 if (result == null) {
274 synchronized (this) {
275 result = taskRunner;
276 if (result == null) {
277 if (disposed.get()) {
278 throw new TransportDisposedIOException("The Transport has been disposed");
279 }
280
281 taskRunner = result = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString());
282 }
283 }
284 }
285 return result;
286 }
287
288 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
289 throw new AssertionError("Unsupported Method");
290 }
291
292 public Object request(Object command) throws IOException {
293 throw new AssertionError("Unsupported Method");
294 }
295
296 public Object request(Object command, int timeout) throws IOException {
297 throw new AssertionError("Unsupported Method");
298 }
299
300 public TransportListener getTransportListener() {
301 return transportListener;
302 }
303
304 public <T> T narrow(Class<T> target) {
305 if (target.isAssignableFrom(getClass())) {
306 return target.cast(this);
307 }
308 return null;
309 }
310
311 public boolean isMarshal() {
312 return marshal;
313 }
314
315 public void setMarshal(boolean marshal) {
316 this.marshal = marshal;
317 }
318
319 public boolean isNetwork() {
320 return network;
321 }
322
323 public void setNetwork(boolean network) {
324 this.network = network;
325 }
326
327 @Override
328 public String toString() {
329 return location + "#" + id;
330 }
331
332 public String getRemoteAddress() {
333 if (peer != null) {
334 return peer.toString();
335 }
336 return null;
337 }
338
339 /**
340 * @return the async
341 */
342 public boolean isAsync() {
343 return async;
344 }
345
346 /**
347 * @param async the async to set
348 */
349 public void setAsync(boolean async) {
350 this.async = async;
351 }
352
353 /**
354 * @return the asyncQueueDepth
355 */
356 public int getAsyncQueueDepth() {
357 return asyncQueueDepth;
358 }
359
360 /**
361 * @param asyncQueueDepth the asyncQueueDepth to set
362 */
363 public void setAsyncQueueDepth(int asyncQueueDepth) {
364 this.asyncQueueDepth = asyncQueueDepth;
365 }
366
367 public boolean isFaultTolerant() {
368 return false;
369 }
370
371 public boolean isDisposed() {
372 return disposed.get();
373 }
374
375 public boolean isConnected() {
376 return !disposed.get();
377 }
378
379 public void reconnect(URI uri) throws IOException {
380 throw new IOException("Transport reconnect is not supported");
381 }
382
383 public boolean isReconnectSupported() {
384 return false;
385 }
386
387 public boolean isUpdateURIsSupported() {
388 return false;
389 }
390
391 public void updateURIs(boolean reblance,URI[] uris) throws IOException {
392 throw new IOException("URI update feature not supported");
393 }
394
395 public int getReceiveCounter() {
396 return receiveCounter;
397 }
398 }