001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.nio;
018
019 import java.io.IOException;
020 import java.nio.channels.SelectionKey;
021 import java.nio.channels.Selector;
022 import java.util.Iterator;
023 import java.util.Set;
024 import java.util.concurrent.ConcurrentLinkedQueue;
025 import java.util.concurrent.atomic.AtomicInteger;
026
027 public class SelectorWorker implements Runnable {
028
029 private static final AtomicInteger NEXT_ID = new AtomicInteger();
030
031 final SelectorManager manager;
032 final Selector selector;
033 final int id = NEXT_ID.getAndIncrement();
034 private final int maxChannelsPerWorker;
035
036 final AtomicInteger retainCounter = new AtomicInteger(1);
037 private final ConcurrentLinkedQueue<Runnable> ioTasks = new ConcurrentLinkedQueue<Runnable>();
038
039 public SelectorWorker(SelectorManager manager) throws IOException {
040 this.manager = manager;
041 selector = Selector.open();
042 maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
043 manager.getSelectorExecutor().execute(this);
044 }
045
046 void retain() {
047 if (retainCounter.incrementAndGet() == maxChannelsPerWorker) {
048 manager.onWorkerFullEvent(this);
049 }
050 }
051
052 void release() {
053 int use = retainCounter.decrementAndGet();
054 if (use == 0) {
055 manager.onWorkerEmptyEvent(this);
056 } else if (use == maxChannelsPerWorker - 1) {
057 manager.onWorkerNotFullEvent(this);
058 }
059 }
060
061 boolean isReleased() {
062 return retainCounter.get()==0;
063 }
064
065
066 public void addIoTask(Runnable work) {
067 ioTasks.add(work);
068 selector.wakeup();
069 }
070
071 private void processIoTasks() {
072 Runnable task;
073 while( (task= ioTasks.poll()) !=null ) {
074 try {
075 task.run();
076 } catch (Throwable e) {
077 e.printStackTrace();
078 }
079 }
080 }
081
082
083
084 public void run() {
085
086 String origName = Thread.currentThread().getName();
087 try {
088 Thread.currentThread().setName("Selector Worker: " + id);
089 while (!isReleased()) {
090
091 processIoTasks();
092
093 int count = selector.select(10);
094
095 if (count == 0) {
096 continue;
097 }
098
099 // Get a java.util.Set containing the SelectionKey objects
100 // for all channels that are ready for I/O.
101 Set keys = selector.selectedKeys();
102
103 for (Iterator i = keys.iterator(); i.hasNext();) {
104 final SelectionKey key = (SelectionKey)i.next();
105 i.remove();
106
107 final SelectorSelection s = (SelectorSelection)key.attachment();
108 try {
109 if( key.isValid() ) {
110 key.interestOps(0);
111 }
112
113 // Kick off another thread to find newly selected keys
114 // while we process the
115 // currently selected keys
116 manager.getChannelExecutor().execute(new Runnable() {
117 public void run() {
118 try {
119 s.onSelect();
120 s.enable();
121 } catch (Throwable e) {
122 s.onError(e);
123 }
124 }
125 });
126
127 } catch (Throwable e) {
128 s.onError(e);
129 }
130
131 }
132
133 }
134 } catch (Throwable e) {
135 e.printStackTrace();
136 // Notify all the selections that the error occurred.
137 Set keys = selector.keys();
138 for (Iterator i = keys.iterator(); i.hasNext();) {
139 SelectionKey key = (SelectionKey)i.next();
140 SelectorSelection s = (SelectorSelection)key.attachment();
141 s.onError(e);
142 }
143 } finally {
144 try {
145 manager.onWorkerEmptyEvent(this);
146 selector.close();
147 } catch (IOException ignore) {
148 ignore.printStackTrace();
149 }
150 Thread.currentThread().setName(origName);
151 }
152 }
153
154 }