1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server.nio;
15
16 import java.io.IOException;
17 import java.net.InetSocketAddress;
18 import java.net.Socket;
19 import java.nio.channels.ByteChannel;
20 import java.nio.channels.ServerSocketChannel;
21 import java.nio.channels.SocketChannel;
22 import java.util.Set;
23
24 import org.eclipse.jetty.http.HttpException;
25 import org.eclipse.jetty.io.Buffer;
26 import org.eclipse.jetty.io.ConnectedEndPoint;
27 import org.eclipse.jetty.io.Connection;
28 import org.eclipse.jetty.io.EndPoint;
29 import org.eclipse.jetty.io.EofException;
30 import org.eclipse.jetty.io.nio.ChannelEndPoint;
31 import org.eclipse.jetty.server.BlockingHttpConnection;
32 import org.eclipse.jetty.server.HttpConnection;
33 import org.eclipse.jetty.server.Request;
34 import org.eclipse.jetty.util.ConcurrentHashSet;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 public class BlockingChannelConnector extends AbstractNIOConnector
52 {
53 private static final Logger LOG = Log.getLogger(BlockingChannelConnector.class);
54
55 private transient ServerSocketChannel _acceptChannel;
56 private final Set<BlockingChannelEndPoint> _endpoints = new ConcurrentHashSet<BlockingChannelEndPoint>();
57
58
59
60
61
62
63 public BlockingChannelConnector()
64 {
65 }
66
67
68 public Object getConnection()
69 {
70 return _acceptChannel;
71 }
72
73
74
75
76
77 @Override
78 protected void doStart() throws Exception
79 {
80 super.doStart();
81 getThreadPool().dispatch(new Runnable()
82 {
83
84 public void run()
85 {
86 while (isRunning())
87 {
88 try
89 {
90 Thread.sleep(400);
91 long now=System.currentTimeMillis();
92 for (BlockingChannelEndPoint endp : _endpoints)
93 {
94 endp.checkIdleTimestamp(now);
95 }
96 }
97 catch(InterruptedException e)
98 {
99 LOG.ignore(e);
100 }
101 catch(Exception e)
102 {
103 LOG.warn(e);
104 }
105 }
106 }
107
108 });
109
110 }
111
112
113
114 public void open() throws IOException
115 {
116
117 _acceptChannel= ServerSocketChannel.open();
118 _acceptChannel.configureBlocking(true);
119
120
121 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
122 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
123 }
124
125
126 public void close() throws IOException
127 {
128 if (_acceptChannel != null)
129 _acceptChannel.close();
130 _acceptChannel=null;
131 }
132
133
134 @Override
135 public void accept(int acceptorID)
136 throws IOException, InterruptedException
137 {
138 SocketChannel channel = _acceptChannel.accept();
139 channel.configureBlocking(true);
140 Socket socket=channel.socket();
141 configure(socket);
142
143 BlockingChannelEndPoint connection=new BlockingChannelEndPoint(channel);
144 connection.dispatch();
145 }
146
147
148 @Override
149 public void customize(EndPoint endpoint, Request request)
150 throws IOException
151 {
152 super.customize(endpoint, request);
153 endpoint.setMaxIdleTime(_maxIdleTime);
154 configure(((SocketChannel)endpoint.getTransport()).socket());
155 }
156
157
158
159 public int getLocalPort()
160 {
161 if (_acceptChannel==null || !_acceptChannel.isOpen())
162 return -1;
163 return _acceptChannel.socket().getLocalPort();
164 }
165
166
167
168
169 private class BlockingChannelEndPoint extends ChannelEndPoint implements Runnable, ConnectedEndPoint
170 {
171 private Connection _connection;
172 private int _timeout;
173 private volatile long _idleTimestamp;
174
175 BlockingChannelEndPoint(ByteChannel channel)
176 throws IOException
177 {
178 super(channel,BlockingChannelConnector.this._maxIdleTime);
179 _connection = new BlockingHttpConnection(BlockingChannelConnector.this,this,getServer());
180 }
181
182
183
184
185
186 public Connection getConnection()
187 {
188 return _connection;
189 }
190
191
192 public void setConnection(Connection connection)
193 {
194 _connection=connection;
195 }
196
197
198 public void checkIdleTimestamp(long now)
199 {
200 if (_idleTimestamp!=0 && _timeout>0 && now>(_idleTimestamp+_timeout))
201 {
202 idleExpired();
203 }
204 }
205
206
207 protected void idleExpired()
208 {
209 try
210 {
211 close();
212 }
213 catch (IOException e)
214 {
215 LOG.ignore(e);
216 }
217 }
218
219
220 void dispatch() throws IOException
221 {
222 if (!getThreadPool().dispatch(this))
223 {
224 LOG.warn("dispatch failed for {}",_connection);
225 BlockingChannelEndPoint.this.close();
226 }
227 }
228
229
230
231
232
233 @Override
234 public int fill(Buffer buffer) throws IOException
235 {
236 _idleTimestamp=System.currentTimeMillis();
237 return super.fill(buffer);
238 }
239
240
241
242
243
244 @Override
245 public int flush(Buffer buffer) throws IOException
246 {
247 _idleTimestamp=System.currentTimeMillis();
248 return super.flush(buffer);
249 }
250
251
252
253
254
255 @Override
256 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
257 {
258 _idleTimestamp=System.currentTimeMillis();
259 return super.flush(header,buffer,trailer);
260 }
261
262
263 public void run()
264 {
265 try
266 {
267 _timeout=getMaxIdleTime();
268 connectionOpened(_connection);
269 _endpoints.add(this);
270
271 while (isOpen())
272 {
273 _idleTimestamp=System.currentTimeMillis();
274 if (_connection.isIdle())
275 {
276 if (getServer().getThreadPool().isLowOnThreads())
277 {
278 int lrmit = getLowResourcesMaxIdleTime();
279 if (lrmit>=0 && _timeout!= lrmit)
280 {
281 _timeout=lrmit;
282 }
283 }
284 }
285 else
286 {
287 if (_timeout!=getMaxIdleTime())
288 {
289 _timeout=getMaxIdleTime();
290 }
291 }
292
293 _connection = _connection.handle();
294
295 }
296 }
297 catch (EofException e)
298 {
299 LOG.debug("EOF", e);
300 try{BlockingChannelEndPoint.this.close();}
301 catch(IOException e2){LOG.ignore(e2);}
302 }
303 catch (HttpException e)
304 {
305 LOG.debug("BAD", e);
306 try{BlockingChannelEndPoint.this.close();}
307 catch(IOException e2){LOG.ignore(e2);}
308 }
309 catch(Throwable e)
310 {
311 LOG.warn("handle failed",e);
312 try{BlockingChannelEndPoint.this.close();}
313 catch(IOException e2){LOG.ignore(e2);}
314 }
315 finally
316 {
317 connectionClosed(_connection);
318 _endpoints.remove(this);
319
320
321 try
322 {
323 if (!_socket.isClosed())
324 {
325 long timestamp=System.currentTimeMillis();
326 int max_idle=getMaxIdleTime();
327
328 _socket.setSoTimeout(getMaxIdleTime());
329 int c=0;
330 do
331 {
332 c = _socket.getInputStream().read();
333 }
334 while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle);
335 if (!_socket.isClosed())
336 _socket.close();
337 }
338 }
339 catch(IOException e)
340 {
341 LOG.ignore(e);
342 }
343 }
344 }
345 }
346 }