1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server.nio;
15
16 import java.io.IOException;
17 import java.net.InetSocketAddress;
18 import java.net.Socket;
19 import java.nio.channels.ByteChannel;
20 import java.nio.channels.ServerSocketChannel;
21 import java.nio.channels.SocketChannel;
22 import java.util.Set;
23
24 import org.eclipse.jetty.http.HttpException;
25 import org.eclipse.jetty.io.Buffer;
26 import org.eclipse.jetty.io.ConnectedEndPoint;
27 import org.eclipse.jetty.io.Connection;
28 import org.eclipse.jetty.io.EndPoint;
29 import org.eclipse.jetty.io.EofException;
30 import org.eclipse.jetty.io.nio.ChannelEndPoint;
31 import org.eclipse.jetty.server.BlockingHttpConnection;
32 import org.eclipse.jetty.server.HttpConnection;
33 import org.eclipse.jetty.server.Request;
34 import org.eclipse.jetty.util.ConcurrentHashSet;
35 import org.eclipse.jetty.util.log.Log;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50 public class BlockingChannelConnector extends AbstractNIOConnector
51 {
52 private transient ServerSocketChannel _acceptChannel;
53 private final Set<BlockingChannelEndPoint> _endpoints = new ConcurrentHashSet<BlockingChannelEndPoint>();
54
55
56
57
58
59
60 public BlockingChannelConnector()
61 {
62 }
63
64
65 public Object getConnection()
66 {
67 return _acceptChannel;
68 }
69
70
71
72
73
74 @Override
75 protected void doStart() throws Exception
76 {
77 super.doStart();
78 getThreadPool().dispatch(new Runnable()
79 {
80
81 public void run()
82 {
83 while (isRunning())
84 {
85 try
86 {
87 Thread.sleep(400);
88 long now=System.currentTimeMillis();
89 for (BlockingChannelEndPoint endp : _endpoints)
90 {
91 endp.checkIdleTimestamp(now);
92 }
93 }
94 catch(InterruptedException e)
95 {
96 Log.ignore(e);
97 }
98 catch(Exception e)
99 {
100 Log.warn(e);
101 }
102 }
103 }
104
105 });
106
107 }
108
109
110
111 public void open() throws IOException
112 {
113
114 _acceptChannel= ServerSocketChannel.open();
115 _acceptChannel.configureBlocking(true);
116
117
118 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
119 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
120 }
121
122
123 public void close() throws IOException
124 {
125 if (_acceptChannel != null)
126 _acceptChannel.close();
127 _acceptChannel=null;
128 }
129
130
131 @Override
132 public void accept(int acceptorID)
133 throws IOException, InterruptedException
134 {
135 SocketChannel channel = _acceptChannel.accept();
136 channel.configureBlocking(true);
137 Socket socket=channel.socket();
138 configure(socket);
139
140 BlockingChannelEndPoint connection=new BlockingChannelEndPoint(channel);
141 connection.dispatch();
142 }
143
144
145 @Override
146 public void customize(EndPoint endpoint, Request request)
147 throws IOException
148 {
149 super.customize(endpoint, request);
150 endpoint.setMaxIdleTime(_maxIdleTime);
151 configure(((SocketChannel)endpoint.getTransport()).socket());
152 }
153
154
155
156 public int getLocalPort()
157 {
158 if (_acceptChannel==null || !_acceptChannel.isOpen())
159 return -1;
160 return _acceptChannel.socket().getLocalPort();
161 }
162
163
164
165
166 private class BlockingChannelEndPoint extends ChannelEndPoint implements Runnable, ConnectedEndPoint
167 {
168 private Connection _connection;
169 private int _timeout;
170 private volatile long _idleTimestamp;
171
172 BlockingChannelEndPoint(ByteChannel channel)
173 throws IOException
174 {
175 super(channel,BlockingChannelConnector.this._maxIdleTime);
176 _connection = new BlockingHttpConnection(BlockingChannelConnector.this,this,getServer());
177 }
178
179
180
181
182
183 public Connection getConnection()
184 {
185 return _connection;
186 }
187
188
189 public void setConnection(Connection connection)
190 {
191 _connection=connection;
192 }
193
194
195 public void checkIdleTimestamp(long now)
196 {
197 if (_idleTimestamp!=0 && _timeout>0 && now>(_idleTimestamp+_timeout))
198 {
199 idleExpired();
200 }
201 }
202
203
204 protected void idleExpired()
205 {
206 try
207 {
208 close();
209 }
210 catch (IOException e)
211 {
212 Log.ignore(e);
213 }
214 }
215
216
217 void dispatch() throws IOException
218 {
219 if (!getThreadPool().dispatch(this))
220 {
221 Log.warn("dispatch failed for {}",_connection);
222 BlockingChannelEndPoint.this.close();
223 }
224 }
225
226
227
228
229
230 @Override
231 public int fill(Buffer buffer) throws IOException
232 {
233 _idleTimestamp=System.currentTimeMillis();
234 return super.fill(buffer);
235 }
236
237
238
239
240
241 @Override
242 public int flush(Buffer buffer) throws IOException
243 {
244 _idleTimestamp=System.currentTimeMillis();
245 return super.flush(buffer);
246 }
247
248
249
250
251
252 @Override
253 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
254 {
255 _idleTimestamp=System.currentTimeMillis();
256 return super.flush(header,buffer,trailer);
257 }
258
259
260 public void run()
261 {
262 try
263 {
264 _timeout=getMaxIdleTime();
265 connectionOpened(_connection);
266 _endpoints.add(this);
267
268 while (isOpen())
269 {
270 _idleTimestamp=System.currentTimeMillis();
271 if (_connection.isIdle())
272 {
273 if (getServer().getThreadPool().isLowOnThreads())
274 {
275 int lrmit = getLowResourcesMaxIdleTime();
276 if (lrmit>=0 && _timeout!= lrmit)
277 {
278 _timeout=lrmit;
279 }
280 }
281 }
282 else
283 {
284 if (_timeout!=getMaxIdleTime())
285 {
286 _timeout=getMaxIdleTime();
287 }
288 }
289
290 _connection = _connection.handle();
291
292 }
293 }
294 catch (EofException e)
295 {
296 Log.debug("EOF", e);
297 try{BlockingChannelEndPoint.this.close();}
298 catch(IOException e2){Log.ignore(e2);}
299 }
300 catch (HttpException e)
301 {
302 Log.debug("BAD", e);
303 try{BlockingChannelEndPoint.this.close();}
304 catch(IOException e2){Log.ignore(e2);}
305 }
306 catch(Throwable e)
307 {
308 Log.warn("handle failed",e);
309 try{BlockingChannelEndPoint.this.close();}
310 catch(IOException e2){Log.ignore(e2);}
311 }
312 finally
313 {
314 connectionClosed(_connection);
315 _endpoints.remove(this);
316
317
318 try
319 {
320 if (!_socket.isClosed())
321 {
322 long timestamp=System.currentTimeMillis();
323 int max_idle=getMaxIdleTime();
324
325 _socket.setSoTimeout(getMaxIdleTime());
326 int c=0;
327 do
328 {
329 c = _socket.getInputStream().read();
330 }
331 while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle);
332 if (!_socket.isClosed())
333 _socket.close();
334 }
335 }
336 catch(IOException e)
337 {
338 Log.ignore(e);
339 }
340 }
341 }
342 }
343 }