1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server.nio;
15
16 import java.io.IOException;
17 import java.net.InetSocketAddress;
18 import java.net.Socket;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.ServerSocketChannel;
21 import java.nio.channels.SocketChannel;
22
23 import org.eclipse.jetty.continuation.Continuation;
24 import org.eclipse.jetty.io.ConnectedEndPoint;
25 import org.eclipse.jetty.io.Connection;
26 import org.eclipse.jetty.io.EndPoint;
27 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
28 import org.eclipse.jetty.io.nio.SelectorManager;
29 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
30 import org.eclipse.jetty.server.HttpConnection;
31 import org.eclipse.jetty.server.Request;
32 import org.eclipse.jetty.util.log.Log;
33 import org.eclipse.jetty.util.thread.Timeout.Task;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61 public class SelectChannelConnector extends AbstractNIOConnector
62 {
63 protected ServerSocketChannel _acceptChannel;
64 private int _lowResourcesConnections;
65 private int _lowResourcesMaxIdleTime;
66 private int _localPort=-1;
67
68 private final SelectorManager _manager = new SelectorManager()
69 {
70 @Override
71 protected SocketChannel acceptChannel(SelectionKey key) throws IOException
72 {
73
74 SocketChannel channel = ((ServerSocketChannel)key.channel()).accept();
75 if (channel==null)
76 return null;
77 channel.configureBlocking(false);
78 Socket socket = channel.socket();
79 configure(socket);
80 return channel;
81 }
82
83 @Override
84 public boolean dispatch(Runnable task)
85 {
86 return getThreadPool().dispatch(task);
87 }
88
89 @Override
90 protected void endPointClosed(final SelectChannelEndPoint endpoint)
91 {
92 connectionClosed(endpoint.getConnection());
93 }
94
95 @Override
96 protected void endPointOpened(SelectChannelEndPoint endpoint)
97 {
98
99 connectionOpened(endpoint.getConnection());
100 }
101
102 @Override
103 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
104 {
105 connectionUpgraded(oldConnection,endpoint.getConnection());
106 }
107
108 @Override
109 protected Connection newConnection(SocketChannel channel,SelectChannelEndPoint endpoint)
110 {
111 return SelectChannelConnector.this.newConnection(channel,endpoint);
112 }
113
114 @Override
115 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
116 {
117 return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
118 }
119 };
120
121
122
123
124
125
126 public SelectChannelConnector()
127 {
128 }
129
130
131 @Override
132 public void accept(int acceptorID) throws IOException
133 {
134 _manager.doSelect(acceptorID);
135 }
136
137
138 public void close() throws IOException
139 {
140 synchronized(this)
141 {
142 if(_manager.isRunning())
143 {
144 try
145 {
146 _manager.stop();
147 }
148 catch (Exception e)
149 {
150 Log.warn(e);
151 }
152 }
153 if (_acceptChannel != null)
154 _acceptChannel.close();
155 _acceptChannel = null;
156 _localPort=-2;
157 }
158 }
159
160
161 @Override
162 public void customize(EndPoint endpoint, Request request) throws IOException
163 {
164 SelectChannelEndPoint cep = ((SelectChannelEndPoint)endpoint);
165 cep.cancelIdle();
166 request.setTimeStamp(cep.getSelectSet().getNow());
167 super.customize(endpoint, request);
168 }
169
170
171 @Override
172 public void persist(EndPoint endpoint) throws IOException
173 {
174 ((SelectChannelEndPoint)endpoint).scheduleIdle();
175 super.persist(endpoint);
176 }
177
178
179 public Object getConnection()
180 {
181 return _acceptChannel;
182 }
183
184
185 public int getLocalPort()
186 {
187 synchronized(this)
188 {
189 return _localPort;
190 }
191 }
192
193
194 public void open() throws IOException
195 {
196 synchronized(this)
197 {
198 if (_acceptChannel == null)
199 {
200
201 _acceptChannel = ServerSocketChannel.open();
202
203 _acceptChannel.configureBlocking(true);
204
205
206 _acceptChannel.socket().setReuseAddress(getReuseAddress());
207 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
208 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
209
210 _localPort=_acceptChannel.socket().getLocalPort();
211 if (_localPort<=0)
212 throw new IOException("Server channel not bound");
213
214
215 _acceptChannel.configureBlocking(false);
216
217 }
218 }
219 }
220
221
222 @Override
223 public void setMaxIdleTime(int maxIdleTime)
224 {
225 _manager.setMaxIdleTime(maxIdleTime);
226 super.setMaxIdleTime(maxIdleTime);
227 }
228
229
230
231
232
233 public int getLowResourcesConnections()
234 {
235 return _lowResourcesConnections;
236 }
237
238
239
240
241
242
243
244
245 public void setLowResourcesConnections(int lowResourcesConnections)
246 {
247 _lowResourcesConnections=lowResourcesConnections;
248 }
249
250
251
252
253
254 @Override
255 public int getLowResourcesMaxIdleTime()
256 {
257 return _lowResourcesMaxIdleTime;
258 }
259
260
261
262
263
264
265
266
267
268 @Override
269 public void setLowResourcesMaxIdleTime(int lowResourcesMaxIdleTime)
270 {
271 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
272 super.setLowResourcesMaxIdleTime(lowResourcesMaxIdleTime);
273 }
274
275
276
277
278
279
280 @Override
281 protected void doStart() throws Exception
282 {
283 _manager.setSelectSets(getAcceptors());
284 _manager.setMaxIdleTime(getMaxIdleTime());
285 _manager.setLowResourcesConnections(getLowResourcesConnections());
286 _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
287 _manager.start();
288 open();
289 _manager.register(_acceptChannel);
290 super.doStart();
291 }
292
293
294
295
296
297 @Override
298 protected void doStop() throws Exception
299 {
300 super.doStop();
301 }
302
303
304 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
305 {
306 return new SelectChannelEndPoint(channel,selectSet,key);
307 }
308
309
310 protected Connection newConnection(SocketChannel channel,final SelectChannelEndPoint endpoint)
311 {
312 return new HttpConnection(SelectChannelConnector.this,endpoint,getServer())
313 {
314
315 @Override
316 public void cancelTimeout(Task task)
317 {
318 endpoint.getSelectSet().cancelTimeout(task);
319 }
320
321
322 @Override
323 public void scheduleTimeout(Task task, long timeoutMs)
324 {
325 endpoint.getSelectSet().scheduleTimeout(task,timeoutMs);
326 }
327 };
328 }
329
330
331 public void dump()
332 {
333 Log.info("channel "+_acceptChannel+(_acceptChannel.isOpen()?" is open":" is closed"));
334 _manager.dump();
335 }
336 }