1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server.nio;
15
16 import java.io.IOException;
17 import java.net.InetSocketAddress;
18 import java.net.Socket;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.ServerSocketChannel;
21 import java.nio.channels.SocketChannel;
22 import java.util.Arrays;
23
24 import org.eclipse.jetty.continuation.Continuation;
25 import org.eclipse.jetty.io.ConnectedEndPoint;
26 import org.eclipse.jetty.io.Connection;
27 import org.eclipse.jetty.io.EndPoint;
28 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
29 import org.eclipse.jetty.io.nio.SelectorManager;
30 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
31 import org.eclipse.jetty.server.AsyncHttpConnection;
32 import org.eclipse.jetty.server.Connector;
33 import org.eclipse.jetty.server.HttpConnection;
34 import org.eclipse.jetty.server.Request;
35 import org.eclipse.jetty.server.Server;
36 import org.eclipse.jetty.util.component.AggregateLifeCycle;
37 import org.eclipse.jetty.util.log.Log;
38 import org.eclipse.jetty.util.log.Logger;
39 import org.eclipse.jetty.util.thread.ThreadPool;
40 import org.eclipse.jetty.util.thread.Timeout.Task;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 public class SelectChannelConnector extends AbstractNIOConnector
69 {
70 private static final Logger LOG = Log.getLogger(SelectChannelConnector.class);
71
72 protected ServerSocketChannel _acceptChannel;
73 private int _lowResourcesConnections;
74 private int _lowResourcesMaxIdleTime;
75 private int _localPort=-1;
76
77 private final SelectorManager _manager = new ConnectorSelectorManager();
78
79
80
81
82
83
84 public SelectChannelConnector()
85 {
86 _manager.setMaxIdleTime(getMaxIdleTime());
87 setAcceptors(Math.max(1,(Runtime.getRuntime().availableProcessors()+3)/4));
88 }
89
90
91 @Override
92 public void accept(int acceptorID) throws IOException
93 {
94 ServerSocketChannel server = _acceptChannel;
95 if (server!=null && server.isOpen())
96 {
97 SocketChannel channel = _acceptChannel.accept();
98 channel.configureBlocking(false);
99 Socket socket = channel.socket();
100 configure(socket);
101 _manager.register(channel);
102 }
103 }
104
105
106 public void close() throws IOException
107 {
108 synchronized(this)
109 {
110 if (_acceptChannel != null)
111 _acceptChannel.close();
112 _acceptChannel = null;
113 _localPort=-2;
114 }
115 }
116
117
118 @Override
119 public void customize(EndPoint endpoint, Request request) throws IOException
120 {
121 SelectChannelEndPoint cep = ((SelectChannelEndPoint)endpoint);
122 cep.cancelIdle();
123 request.setTimeStamp(cep.getSelectSet().getNow());
124 endpoint.setMaxIdleTime(_maxIdleTime);
125 super.customize(endpoint, request);
126 }
127
128
129 @Override
130 public void persist(EndPoint endpoint) throws IOException
131 {
132 ((SelectChannelEndPoint)endpoint).scheduleIdle();
133 super.persist(endpoint);
134 }
135
136
137 public Object getConnection()
138 {
139 return _acceptChannel;
140 }
141
142
143 public int getLocalPort()
144 {
145 synchronized(this)
146 {
147 return _localPort;
148 }
149 }
150
151
152 public void open() throws IOException
153 {
154 synchronized(this)
155 {
156 if (_acceptChannel == null)
157 {
158
159 _acceptChannel = ServerSocketChannel.open();
160
161 _acceptChannel.configureBlocking(true);
162
163
164 _acceptChannel.socket().setReuseAddress(getReuseAddress());
165 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
166 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
167
168 _localPort=_acceptChannel.socket().getLocalPort();
169 if (_localPort<=0)
170 throw new IOException("Server channel not bound");
171
172 }
173 }
174 }
175
176
177 @Override
178 public void setMaxIdleTime(int maxIdleTime)
179 {
180 _manager.setMaxIdleTime(maxIdleTime);
181 super.setMaxIdleTime(maxIdleTime);
182 }
183
184
185
186
187
188 public int getLowResourcesConnections()
189 {
190 return _lowResourcesConnections;
191 }
192
193
194
195
196
197
198
199
200 public void setLowResourcesConnections(int lowResourcesConnections)
201 {
202 _lowResourcesConnections=lowResourcesConnections;
203 }
204
205
206
207
208
209 @Override
210 public int getLowResourcesMaxIdleTime()
211 {
212 return _lowResourcesMaxIdleTime;
213 }
214
215
216
217
218
219
220
221
222
223 @Override
224 public void setLowResourcesMaxIdleTime(int lowResourcesMaxIdleTime)
225 {
226 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
227 super.setLowResourcesMaxIdleTime(lowResourcesMaxIdleTime);
228 }
229
230
231
232
233
234
235 @Override
236 protected void doStart() throws Exception
237 {
238 _manager.setSelectSets(getAcceptors());
239 _manager.setMaxIdleTime(getMaxIdleTime());
240 _manager.setLowResourcesConnections(getLowResourcesConnections());
241 _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
242 _manager.start();
243
244 super.doStart();
245 }
246
247
248
249
250
251 @Override
252 protected void doStop() throws Exception
253 {
254 synchronized(this)
255 {
256 if(_manager.isRunning())
257 {
258 try
259 {
260 _manager.stop();
261 }
262 catch (Exception e)
263 {
264 LOG.warn(e);
265 }
266 }
267 }
268 super.doStop();
269 }
270
271
272 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
273 {
274 return new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime);
275 }
276
277
278 protected void endPointClosed(SelectChannelEndPoint endpoint)
279 {
280 connectionClosed(endpoint.getConnection());
281 }
282
283
284 protected Connection newConnection(SocketChannel channel,final SelectChannelEndPoint endpoint)
285 {
286 return new SelectChannelHttpConnection(SelectChannelConnector.this,endpoint,getServer(),endpoint);
287 }
288
289
290 public void dump(Appendable out, String indent) throws IOException
291 {
292 out.append(String.valueOf(this)).append("\n");
293 ServerSocketChannel channel=_acceptChannel;
294 if (channel==null)
295 AggregateLifeCycle.dump(out,indent,Arrays.asList(new Object[]{null,"CLOSED",_manager}));
296 else
297 AggregateLifeCycle.dump(out,indent,Arrays.asList(new Object[]{_acceptChannel,_acceptChannel.isOpen()?"OPEN":"CLOSED",_manager}));
298 }
299
300
301
302
303 private class SelectChannelHttpConnection extends AsyncHttpConnection
304 {
305 private final SelectChannelEndPoint _endpoint;
306
307 private SelectChannelHttpConnection(Connector connector, EndPoint endpoint, Server server, SelectChannelEndPoint endpoint2)
308 {
309 super(connector,endpoint,server);
310 _endpoint = endpoint2;
311 }
312
313
314 @Override
315 public void cancelTimeout(Task task)
316 {
317 _endpoint.getSelectSet().cancelTimeout(task);
318 }
319
320
321 @Override
322 public void scheduleTimeout(Task task, long timeoutMs)
323 {
324 _endpoint.getSelectSet().scheduleTimeout(task,timeoutMs);
325 }
326 }
327
328
329
330
331 private final class ConnectorSelectorManager extends SelectorManager
332 {
333 @Override
334 public boolean dispatch(Runnable task)
335 {
336 ThreadPool pool=getThreadPool();
337 if (pool==null)
338 pool=getServer().getThreadPool();
339 return pool.dispatch(task);
340 }
341
342 @Override
343 protected void endPointClosed(final SelectChannelEndPoint endpoint)
344 {
345 SelectChannelConnector.this.endPointClosed(endpoint);
346 }
347
348 @Override
349 protected void endPointOpened(SelectChannelEndPoint endpoint)
350 {
351
352 connectionOpened(endpoint.getConnection());
353 }
354
355 @Override
356 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
357 {
358 connectionUpgraded(oldConnection,endpoint.getConnection());
359 }
360
361 @Override
362 protected Connection newConnection(SocketChannel channel,SelectChannelEndPoint endpoint)
363 {
364 return SelectChannelConnector.this.newConnection(channel,endpoint);
365 }
366
367 @Override
368 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
369 {
370 return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
371 }
372 }
373
374 }