1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server.nio;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.Socket;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.ServerSocketChannel;
26 import java.nio.channels.SocketChannel;
27
28 import org.eclipse.jetty.continuation.Continuation;
29 import org.eclipse.jetty.io.AsyncEndPoint;
30 import org.eclipse.jetty.io.ConnectedEndPoint;
31 import org.eclipse.jetty.io.Connection;
32 import org.eclipse.jetty.io.EndPoint;
33 import org.eclipse.jetty.io.nio.AsyncConnection;
34 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
35 import org.eclipse.jetty.io.nio.SelectorManager;
36 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
37 import org.eclipse.jetty.server.AsyncHttpConnection;
38 import org.eclipse.jetty.server.Request;
39 import org.eclipse.jetty.util.thread.ThreadPool;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 public class SelectChannelConnector extends AbstractNIOConnector
68 {
69 protected ServerSocketChannel _acceptChannel;
70 private int _lowResourcesConnections;
71 private int _lowResourcesMaxIdleTime;
72 private int _localPort=-1;
73
74 private final SelectorManager _manager = new ConnectorSelectorManager();
75
76
77
78
79
80
81 public SelectChannelConnector()
82 {
83 _manager.setMaxIdleTime(getMaxIdleTime());
84 addBean(_manager,true);
85 setAcceptors(Math.max(1,(Runtime.getRuntime().availableProcessors()+3)/4));
86 }
87
88 @Override
89 public void setThreadPool(ThreadPool pool)
90 {
91 super.setThreadPool(pool);
92
93 removeBean(_manager);
94 addBean(_manager,true);
95 }
96
97
98 @Override
99 public void accept(int acceptorID) throws IOException
100 {
101 ServerSocketChannel server;
102 synchronized(this)
103 {
104 server = _acceptChannel;
105 }
106
107 if (server!=null && server.isOpen() && _manager.isStarted())
108 {
109 SocketChannel channel = server.accept();
110 channel.configureBlocking(false);
111 Socket socket = channel.socket();
112 configure(socket);
113 _manager.register(channel);
114 }
115 }
116
117
118 public void close() throws IOException
119 {
120 synchronized(this)
121 {
122 if (_acceptChannel != null)
123 {
124 removeBean(_acceptChannel);
125 if (_acceptChannel.isOpen())
126 _acceptChannel.close();
127 }
128 _acceptChannel = null;
129 _localPort=-2;
130 }
131 }
132
133
134 @Override
135 public void customize(EndPoint endpoint, Request request) throws IOException
136 {
137 request.setTimeStamp(System.currentTimeMillis());
138 endpoint.setMaxIdleTime(_maxIdleTime);
139 super.customize(endpoint, request);
140 }
141
142
143 @Override
144 public void persist(EndPoint endpoint) throws IOException
145 {
146 AsyncEndPoint aEndp = ((AsyncEndPoint)endpoint);
147 aEndp.setCheckForIdle(true);
148 super.persist(endpoint);
149 }
150
151
152 public SelectorManager getSelectorManager()
153 {
154 return _manager;
155 }
156
157
158 public synchronized Object getConnection()
159 {
160 return _acceptChannel;
161 }
162
163
164 public int getLocalPort()
165 {
166 synchronized(this)
167 {
168 return _localPort;
169 }
170 }
171
172
173 public void open() throws IOException
174 {
175 synchronized(this)
176 {
177 if (_acceptChannel == null)
178 {
179
180 _acceptChannel = ServerSocketChannel.open();
181
182 _acceptChannel.configureBlocking(true);
183
184
185 _acceptChannel.socket().setReuseAddress(getReuseAddress());
186 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
187 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
188
189 _localPort=_acceptChannel.socket().getLocalPort();
190 if (_localPort<=0)
191 throw new IOException("Server channel not bound");
192
193 addBean(_acceptChannel);
194 }
195 }
196 }
197
198
199 @Override
200 public void setMaxIdleTime(int maxIdleTime)
201 {
202 _manager.setMaxIdleTime(maxIdleTime);
203 super.setMaxIdleTime(maxIdleTime);
204 }
205
206
207
208
209
210 public int getLowResourcesConnections()
211 {
212 return _lowResourcesConnections;
213 }
214
215
216
217
218
219
220
221
222 public void setLowResourcesConnections(int lowResourcesConnections)
223 {
224 _lowResourcesConnections=lowResourcesConnections;
225 }
226
227
228
229
230
231 @Override
232 public int getLowResourcesMaxIdleTime()
233 {
234 return _lowResourcesMaxIdleTime;
235 }
236
237
238
239
240
241
242
243
244
245 @Override
246 public void setLowResourcesMaxIdleTime(int lowResourcesMaxIdleTime)
247 {
248 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
249 super.setLowResourcesMaxIdleTime(lowResourcesMaxIdleTime);
250 }
251
252
253
254
255
256
257 @Override
258 protected void doStart() throws Exception
259 {
260 _manager.setSelectSets(getAcceptors());
261 _manager.setMaxIdleTime(getMaxIdleTime());
262 _manager.setLowResourcesConnections(getLowResourcesConnections());
263 _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
264
265 super.doStart();
266 }
267
268
269 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
270 {
271 SelectChannelEndPoint endp= new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime);
272 endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
273 return endp;
274 }
275
276
277 protected void endPointClosed(SelectChannelEndPoint endpoint)
278 {
279 connectionClosed(endpoint.getConnection());
280 }
281
282
283 protected AsyncConnection newConnection(SocketChannel channel,final AsyncEndPoint endpoint)
284 {
285 return new AsyncHttpConnection(SelectChannelConnector.this,endpoint,getServer());
286 }
287
288
289
290
291
292 private final class ConnectorSelectorManager extends SelectorManager
293 {
294 @Override
295 public boolean dispatch(Runnable task)
296 {
297 ThreadPool pool=getThreadPool();
298 if (pool==null)
299 pool=getServer().getThreadPool();
300 return pool.dispatch(task);
301 }
302
303 @Override
304 protected void endPointClosed(final SelectChannelEndPoint endpoint)
305 {
306 SelectChannelConnector.this.endPointClosed(endpoint);
307 }
308
309 @Override
310 protected void endPointOpened(SelectChannelEndPoint endpoint)
311 {
312
313 connectionOpened(endpoint.getConnection());
314 }
315
316 @Override
317 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
318 {
319 connectionUpgraded(oldConnection,endpoint.getConnection());
320 }
321
322 @Override
323 public AsyncConnection newConnection(SocketChannel channel,AsyncEndPoint endpoint, Object attachment)
324 {
325 return SelectChannelConnector.this.newConnection(channel,endpoint);
326 }
327
328 @Override
329 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
330 {
331 return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
332 }
333 }
334 }