1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server.nio;
15
16 import java.io.IOException;
17 import java.net.InetSocketAddress;
18 import java.net.Socket;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.ServerSocketChannel;
21 import java.nio.channels.SocketChannel;
22 import java.util.Arrays;
23
24 import org.eclipse.jetty.continuation.Continuation;
25 import org.eclipse.jetty.io.ConnectedEndPoint;
26 import org.eclipse.jetty.io.Connection;
27 import org.eclipse.jetty.io.EndPoint;
28 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
29 import org.eclipse.jetty.io.nio.SelectorManager;
30 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
31 import org.eclipse.jetty.server.Connector;
32 import org.eclipse.jetty.server.HttpConnection;
33 import org.eclipse.jetty.server.Request;
34 import org.eclipse.jetty.server.Server;
35 import org.eclipse.jetty.util.component.AggregateLifeCycle;
36 import org.eclipse.jetty.util.log.Log;
37 import org.eclipse.jetty.util.thread.Timeout.Task;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 public class SelectChannelConnector extends AbstractNIOConnector
66 {
/** Server channel that {@link #accept(int)} takes connections from; created by {@link #open()} when unset and cleared by {@link #close()}. */
protected ServerSocketChannel _acceptChannel;
/** Low-resources connection threshold; passed to the selector manager in {@link #doStart()}. */
private int _lowResourcesConnections;
/** Low-resources max idle time; passed to the selector manager in {@link #doStart()}. */
private int _lowResourcesMaxIdleTime;
/** Local port of the server channel: -1 initially, the bound port after {@link #open()}, -2 after {@link #close()}. */
private int _localPort=-1;

/** Selector manager for accepted channels; its callbacks delegate back to this connector. */
private final SelectorManager _manager = new ConnectorSelectorManager();
73
74
75
76
77
78
79 public SelectChannelConnector()
80 {
81 _manager.setMaxIdleTime(getMaxIdleTime());
82 setAcceptors(Math.max(1,(Runtime.getRuntime().availableProcessors()+3)/4));
83 }
84
85
86 @Override
87 public void accept(int acceptorID) throws IOException
88 {
89 ServerSocketChannel server = _acceptChannel;
90 if (server!=null && server.isOpen())
91 {
92 SocketChannel channel = _acceptChannel.accept();
93 channel.configureBlocking(false);
94 Socket socket = channel.socket();
95 configure(socket);
96 _manager.register(channel);
97 }
98 }
99
100
101 public void close() throws IOException
102 {
103 synchronized(this)
104 {
105 if (_acceptChannel != null)
106 _acceptChannel.close();
107 _acceptChannel = null;
108 _localPort=-2;
109 }
110 }
111
112
113 @Override
114 public void customize(EndPoint endpoint, Request request) throws IOException
115 {
116 SelectChannelEndPoint cep = ((SelectChannelEndPoint)endpoint);
117 cep.cancelIdle();
118 request.setTimeStamp(cep.getSelectSet().getNow());
119 endpoint.setMaxIdleTime(_maxIdleTime);
120 super.customize(endpoint, request);
121 }
122
123
124 @Override
125 public void persist(EndPoint endpoint) throws IOException
126 {
127 ((SelectChannelEndPoint)endpoint).scheduleIdle();
128 super.persist(endpoint);
129 }
130
131
/**
 * Returns the underlying server channel.
 *
 * @return the current {@code ServerSocketChannel}, or {@code null} when none is set
 */
public Object getConnection()
{
    return _acceptChannel;
}
136
137
138 public int getLocalPort()
139 {
140 synchronized(this)
141 {
142 return _localPort;
143 }
144 }
145
146
147 public void open() throws IOException
148 {
149 synchronized(this)
150 {
151 if (_acceptChannel == null)
152 {
153
154 _acceptChannel = ServerSocketChannel.open();
155
156 _acceptChannel.configureBlocking(true);
157
158
159 _acceptChannel.socket().setReuseAddress(getReuseAddress());
160 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
161 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
162
163 _localPort=_acceptChannel.socket().getLocalPort();
164 if (_localPort<=0)
165 throw new IOException("Server channel not bound");
166
167 }
168 }
169 }
170
171
/**
 * Sets the max idle time on the selector manager and then on the superclass.
 *
 * @param maxIdleTime the max idle time to apply
 */
@Override
public void setMaxIdleTime(int maxIdleTime)
{
    _manager.setMaxIdleTime(maxIdleTime);
    super.setMaxIdleTime(maxIdleTime);
}
178
179
180
181
182
/**
 * Returns the low-resources connection threshold that {@link #doStart()}
 * hands to the selector manager.
 *
 * @return the configured low-resources connection count
 */
public int getLowResourcesConnections()
{
    return _lowResourcesConnections;
}
187
188
189
190
191
192
193
194
/**
 * Stores the low-resources connection threshold. The value is passed to the
 * selector manager by {@link #doStart()}; this setter does not update the
 * manager itself.
 *
 * @param lowResourcesConnections the low-resources connection count to store
 */
public void setLowResourcesConnections(int lowResourcesConnections)
{
    _lowResourcesConnections=lowResourcesConnections;
}
199
200
201
202
203
/**
 * Returns the low-resources max idle time stored by this connector.
 *
 * @return the configured low-resources max idle time
 */
@Override
public int getLowResourcesMaxIdleTime()
{
    return _lowResourcesMaxIdleTime;
}
209
210
211
212
213
214
215
216
217
/**
 * Stores the low-resources max idle time in this connector and also passes
 * it to the superclass. The selector manager receives the value in
 * {@link #doStart()}.
 *
 * @param lowResourcesMaxIdleTime the low-resources max idle time to store
 */
@Override
public void setLowResourcesMaxIdleTime(int lowResourcesMaxIdleTime)
{
    _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
    super.setLowResourcesMaxIdleTime(lowResourcesMaxIdleTime);
}
224
225
226
227
228
229
230 @Override
231 protected void doStart() throws Exception
232 {
233 _manager.setSelectSets(getAcceptors());
234 _manager.setMaxIdleTime(getMaxIdleTime());
235 _manager.setLowResourcesConnections(getLowResourcesConnections());
236 _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
237 _manager.start();
238
239 super.doStart();
240
241
242 for (int i=0;i<getAcceptors();i++)
243 {
244 final int id=i;
245 _manager.dispatch(new Runnable()
246 {
247 public void run()
248 {
249 String name=Thread.currentThread().getName();
250 try
251 {
252 Thread.currentThread().setName(name+" Selector"+id+" "+SelectChannelConnector.this);
253 while (isRunning())
254 {
255 try
256 {
257 _manager.doSelect(id);
258 }
259 catch(ThreadDeath e)
260 {
261 throw e;
262 }
263 catch(IOException e)
264 {
265 Log.ignore(e);
266 }
267 catch(Exception e)
268 {
269 Log.warn(e);
270 }
271 }
272 }
273 finally
274 {
275 Thread.currentThread().setName(name);
276 }
277 }
278 });
279 }
280 }
281
282
283
284
285
286 @Override
287 protected void doStop() throws Exception
288 {
289 synchronized(this)
290 {
291 if(_manager.isRunning())
292 {
293 try
294 {
295 _manager.stop();
296 }
297 catch (Exception e)
298 {
299 Log.warn(e);
300 }
301 }
302 }
303 super.doStop();
304 }
305
306
307 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
308 {
309 return new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime);
310 }
311
312
313 protected void endPointClosed(SelectChannelEndPoint endpoint)
314 {
315 connectionClosed(endpoint.getConnection());
316 }
317
318
319 protected Connection newConnection(SocketChannel channel,final SelectChannelEndPoint endpoint)
320 {
321 return new SelectChannelHttpConnection(SelectChannelConnector.this,endpoint,getServer(),endpoint);
322 }
323
324
325 public void dump(Appendable out, String indent) throws IOException
326 {
327 out.append(String.valueOf(this)).append("\n");
328 ServerSocketChannel channel=_acceptChannel;
329 if (channel==null)
330 AggregateLifeCycle.dump(out,indent,Arrays.asList(new Object[]{null,"CLOSED",_manager}));
331 else
332 AggregateLifeCycle.dump(out,indent,Arrays.asList(new Object[]{_acceptChannel,_acceptChannel.isOpen()?"OPEN":"CLOSED",_manager}));
333 }
334
335
336
337
338 private class SelectChannelHttpConnection extends HttpConnection
339 {
340 private final SelectChannelEndPoint _endpoint;
341
342 private SelectChannelHttpConnection(Connector connector, EndPoint endpoint, Server server, SelectChannelEndPoint endpoint2)
343 {
344 super(connector,endpoint,server);
345 _endpoint = endpoint2;
346 }
347
348
349 @Override
350 public void cancelTimeout(Task task)
351 {
352 _endpoint.getSelectSet().cancelTimeout(task);
353 }
354
355
356 @Override
357 public void scheduleTimeout(Task task, long timeoutMs)
358 {
359 _endpoint.getSelectSet().scheduleTimeout(task,timeoutMs);
360 }
361 }
362
363
364
365
366 private final class ConnectorSelectorManager extends SelectorManager
367 {
368 @Override
369 public boolean dispatch(Runnable task)
370 {
371 return getThreadPool().dispatch(task);
372 }
373
374 @Override
375 protected void endPointClosed(final SelectChannelEndPoint endpoint)
376 {
377 SelectChannelConnector.this.endPointClosed(endpoint);
378 }
379
380 @Override
381 protected void endPointOpened(SelectChannelEndPoint endpoint)
382 {
383
384 connectionOpened(endpoint.getConnection());
385 }
386
387 @Override
388 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
389 {
390 connectionUpgraded(oldConnection,endpoint.getConnection());
391 }
392
393 @Override
394 protected Connection newConnection(SocketChannel channel,SelectChannelEndPoint endpoint)
395 {
396 return SelectChannelConnector.this.newConnection(channel,endpoint);
397 }
398
399 @Override
400 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
401 {
402 return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
403 }
404 }
405
406 }