1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server.nio;
15
16 import java.io.IOException;
17 import java.net.InetSocketAddress;
18 import java.net.Socket;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.ServerSocketChannel;
21 import java.nio.channels.SocketChannel;
22 import java.util.Arrays;
23
24 import org.eclipse.jetty.continuation.Continuation;
25 import org.eclipse.jetty.io.AsyncEndPoint;
26 import org.eclipse.jetty.io.ConnectedEndPoint;
27 import org.eclipse.jetty.io.Connection;
28 import org.eclipse.jetty.io.EndPoint;
29 import org.eclipse.jetty.io.nio.AsyncConnection;
30 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
31 import org.eclipse.jetty.io.nio.SelectorManager;
32 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
33 import org.eclipse.jetty.server.AsyncHttpConnection;
34 import org.eclipse.jetty.server.Request;
35 import org.eclipse.jetty.util.component.AggregateLifeCycle;
36 import org.eclipse.jetty.util.log.Log;
37 import org.eclipse.jetty.util.log.Logger;
38 import org.eclipse.jetty.util.thread.ThreadPool;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 public class SelectChannelConnector extends AbstractNIOConnector
67 {
68 private static final Logger LOG = Log.getLogger(SelectChannelConnector.class);
69
70 protected ServerSocketChannel _acceptChannel;
71 private int _lowResourcesConnections;
72 private int _lowResourcesMaxIdleTime;
73 private int _localPort=-1;
74
75 private final SelectorManager _manager = new ConnectorSelectorManager();
76
77
78
79
80
81
82 public SelectChannelConnector()
83 {
84 _manager.setMaxIdleTime(getMaxIdleTime());
85 setAcceptors(Math.max(1,(Runtime.getRuntime().availableProcessors()+3)/4));
86 }
87
88
89 @Override
90 public void accept(int acceptorID) throws IOException
91 {
92 ServerSocketChannel server;
93 synchronized(this)
94 {
95 server = _acceptChannel;
96 }
97
98 if (server!=null && server.isOpen() && _manager.isStarted())
99 {
100 SocketChannel channel = server.accept();
101 channel.configureBlocking(false);
102 Socket socket = channel.socket();
103 configure(socket);
104 _manager.register(channel);
105 }
106 }
107
108
109 public void close() throws IOException
110 {
111 synchronized(this)
112 {
113 if (_acceptChannel != null)
114 _acceptChannel.close();
115 _acceptChannel = null;
116 _localPort=-2;
117 }
118 }
119
120
121 @Override
122 public void customize(EndPoint endpoint, Request request) throws IOException
123 {
124 AsyncEndPoint aEndp = ((AsyncEndPoint)endpoint);
125 request.setTimeStamp(System.currentTimeMillis());
126 endpoint.setMaxIdleTime(_maxIdleTime);
127 super.customize(endpoint, request);
128 }
129
130
131 @Override
132 public void persist(EndPoint endpoint) throws IOException
133 {
134 AsyncEndPoint aEndp = ((AsyncEndPoint)endpoint);
135 aEndp.setCheckForIdle(true);
136 super.persist(endpoint);
137 }
138
139
140 public SelectorManager getSelectorManager()
141 {
142 return _manager;
143 }
144
145
146 public synchronized Object getConnection()
147 {
148 return _acceptChannel;
149 }
150
151
152 public int getLocalPort()
153 {
154 synchronized(this)
155 {
156 return _localPort;
157 }
158 }
159
160
161 public void open() throws IOException
162 {
163 synchronized(this)
164 {
165 if (_acceptChannel == null)
166 {
167
168 _acceptChannel = ServerSocketChannel.open();
169
170 _acceptChannel.configureBlocking(true);
171
172
173 _acceptChannel.socket().setReuseAddress(getReuseAddress());
174 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
175 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
176
177 _localPort=_acceptChannel.socket().getLocalPort();
178 if (_localPort<=0)
179 throw new IOException("Server channel not bound");
180
181 }
182 }
183 }
184
185
186 @Override
187 public void setMaxIdleTime(int maxIdleTime)
188 {
189 _manager.setMaxIdleTime(maxIdleTime);
190 super.setMaxIdleTime(maxIdleTime);
191 }
192
193
194
195
196
197 public int getLowResourcesConnections()
198 {
199 return _lowResourcesConnections;
200 }
201
202
203
204
205
206
207
208
209 public void setLowResourcesConnections(int lowResourcesConnections)
210 {
211 _lowResourcesConnections=lowResourcesConnections;
212 }
213
214
215
216
217
218 @Override
219 public int getLowResourcesMaxIdleTime()
220 {
221 return _lowResourcesMaxIdleTime;
222 }
223
224
225
226
227
228
229
230
231
232 @Override
233 public void setLowResourcesMaxIdleTime(int lowResourcesMaxIdleTime)
234 {
235 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
236 super.setLowResourcesMaxIdleTime(lowResourcesMaxIdleTime);
237 }
238
239
240
241
242
243
244 @Override
245 protected void doStart() throws Exception
246 {
247 _manager.setSelectSets(getAcceptors());
248 _manager.setMaxIdleTime(getMaxIdleTime());
249 _manager.setLowResourcesConnections(getLowResourcesConnections());
250 _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
251
252 super.doStart();
253 _manager.start();
254 }
255
256
257
258
259
260 @Override
261 protected void doStop() throws Exception
262 {
263 synchronized(this)
264 {
265 if(_manager.isRunning())
266 {
267 try
268 {
269 _manager.stop();
270 }
271 catch (Exception e)
272 {
273 LOG.warn(e);
274 }
275 }
276 }
277 super.doStop();
278 }
279
280
281 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
282 {
283 SelectChannelEndPoint endp= new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime);
284 endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
285 return endp;
286 }
287
288
289 protected void endPointClosed(SelectChannelEndPoint endpoint)
290 {
291 connectionClosed(endpoint.getConnection());
292 }
293
294
295 protected AsyncConnection newConnection(SocketChannel channel,final AsyncEndPoint endpoint)
296 {
297 return new AsyncHttpConnection(SelectChannelConnector.this,endpoint,getServer());
298 }
299
300
301 public void dump(Appendable out, String indent) throws IOException
302 {
303 super.dump(out, indent);
304 ServerSocketChannel channel;
305 synchronized (this)
306 {
307 channel=_acceptChannel;
308 }
309 if (channel==null)
310 AggregateLifeCycle.dump(out,indent,Arrays.asList(null,"CLOSED",_manager));
311 else
312 AggregateLifeCycle.dump(out,indent,Arrays.asList(channel,channel.isOpen()?"OPEN":"CLOSED",_manager));
313 }
314
315
316
317
318 private final class ConnectorSelectorManager extends SelectorManager
319 {
320 @Override
321 public boolean dispatch(Runnable task)
322 {
323 ThreadPool pool=getThreadPool();
324 if (pool==null)
325 pool=getServer().getThreadPool();
326 return pool.dispatch(task);
327 }
328
329 @Override
330 protected void endPointClosed(final SelectChannelEndPoint endpoint)
331 {
332 SelectChannelConnector.this.endPointClosed(endpoint);
333 }
334
335 @Override
336 protected void endPointOpened(SelectChannelEndPoint endpoint)
337 {
338
339 connectionOpened(endpoint.getConnection());
340 }
341
342 @Override
343 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
344 {
345 connectionUpgraded(oldConnection,endpoint.getConnection());
346 }
347
348 @Override
349 public AsyncConnection newConnection(SocketChannel channel,AsyncEndPoint endpoint, Object attachment)
350 {
351 return SelectChannelConnector.this.newConnection(channel,endpoint);
352 }
353
354 @Override
355 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
356 {
357 return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
358 }
359 }
360
361 }