1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server.nio;
15
16 import java.io.IOException;
17 import java.net.InetSocketAddress;
18 import java.net.Socket;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.ServerSocketChannel;
21 import java.nio.channels.SocketChannel;
22
23 import org.eclipse.jetty.continuation.Continuation;
24 import org.eclipse.jetty.io.AsyncEndPoint;
25 import org.eclipse.jetty.io.ConnectedEndPoint;
26 import org.eclipse.jetty.io.Connection;
27 import org.eclipse.jetty.io.EndPoint;
28 import org.eclipse.jetty.io.nio.AsyncConnection;
29 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
30 import org.eclipse.jetty.io.nio.SelectorManager;
31 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
32 import org.eclipse.jetty.server.AsyncHttpConnection;
33 import org.eclipse.jetty.server.Request;
34 import org.eclipse.jetty.util.thread.ThreadPool;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62 public class SelectChannelConnector extends AbstractNIOConnector
63 {
64 protected ServerSocketChannel _acceptChannel;
65 private int _lowResourcesConnections;
66 private int _lowResourcesMaxIdleTime;
67 private int _localPort=-1;
68
69 private final SelectorManager _manager = new ConnectorSelectorManager();
70
71
72
73
74
75
76 public SelectChannelConnector()
77 {
78 _manager.setMaxIdleTime(getMaxIdleTime());
79 addBean(_manager,true);
80 setAcceptors(Math.max(1,(Runtime.getRuntime().availableProcessors()+3)/4));
81 }
82
83
84 @Override
85 public void accept(int acceptorID) throws IOException
86 {
87 ServerSocketChannel server;
88 synchronized(this)
89 {
90 server = _acceptChannel;
91 }
92
93 if (server!=null && server.isOpen() && _manager.isStarted())
94 {
95 SocketChannel channel = server.accept();
96 channel.configureBlocking(false);
97 Socket socket = channel.socket();
98 configure(socket);
99 _manager.register(channel);
100 }
101 }
102
103
104 public void close() throws IOException
105 {
106 synchronized(this)
107 {
108 if (_acceptChannel != null)
109 {
110 removeBean(_acceptChannel);
111 if (_acceptChannel.isOpen())
112 _acceptChannel.close();
113 }
114 _acceptChannel = null;
115 _localPort=-2;
116 }
117 }
118
119
120 @Override
121 public void customize(EndPoint endpoint, Request request) throws IOException
122 {
123 request.setTimeStamp(System.currentTimeMillis());
124 endpoint.setMaxIdleTime(_maxIdleTime);
125 super.customize(endpoint, request);
126 }
127
128
129 @Override
130 public void persist(EndPoint endpoint) throws IOException
131 {
132 AsyncEndPoint aEndp = ((AsyncEndPoint)endpoint);
133 aEndp.setCheckForIdle(true);
134 super.persist(endpoint);
135 }
136
137
138 public SelectorManager getSelectorManager()
139 {
140 return _manager;
141 }
142
143
144 public synchronized Object getConnection()
145 {
146 return _acceptChannel;
147 }
148
149
150 public int getLocalPort()
151 {
152 synchronized(this)
153 {
154 return _localPort;
155 }
156 }
157
158
159 public void open() throws IOException
160 {
161 synchronized(this)
162 {
163 if (_acceptChannel == null)
164 {
165
166 _acceptChannel = ServerSocketChannel.open();
167
168 _acceptChannel.configureBlocking(true);
169
170
171 _acceptChannel.socket().setReuseAddress(getReuseAddress());
172 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
173 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
174
175 _localPort=_acceptChannel.socket().getLocalPort();
176 if (_localPort<=0)
177 throw new IOException("Server channel not bound");
178
179 addBean(_acceptChannel);
180 }
181 }
182 }
183
184
185 @Override
186 public void setMaxIdleTime(int maxIdleTime)
187 {
188 _manager.setMaxIdleTime(maxIdleTime);
189 super.setMaxIdleTime(maxIdleTime);
190 }
191
192
193
194
195
196 public int getLowResourcesConnections()
197 {
198 return _lowResourcesConnections;
199 }
200
201
202
203
204
205
206
207
208 public void setLowResourcesConnections(int lowResourcesConnections)
209 {
210 _lowResourcesConnections=lowResourcesConnections;
211 }
212
213
214
215
216
217 @Override
218 public int getLowResourcesMaxIdleTime()
219 {
220 return _lowResourcesMaxIdleTime;
221 }
222
223
224
225
226
227
228
229
230
231 @Override
232 public void setLowResourcesMaxIdleTime(int lowResourcesMaxIdleTime)
233 {
234 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
235 super.setLowResourcesMaxIdleTime(lowResourcesMaxIdleTime);
236 }
237
238
239
240
241
242
243 @Override
244 protected void doStart() throws Exception
245 {
246 _manager.setSelectSets(getAcceptors());
247 _manager.setMaxIdleTime(getMaxIdleTime());
248 _manager.setLowResourcesConnections(getLowResourcesConnections());
249 _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
250
251 super.doStart();
252 }
253
254
255 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
256 {
257 SelectChannelEndPoint endp= new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime);
258 endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
259 return endp;
260 }
261
262
263 protected void endPointClosed(SelectChannelEndPoint endpoint)
264 {
265 connectionClosed(endpoint.getConnection());
266 }
267
268
269 protected AsyncConnection newConnection(SocketChannel channel,final AsyncEndPoint endpoint)
270 {
271 return new AsyncHttpConnection(SelectChannelConnector.this,endpoint,getServer());
272 }
273
274
275
276
277
278 private final class ConnectorSelectorManager extends SelectorManager
279 {
280 @Override
281 public boolean dispatch(Runnable task)
282 {
283 ThreadPool pool=getThreadPool();
284 if (pool==null)
285 pool=getServer().getThreadPool();
286 return pool.dispatch(task);
287 }
288
289 @Override
290 protected void endPointClosed(final SelectChannelEndPoint endpoint)
291 {
292 SelectChannelConnector.this.endPointClosed(endpoint);
293 }
294
295 @Override
296 protected void endPointOpened(SelectChannelEndPoint endpoint)
297 {
298
299 connectionOpened(endpoint.getConnection());
300 }
301
302 @Override
303 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
304 {
305 connectionUpgraded(oldConnection,endpoint.getConnection());
306 }
307
308 @Override
309 public AsyncConnection newConnection(SocketChannel channel,AsyncEndPoint endpoint, Object attachment)
310 {
311 return SelectChannelConnector.this.newConnection(channel,endpoint);
312 }
313
314 @Override
315 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
316 {
317 return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
318 }
319 }
320 }