1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server.nio;
15
16 import java.io.IOException;
17 import java.net.InetSocketAddress;
18 import java.net.Socket;
19 import java.nio.channels.ByteChannel;
20 import java.nio.channels.ServerSocketChannel;
21 import java.nio.channels.SocketChannel;
22 import java.util.Collections;
23 import java.util.Set;
24 import java.util.concurrent.ConcurrentHashMap;
25
26 import org.eclipse.jetty.http.HttpException;
27 import org.eclipse.jetty.io.Buffer;
28 import org.eclipse.jetty.io.ConnectedEndPoint;
29 import org.eclipse.jetty.io.Connection;
30 import org.eclipse.jetty.io.EndPoint;
31 import org.eclipse.jetty.io.EofException;
32 import org.eclipse.jetty.io.nio.ChannelEndPoint;
33 import org.eclipse.jetty.server.HttpConnection;
34 import org.eclipse.jetty.server.Request;
35 import org.eclipse.jetty.util.ConcurrentHashSet;
36 import org.eclipse.jetty.util.log.Log;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 public class BlockingChannelConnector extends AbstractNIOConnector
52 {
53 private transient ServerSocketChannel _acceptChannel;
54 private final Set<BlockingChannelEndPoint> _endpoints = new ConcurrentHashSet<BlockingChannelEndPoint>();
55
56
57
58
59
60
61 public BlockingChannelConnector()
62 {
63 }
64
65
66 public Object getConnection()
67 {
68 return _acceptChannel;
69 }
70
71
72
73
74
75 @Override
76 protected void doStart() throws Exception
77 {
78 super.doStart();
79 getThreadPool().dispatch(new Runnable()
80 {
81
82 public void run()
83 {
84 while (isRunning())
85 {
86 try
87 {
88 Thread.sleep(400);
89 long now=System.currentTimeMillis();
90 for (BlockingChannelEndPoint endp : _endpoints)
91 {
92 endp.checkIdleTimestamp(now);
93 }
94 }
95 catch(InterruptedException e)
96 {
97 Log.ignore(e);
98 }
99 catch(Exception e)
100 {
101 Log.warn(e);
102 }
103 }
104 }
105
106 });
107
108 }
109
110
111
112 public void open() throws IOException
113 {
114
115 _acceptChannel= ServerSocketChannel.open();
116 _acceptChannel.configureBlocking(true);
117
118
119 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
120 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
121 }
122
123
124 public void close() throws IOException
125 {
126 if (_acceptChannel != null)
127 _acceptChannel.close();
128 _acceptChannel=null;
129 }
130
131
132 @Override
133 public void accept(int acceptorID)
134 throws IOException, InterruptedException
135 {
136 SocketChannel channel = _acceptChannel.accept();
137 channel.configureBlocking(true);
138 Socket socket=channel.socket();
139 configure(socket);
140
141 BlockingChannelEndPoint connection=new BlockingChannelEndPoint(channel);
142 connection.dispatch();
143 }
144
145
146 @Override
147 public void customize(EndPoint endpoint, Request request)
148 throws IOException
149 {
150 super.customize(endpoint, request);
151 endpoint.setMaxIdleTime(_maxIdleTime);
152 configure(((SocketChannel)endpoint.getTransport()).socket());
153 }
154
155
156
157 public int getLocalPort()
158 {
159 if (_acceptChannel==null || !_acceptChannel.isOpen())
160 return -1;
161 return _acceptChannel.socket().getLocalPort();
162 }
163
164
165
166
167 private class BlockingChannelEndPoint extends ChannelEndPoint implements Runnable, ConnectedEndPoint
168 {
169 private Connection _connection;
170 private int _timeout;
171 private volatile long _idleTimestamp;
172
173 BlockingChannelEndPoint(ByteChannel channel)
174 throws IOException
175 {
176 super(channel,BlockingChannelConnector.this._maxIdleTime);
177 _connection = new HttpConnection(BlockingChannelConnector.this,this,getServer());
178 }
179
180
181
182
183
184 public Connection getConnection()
185 {
186 return _connection;
187 }
188
189
190 public void setConnection(Connection connection)
191 {
192 _connection=connection;
193 }
194
195
196 public void checkIdleTimestamp(long now)
197 {
198 if (_idleTimestamp!=0 && _timeout>0 && now>(_idleTimestamp+_timeout))
199 {
200 idleExpired();
201 }
202 }
203
204
205 protected void idleExpired()
206 {
207 try
208 {
209 close();
210 }
211 catch (IOException e)
212 {
213 Log.ignore(e);
214 }
215 }
216
217
218 void dispatch() throws IOException
219 {
220 if (!getThreadPool().dispatch(this))
221 {
222 Log.warn("dispatch failed for {}",_connection);
223 BlockingChannelEndPoint.this.close();
224 }
225 }
226
227
228
229
230
231 @Override
232 public int fill(Buffer buffer) throws IOException
233 {
234 _idleTimestamp=System.currentTimeMillis();
235 return super.fill(buffer);
236 }
237
238
239
240
241
242 @Override
243 public int flush(Buffer buffer) throws IOException
244 {
245 _idleTimestamp=System.currentTimeMillis();
246 return super.flush(buffer);
247 }
248
249
250
251
252
253 @Override
254 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
255 {
256 _idleTimestamp=System.currentTimeMillis();
257 return super.flush(header,buffer,trailer);
258 }
259
260
261 public void run()
262 {
263 try
264 {
265 _timeout=getMaxIdleTime();
266 connectionOpened(_connection);
267 _endpoints.add(this);
268
269 while (isOpen())
270 {
271 _idleTimestamp=System.currentTimeMillis();
272 if (_connection.isIdle())
273 {
274 if (getServer().getThreadPool().isLowOnThreads())
275 {
276 int lrmit = getLowResourcesMaxIdleTime();
277 if (lrmit>=0 && _timeout!= lrmit)
278 {
279 _timeout=lrmit;
280 }
281 }
282 }
283 else
284 {
285 if (_timeout!=getMaxIdleTime())
286 {
287 _timeout=getMaxIdleTime();
288 }
289 }
290
291 _connection = _connection.handle();
292
293 }
294 }
295 catch (EofException e)
296 {
297 Log.debug("EOF", e);
298 try{BlockingChannelEndPoint.this.close();}
299 catch(IOException e2){Log.ignore(e2);}
300 }
301 catch (HttpException e)
302 {
303 Log.debug("BAD", e);
304 try{BlockingChannelEndPoint.this.close();}
305 catch(IOException e2){Log.ignore(e2);}
306 }
307 catch(Throwable e)
308 {
309 Log.warn("handle failed",e);
310 try{BlockingChannelEndPoint.this.close();}
311 catch(IOException e2){Log.ignore(e2);}
312 }
313 finally
314 {
315 connectionClosed(_connection);
316 _endpoints.remove(this);
317
318
319 try
320 {
321 if (!_socket.isClosed())
322 {
323 long timestamp=System.currentTimeMillis();
324 int max_idle=getMaxIdleTime();
325
326 _socket.setSoTimeout(getMaxIdleTime());
327 int c=0;
328 do
329 {
330 c = _socket.getInputStream().read();
331 }
332 while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle);
333 if (!_socket.isClosed())
334 _socket.close();
335 }
336 }
337 catch(IOException e)
338 {
339 Log.ignore(e);
340 }
341 }
342 }
343 }
344 }