1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server.nio;
15
16 import java.io.IOException;
17 import java.net.InetSocketAddress;
18 import java.net.Socket;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.ServerSocketChannel;
21 import java.nio.channels.SocketChannel;
22 import java.util.Arrays;
23
24 import org.eclipse.jetty.continuation.Continuation;
25 import org.eclipse.jetty.io.ConnectedEndPoint;
26 import org.eclipse.jetty.io.Connection;
27 import org.eclipse.jetty.io.EndPoint;
28 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
29 import org.eclipse.jetty.io.nio.SelectorManager;
30 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
31 import org.eclipse.jetty.server.AsyncHttpConnection;
32 import org.eclipse.jetty.server.Connector;
33 import org.eclipse.jetty.server.HttpConnection;
34 import org.eclipse.jetty.server.Request;
35 import org.eclipse.jetty.server.Server;
36 import org.eclipse.jetty.util.component.AggregateLifeCycle;
37 import org.eclipse.jetty.util.log.Log;
38 import org.eclipse.jetty.util.thread.Timeout.Task;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 public class SelectChannelConnector extends AbstractNIOConnector
67 {
68 protected ServerSocketChannel _acceptChannel;
69 private int _lowResourcesConnections;
70 private int _lowResourcesMaxIdleTime;
71 private int _localPort=-1;
72
73 private final SelectorManager _manager = new ConnectorSelectorManager();
74
75
76
77
78
79
80 public SelectChannelConnector()
81 {
82 _manager.setMaxIdleTime(getMaxIdleTime());
83 setAcceptors(Math.max(1,(Runtime.getRuntime().availableProcessors()+3)/4));
84 }
85
86
87 @Override
88 public void accept(int acceptorID) throws IOException
89 {
90 ServerSocketChannel server = _acceptChannel;
91 if (server!=null && server.isOpen())
92 {
93 SocketChannel channel = _acceptChannel.accept();
94 channel.configureBlocking(false);
95 Socket socket = channel.socket();
96 configure(socket);
97 _manager.register(channel);
98 }
99 }
100
101
102 public void close() throws IOException
103 {
104 synchronized(this)
105 {
106 if (_acceptChannel != null)
107 _acceptChannel.close();
108 _acceptChannel = null;
109 _localPort=-2;
110 }
111 }
112
113
114 @Override
115 public void customize(EndPoint endpoint, Request request) throws IOException
116 {
117 SelectChannelEndPoint cep = ((SelectChannelEndPoint)endpoint);
118 cep.cancelIdle();
119 request.setTimeStamp(cep.getSelectSet().getNow());
120 endpoint.setMaxIdleTime(_maxIdleTime);
121 super.customize(endpoint, request);
122 }
123
124
125 @Override
126 public void persist(EndPoint endpoint) throws IOException
127 {
128 ((SelectChannelEndPoint)endpoint).scheduleIdle();
129 super.persist(endpoint);
130 }
131
132
133 public Object getConnection()
134 {
135 return _acceptChannel;
136 }
137
138
139 public int getLocalPort()
140 {
141 synchronized(this)
142 {
143 return _localPort;
144 }
145 }
146
147
148 public void open() throws IOException
149 {
150 synchronized(this)
151 {
152 if (_acceptChannel == null)
153 {
154
155 _acceptChannel = ServerSocketChannel.open();
156
157 _acceptChannel.configureBlocking(true);
158
159
160 _acceptChannel.socket().setReuseAddress(getReuseAddress());
161 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
162 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
163
164 _localPort=_acceptChannel.socket().getLocalPort();
165 if (_localPort<=0)
166 throw new IOException("Server channel not bound");
167
168 }
169 }
170 }
171
172
173 @Override
174 public void setMaxIdleTime(int maxIdleTime)
175 {
176 _manager.setMaxIdleTime(maxIdleTime);
177 super.setMaxIdleTime(maxIdleTime);
178 }
179
180
181
182
183
184 public int getLowResourcesConnections()
185 {
186 return _lowResourcesConnections;
187 }
188
189
190
191
192
193
194
195
196 public void setLowResourcesConnections(int lowResourcesConnections)
197 {
198 _lowResourcesConnections=lowResourcesConnections;
199 }
200
201
202
203
204
205 @Override
206 public int getLowResourcesMaxIdleTime()
207 {
208 return _lowResourcesMaxIdleTime;
209 }
210
211
212
213
214
215
216
217
218
219 @Override
220 public void setLowResourcesMaxIdleTime(int lowResourcesMaxIdleTime)
221 {
222 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
223 super.setLowResourcesMaxIdleTime(lowResourcesMaxIdleTime);
224 }
225
226
227
228
229
230
231 @Override
232 protected void doStart() throws Exception
233 {
234 _manager.setSelectSets(getAcceptors());
235 _manager.setMaxIdleTime(getMaxIdleTime());
236 _manager.setLowResourcesConnections(getLowResourcesConnections());
237 _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
238 _manager.start();
239
240 super.doStart();
241
242
243 for (int i=0;i<getAcceptors();i++)
244 {
245 final int id=i;
246 _manager.dispatch(new Runnable()
247 {
248 public void run()
249 {
250 String name=Thread.currentThread().getName();
251 try
252 {
253 Thread.currentThread().setName(name+" Selector"+id+" "+SelectChannelConnector.this);
254 while (isRunning())
255 {
256 try
257 {
258 _manager.doSelect(id);
259 }
260 catch(ThreadDeath e)
261 {
262 throw e;
263 }
264 catch(IOException e)
265 {
266 Log.ignore(e);
267 }
268 catch(Exception e)
269 {
270 Log.warn(e);
271 }
272 }
273 }
274 finally
275 {
276 Thread.currentThread().setName(name);
277 }
278 }
279 });
280 }
281 }
282
283
284
285
286
287 @Override
288 protected void doStop() throws Exception
289 {
290 synchronized(this)
291 {
292 if(_manager.isRunning())
293 {
294 try
295 {
296 _manager.stop();
297 }
298 catch (Exception e)
299 {
300 Log.warn(e);
301 }
302 }
303 }
304 super.doStop();
305 }
306
307
308 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
309 {
310 return new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime);
311 }
312
313
314 protected void endPointClosed(SelectChannelEndPoint endpoint)
315 {
316 connectionClosed(endpoint.getConnection());
317 }
318
319
320 protected Connection newConnection(SocketChannel channel,final SelectChannelEndPoint endpoint)
321 {
322 return new SelectChannelHttpConnection(SelectChannelConnector.this,endpoint,getServer(),endpoint);
323 }
324
325
326 public void dump(Appendable out, String indent) throws IOException
327 {
328 out.append(String.valueOf(this)).append("\n");
329 ServerSocketChannel channel=_acceptChannel;
330 if (channel==null)
331 AggregateLifeCycle.dump(out,indent,Arrays.asList(new Object[]{null,"CLOSED",_manager}));
332 else
333 AggregateLifeCycle.dump(out,indent,Arrays.asList(new Object[]{_acceptChannel,_acceptChannel.isOpen()?"OPEN":"CLOSED",_manager}));
334 }
335
336
337
338
339 private class SelectChannelHttpConnection extends AsyncHttpConnection
340 {
341 private final SelectChannelEndPoint _endpoint;
342
343 private SelectChannelHttpConnection(Connector connector, EndPoint endpoint, Server server, SelectChannelEndPoint endpoint2)
344 {
345 super(connector,endpoint,server);
346 _endpoint = endpoint2;
347 }
348
349
350 @Override
351 public void cancelTimeout(Task task)
352 {
353 _endpoint.getSelectSet().cancelTimeout(task);
354 }
355
356
357 @Override
358 public void scheduleTimeout(Task task, long timeoutMs)
359 {
360 _endpoint.getSelectSet().scheduleTimeout(task,timeoutMs);
361 }
362 }
363
364
365
366
367 private final class ConnectorSelectorManager extends SelectorManager
368 {
369 @Override
370 public boolean dispatch(Runnable task)
371 {
372 return getThreadPool().dispatch(task);
373 }
374
375 @Override
376 protected void endPointClosed(final SelectChannelEndPoint endpoint)
377 {
378 SelectChannelConnector.this.endPointClosed(endpoint);
379 }
380
381 @Override
382 protected void endPointOpened(SelectChannelEndPoint endpoint)
383 {
384
385 connectionOpened(endpoint.getConnection());
386 }
387
388 @Override
389 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
390 {
391 connectionUpgraded(oldConnection,endpoint.getConnection());
392 }
393
394 @Override
395 protected Connection newConnection(SocketChannel channel,SelectChannelEndPoint endpoint)
396 {
397 return SelectChannelConnector.this.newConnection(channel,endpoint);
398 }
399
400 @Override
401 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
402 {
403 return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
404 }
405 }
406
407 }