1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server.nio;
15
16 import java.io.IOException;
17 import java.net.InetSocketAddress;
18 import java.net.Socket;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.ServerSocketChannel;
21 import java.nio.channels.SocketChannel;
22 import java.util.Arrays;
23
24 import org.eclipse.jetty.continuation.Continuation;
25 import org.eclipse.jetty.io.ConnectedEndPoint;
26 import org.eclipse.jetty.io.Connection;
27 import org.eclipse.jetty.io.EndPoint;
28 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
29 import org.eclipse.jetty.io.nio.SelectorManager;
30 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
31 import org.eclipse.jetty.server.AsyncHttpConnection;
32 import org.eclipse.jetty.server.Connector;
33 import org.eclipse.jetty.server.HttpConnection;
34 import org.eclipse.jetty.server.Request;
35 import org.eclipse.jetty.server.Server;
36 import org.eclipse.jetty.util.component.AggregateLifeCycle;
37 import org.eclipse.jetty.util.log.Log;
38 import org.eclipse.jetty.util.log.Logger;
39 import org.eclipse.jetty.util.thread.ThreadPool;
40 import org.eclipse.jetty.util.thread.Timeout.Task;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 public class SelectChannelConnector extends AbstractNIOConnector
69 {
70 private static final Logger LOG = Log.getLogger(SelectChannelConnector.class);
71
72 protected ServerSocketChannel _acceptChannel;
73 private int _lowResourcesConnections;
74 private int _lowResourcesMaxIdleTime;
75 private int _localPort=-1;
76
77 private final SelectorManager _manager = new ConnectorSelectorManager();
78
79
80
81
82
83
84 public SelectChannelConnector()
85 {
86 _manager.setMaxIdleTime(getMaxIdleTime());
87 setAcceptors(Math.max(1,(Runtime.getRuntime().availableProcessors()+3)/4));
88 }
89
90
91 @Override
92 public void accept(int acceptorID) throws IOException
93 {
94 ServerSocketChannel server = _acceptChannel;
95 if (server!=null && server.isOpen())
96 {
97 SocketChannel channel = _acceptChannel.accept();
98 channel.configureBlocking(false);
99 Socket socket = channel.socket();
100 configure(socket);
101 _manager.register(channel);
102 }
103 }
104
105
106 public void close() throws IOException
107 {
108 synchronized(this)
109 {
110 if (_acceptChannel != null)
111 _acceptChannel.close();
112 _acceptChannel = null;
113 _localPort=-2;
114 }
115 }
116
117
118 @Override
119 public void customize(EndPoint endpoint, Request request) throws IOException
120 {
121 SelectChannelEndPoint cep = ((SelectChannelEndPoint)endpoint);
122 cep.cancelIdle();
123 request.setTimeStamp(cep.getSelectSet().getNow());
124 endpoint.setMaxIdleTime(_maxIdleTime);
125 super.customize(endpoint, request);
126 }
127
128
129 @Override
130 public void persist(EndPoint endpoint) throws IOException
131 {
132 ((SelectChannelEndPoint)endpoint).scheduleIdle();
133 super.persist(endpoint);
134 }
135
136
137 public SelectorManager getSelectorManager()
138 {
139 return _manager;
140 }
141
142
143 public Object getConnection()
144 {
145 return _acceptChannel;
146 }
147
148
149 public int getLocalPort()
150 {
151 synchronized(this)
152 {
153 return _localPort;
154 }
155 }
156
157
158 public void open() throws IOException
159 {
160 synchronized(this)
161 {
162 if (_acceptChannel == null)
163 {
164
165 _acceptChannel = ServerSocketChannel.open();
166
167 _acceptChannel.configureBlocking(true);
168
169
170 _acceptChannel.socket().setReuseAddress(getReuseAddress());
171 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
172 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
173
174 _localPort=_acceptChannel.socket().getLocalPort();
175 if (_localPort<=0)
176 throw new IOException("Server channel not bound");
177
178 }
179 }
180 }
181
182
183 @Override
184 public void setMaxIdleTime(int maxIdleTime)
185 {
186 _manager.setMaxIdleTime(maxIdleTime);
187 super.setMaxIdleTime(maxIdleTime);
188 }
189
190
191
192
193
194 public int getLowResourcesConnections()
195 {
196 return _lowResourcesConnections;
197 }
198
199
200
201
202
203
204
205
206 public void setLowResourcesConnections(int lowResourcesConnections)
207 {
208 _lowResourcesConnections=lowResourcesConnections;
209 }
210
211
212
213
214
215 @Override
216 public int getLowResourcesMaxIdleTime()
217 {
218 return _lowResourcesMaxIdleTime;
219 }
220
221
222
223
224
225
226
227
228
229 @Override
230 public void setLowResourcesMaxIdleTime(int lowResourcesMaxIdleTime)
231 {
232 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
233 super.setLowResourcesMaxIdleTime(lowResourcesMaxIdleTime);
234 }
235
236
237
238
239
240
241 @Override
242 protected void doStart() throws Exception
243 {
244 _manager.setSelectSets(getAcceptors());
245 _manager.setMaxIdleTime(getMaxIdleTime());
246 _manager.setLowResourcesConnections(getLowResourcesConnections());
247 _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
248
249 super.doStart();
250 _manager.start();
251 }
252
253
254
255
256
257 @Override
258 protected void doStop() throws Exception
259 {
260 synchronized(this)
261 {
262 if(_manager.isRunning())
263 {
264 try
265 {
266 _manager.stop();
267 }
268 catch (Exception e)
269 {
270 LOG.warn(e);
271 }
272 }
273 }
274 super.doStop();
275 }
276
277
278 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
279 {
280 return new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime);
281 }
282
283
284 protected void endPointClosed(SelectChannelEndPoint endpoint)
285 {
286 connectionClosed(endpoint.getConnection());
287 }
288
289
290 protected Connection newConnection(SocketChannel channel,final SelectChannelEndPoint endpoint)
291 {
292 return new SelectChannelHttpConnection(SelectChannelConnector.this,endpoint,getServer(),endpoint);
293 }
294
295
296 public void dump(Appendable out, String indent) throws IOException
297 {
298 out.append(String.valueOf(this)).append("\n");
299 ServerSocketChannel channel=_acceptChannel;
300 if (channel==null)
301 AggregateLifeCycle.dump(out,indent,Arrays.asList(new Object[]{null,"CLOSED",_manager}));
302 else
303 AggregateLifeCycle.dump(out,indent,Arrays.asList(new Object[]{_acceptChannel,_acceptChannel.isOpen()?"OPEN":"CLOSED",_manager}));
304 }
305
306
307
308
309 private class SelectChannelHttpConnection extends AsyncHttpConnection
310 {
311 private final SelectChannelEndPoint _endpoint;
312
313 private SelectChannelHttpConnection(Connector connector, EndPoint endpoint, Server server, SelectChannelEndPoint endpoint2)
314 {
315 super(connector,endpoint,server);
316 _endpoint = endpoint2;
317 }
318
319
320 @Override
321 public void cancelTimeout(Task task)
322 {
323 _endpoint.getSelectSet().cancelTimeout(task);
324 }
325
326
327 @Override
328 public void scheduleTimeout(Task task, long timeoutMs)
329 {
330 _endpoint.getSelectSet().scheduleTimeout(task,timeoutMs);
331 }
332 }
333
334
335
336
337 private final class ConnectorSelectorManager extends SelectorManager
338 {
339 @Override
340 public boolean dispatch(Runnable task)
341 {
342 ThreadPool pool=getThreadPool();
343 if (pool==null)
344 pool=getServer().getThreadPool();
345 return pool.dispatch(task);
346 }
347
348 @Override
349 protected void endPointClosed(final SelectChannelEndPoint endpoint)
350 {
351 SelectChannelConnector.this.endPointClosed(endpoint);
352 }
353
354 @Override
355 protected void endPointOpened(SelectChannelEndPoint endpoint)
356 {
357
358 connectionOpened(endpoint.getConnection());
359 }
360
361 @Override
362 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
363 {
364 connectionUpgraded(oldConnection,endpoint.getConnection());
365 }
366
367 @Override
368 protected Connection newConnection(SocketChannel channel,SelectChannelEndPoint endpoint)
369 {
370 return SelectChannelConnector.this.newConnection(channel,endpoint);
371 }
372
373 @Override
374 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
375 {
376 return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
377 }
378 }
379
380 }