1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server.nio;
15
16 import java.io.IOException;
17 import java.net.InetSocketAddress;
18 import java.net.Socket;
19 import java.nio.channels.ByteChannel;
20 import java.nio.channels.ServerSocketChannel;
21 import java.nio.channels.SocketChannel;
22 import java.util.Set;
23
24 import org.eclipse.jetty.http.HttpException;
25 import org.eclipse.jetty.io.Buffer;
26 import org.eclipse.jetty.io.ConnectedEndPoint;
27 import org.eclipse.jetty.io.Connection;
28 import org.eclipse.jetty.io.EndPoint;
29 import org.eclipse.jetty.io.EofException;
30 import org.eclipse.jetty.io.nio.ChannelEndPoint;
31 import org.eclipse.jetty.server.HttpConnection;
32 import org.eclipse.jetty.server.Request;
33 import org.eclipse.jetty.util.ConcurrentHashSet;
34 import org.eclipse.jetty.util.log.Log;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 public class BlockingChannelConnector extends AbstractNIOConnector
50 {
51 private transient ServerSocketChannel _acceptChannel;
52 private final Set<BlockingChannelEndPoint> _endpoints = new ConcurrentHashSet<BlockingChannelEndPoint>();
53
54
55
56
57
58
59 public BlockingChannelConnector()
60 {
61 }
62
63
64 public Object getConnection()
65 {
66 return _acceptChannel;
67 }
68
69
70
71
72
73 @Override
74 protected void doStart() throws Exception
75 {
76 super.doStart();
77 getThreadPool().dispatch(new Runnable()
78 {
79
80 public void run()
81 {
82 while (isRunning())
83 {
84 try
85 {
86 Thread.sleep(400);
87 long now=System.currentTimeMillis();
88 for (BlockingChannelEndPoint endp : _endpoints)
89 {
90 endp.checkIdleTimestamp(now);
91 }
92 }
93 catch(InterruptedException e)
94 {
95 Log.ignore(e);
96 }
97 catch(Exception e)
98 {
99 Log.warn(e);
100 }
101 }
102 }
103
104 });
105
106 }
107
108
109
110 public void open() throws IOException
111 {
112
113 _acceptChannel= ServerSocketChannel.open();
114 _acceptChannel.configureBlocking(true);
115
116
117 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
118 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
119 }
120
121
122 public void close() throws IOException
123 {
124 if (_acceptChannel != null)
125 _acceptChannel.close();
126 _acceptChannel=null;
127 }
128
129
130 @Override
131 public void accept(int acceptorID)
132 throws IOException, InterruptedException
133 {
134 SocketChannel channel = _acceptChannel.accept();
135 channel.configureBlocking(true);
136 Socket socket=channel.socket();
137 configure(socket);
138
139 BlockingChannelEndPoint connection=new BlockingChannelEndPoint(channel);
140 connection.dispatch();
141 }
142
143
144 @Override
145 public void customize(EndPoint endpoint, Request request)
146 throws IOException
147 {
148 super.customize(endpoint, request);
149 endpoint.setMaxIdleTime(_maxIdleTime);
150 configure(((SocketChannel)endpoint.getTransport()).socket());
151 }
152
153
154
155 public int getLocalPort()
156 {
157 if (_acceptChannel==null || !_acceptChannel.isOpen())
158 return -1;
159 return _acceptChannel.socket().getLocalPort();
160 }
161
162
163
164
165 private class BlockingChannelEndPoint extends ChannelEndPoint implements Runnable, ConnectedEndPoint
166 {
167 private Connection _connection;
168 private int _timeout;
169 private volatile long _idleTimestamp;
170
171 BlockingChannelEndPoint(ByteChannel channel)
172 throws IOException
173 {
174 super(channel,BlockingChannelConnector.this._maxIdleTime);
175 _connection = new HttpConnection(BlockingChannelConnector.this,this,getServer());
176 }
177
178
179
180
181
182 public Connection getConnection()
183 {
184 return _connection;
185 }
186
187
188 public void setConnection(Connection connection)
189 {
190 _connection=connection;
191 }
192
193
194 public void checkIdleTimestamp(long now)
195 {
196 if (_idleTimestamp!=0 && _timeout>0 && now>(_idleTimestamp+_timeout))
197 {
198 idleExpired();
199 }
200 }
201
202
203 protected void idleExpired()
204 {
205 try
206 {
207 close();
208 }
209 catch (IOException e)
210 {
211 Log.ignore(e);
212 }
213 }
214
215
216 void dispatch() throws IOException
217 {
218 if (!getThreadPool().dispatch(this))
219 {
220 Log.warn("dispatch failed for {}",_connection);
221 BlockingChannelEndPoint.this.close();
222 }
223 }
224
225
226
227
228
229 @Override
230 public int fill(Buffer buffer) throws IOException
231 {
232 _idleTimestamp=System.currentTimeMillis();
233 return super.fill(buffer);
234 }
235
236
237
238
239
240 @Override
241 public int flush(Buffer buffer) throws IOException
242 {
243 _idleTimestamp=System.currentTimeMillis();
244 return super.flush(buffer);
245 }
246
247
248
249
250
251 @Override
252 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
253 {
254 _idleTimestamp=System.currentTimeMillis();
255 return super.flush(header,buffer,trailer);
256 }
257
258
259 public void run()
260 {
261 try
262 {
263 _timeout=getMaxIdleTime();
264 connectionOpened(_connection);
265 _endpoints.add(this);
266
267 while (isOpen())
268 {
269 _idleTimestamp=System.currentTimeMillis();
270 if (_connection.isIdle())
271 {
272 if (getServer().getThreadPool().isLowOnThreads())
273 {
274 int lrmit = getLowResourcesMaxIdleTime();
275 if (lrmit>=0 && _timeout!= lrmit)
276 {
277 _timeout=lrmit;
278 }
279 }
280 }
281 else
282 {
283 if (_timeout!=getMaxIdleTime())
284 {
285 _timeout=getMaxIdleTime();
286 }
287 }
288
289 _connection = _connection.handle();
290
291 }
292 }
293 catch (EofException e)
294 {
295 Log.debug("EOF", e);
296 try{BlockingChannelEndPoint.this.close();}
297 catch(IOException e2){Log.ignore(e2);}
298 }
299 catch (HttpException e)
300 {
301 Log.debug("BAD", e);
302 try{BlockingChannelEndPoint.this.close();}
303 catch(IOException e2){Log.ignore(e2);}
304 }
305 catch(Throwable e)
306 {
307 Log.warn("handle failed",e);
308 try{BlockingChannelEndPoint.this.close();}
309 catch(IOException e2){Log.ignore(e2);}
310 }
311 finally
312 {
313 connectionClosed(_connection);
314 _endpoints.remove(this);
315
316
317 try
318 {
319 if (!_socket.isClosed())
320 {
321 long timestamp=System.currentTimeMillis();
322 int max_idle=getMaxIdleTime();
323
324 _socket.setSoTimeout(getMaxIdleTime());
325 int c=0;
326 do
327 {
328 c = _socket.getInputStream().read();
329 }
330 while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle);
331 if (!_socket.isClosed())
332 _socket.close();
333 }
334 }
335 catch(IOException e)
336 {
337 Log.ignore(e);
338 }
339 }
340 }
341 }
342 }