1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server.nio;
15
16 import java.io.IOException;
17 import java.net.InetSocketAddress;
18 import java.net.Socket;
19 import java.nio.channels.ByteChannel;
20 import java.nio.channels.ServerSocketChannel;
21 import java.nio.channels.SocketChannel;
22 import java.util.Set;
23
24 import org.eclipse.jetty.http.HttpException;
25 import org.eclipse.jetty.io.Buffer;
26 import org.eclipse.jetty.io.ConnectedEndPoint;
27 import org.eclipse.jetty.io.Connection;
28 import org.eclipse.jetty.io.EndPoint;
29 import org.eclipse.jetty.io.EofException;
30 import org.eclipse.jetty.io.nio.ChannelEndPoint;
31 import org.eclipse.jetty.server.BlockingHttpConnection;
32 import org.eclipse.jetty.server.Request;
33 import org.eclipse.jetty.util.ConcurrentHashSet;
34 import org.eclipse.jetty.util.log.Log;
35 import org.eclipse.jetty.util.log.Logger;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50 public class BlockingChannelConnector extends AbstractNIOConnector
51 {
52 private static final Logger LOG = Log.getLogger(BlockingChannelConnector.class);
53
54 private transient ServerSocketChannel _acceptChannel;
55 private final Set<BlockingChannelEndPoint> _endpoints = new ConcurrentHashSet<BlockingChannelEndPoint>();
56
57
58
59
60
61
62 public BlockingChannelConnector()
63 {
64 }
65
66
67 public Object getConnection()
68 {
69 return _acceptChannel;
70 }
71
72
73
74
75
76 @Override
77 protected void doStart() throws Exception
78 {
79 super.doStart();
80 getThreadPool().dispatch(new Runnable()
81 {
82
83 public void run()
84 {
85 while (isRunning())
86 {
87 try
88 {
89 Thread.sleep(400);
90 long now=System.currentTimeMillis();
91 for (BlockingChannelEndPoint endp : _endpoints)
92 {
93 endp.checkIdleTimestamp(now);
94 }
95 }
96 catch(InterruptedException e)
97 {
98 LOG.ignore(e);
99 }
100 catch(Exception e)
101 {
102 LOG.warn(e);
103 }
104 }
105 }
106
107 });
108
109 }
110
111
112
113 public void open() throws IOException
114 {
115
116 _acceptChannel= ServerSocketChannel.open();
117 _acceptChannel.configureBlocking(true);
118
119
120 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
121 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
122 }
123
124
125 public void close() throws IOException
126 {
127 if (_acceptChannel != null)
128 _acceptChannel.close();
129 _acceptChannel=null;
130 }
131
132
133 @Override
134 public void accept(int acceptorID)
135 throws IOException, InterruptedException
136 {
137 SocketChannel channel = _acceptChannel.accept();
138 channel.configureBlocking(true);
139 Socket socket=channel.socket();
140 configure(socket);
141
142 BlockingChannelEndPoint connection=new BlockingChannelEndPoint(channel);
143 connection.dispatch();
144 }
145
146
147 @Override
148 public void customize(EndPoint endpoint, Request request)
149 throws IOException
150 {
151 super.customize(endpoint, request);
152 endpoint.setMaxIdleTime(_maxIdleTime);
153 configure(((SocketChannel)endpoint.getTransport()).socket());
154 }
155
156
157
158 public int getLocalPort()
159 {
160 if (_acceptChannel==null || !_acceptChannel.isOpen())
161 return -1;
162 return _acceptChannel.socket().getLocalPort();
163 }
164
165
166
167
168 private class BlockingChannelEndPoint extends ChannelEndPoint implements Runnable, ConnectedEndPoint
169 {
170 private Connection _connection;
171 private int _timeout;
172 private volatile long _idleTimestamp;
173
174 BlockingChannelEndPoint(ByteChannel channel)
175 throws IOException
176 {
177 super(channel,BlockingChannelConnector.this._maxIdleTime);
178 _connection = new BlockingHttpConnection(BlockingChannelConnector.this,this,getServer());
179 }
180
181
182
183
184
185 public Connection getConnection()
186 {
187 return _connection;
188 }
189
190
191 public void setConnection(Connection connection)
192 {
193 _connection=connection;
194 }
195
196
197 public void checkIdleTimestamp(long now)
198 {
199 if (_idleTimestamp!=0 && _timeout>0 && now>(_idleTimestamp+_timeout))
200 {
201 idleExpired();
202 }
203 }
204
205
206 protected void idleExpired()
207 {
208 try
209 {
210 super.close();
211 }
212 catch (IOException e)
213 {
214 LOG.ignore(e);
215 }
216 }
217
218
219 void dispatch() throws IOException
220 {
221 if (!getThreadPool().dispatch(this))
222 {
223 LOG.warn("dispatch failed for {}",_connection);
224 super.close();
225 }
226 }
227
228
229
230
231
232 @Override
233 public int fill(Buffer buffer) throws IOException
234 {
235 _idleTimestamp=System.currentTimeMillis();
236 return super.fill(buffer);
237 }
238
239
240
241
242
243 @Override
244 public int flush(Buffer buffer) throws IOException
245 {
246 _idleTimestamp=System.currentTimeMillis();
247 return super.flush(buffer);
248 }
249
250
251
252
253
254 @Override
255 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
256 {
257 _idleTimestamp=System.currentTimeMillis();
258 return super.flush(header,buffer,trailer);
259 }
260
261
262 public void run()
263 {
264 try
265 {
266 _timeout=getMaxIdleTime();
267 connectionOpened(_connection);
268 _endpoints.add(this);
269
270 while (isOpen())
271 {
272 _idleTimestamp=System.currentTimeMillis();
273 if (_connection.isIdle())
274 {
275 if (getServer().getThreadPool().isLowOnThreads())
276 {
277 int lrmit = getLowResourcesMaxIdleTime();
278 if (lrmit>=0 && _timeout!= lrmit)
279 {
280 _timeout=lrmit;
281 }
282 }
283 }
284 else
285 {
286 if (_timeout!=getMaxIdleTime())
287 {
288 _timeout=getMaxIdleTime();
289 }
290 }
291
292 _connection = _connection.handle();
293
294 }
295 }
296 catch (EofException e)
297 {
298 LOG.debug("EOF", e);
299 try{BlockingChannelEndPoint.this.close();}
300 catch(IOException e2){LOG.ignore(e2);}
301 }
302 catch (HttpException e)
303 {
304 LOG.debug("BAD", e);
305 try{super.close();}
306 catch(IOException e2){LOG.ignore(e2);}
307 }
308 catch(Throwable e)
309 {
310 LOG.warn("handle failed",e);
311 try{super.close();}
312 catch(IOException e2){LOG.ignore(e2);}
313 }
314 finally
315 {
316 connectionClosed(_connection);
317 _endpoints.remove(this);
318
319
320 try
321 {
322 if (!_socket.isClosed())
323 {
324 long timestamp=System.currentTimeMillis();
325 int max_idle=getMaxIdleTime();
326
327 _socket.setSoTimeout(getMaxIdleTime());
328 int c=0;
329 do
330 {
331 c = _socket.getInputStream().read();
332 }
333 while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle);
334 if (!_socket.isClosed())
335 _socket.close();
336 }
337 }
338 catch(IOException e)
339 {
340 LOG.ignore(e);
341 }
342 }
343 }
344 }
345 }