1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.ClosedChannelException;
18 import java.nio.channels.SelectableChannel;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.SocketChannel;
21
22 import org.eclipse.jetty.io.AsyncEndPoint;
23 import org.eclipse.jetty.io.Buffer;
24 import org.eclipse.jetty.io.ConnectedEndPoint;
25 import org.eclipse.jetty.io.Connection;
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.thread.Timeout;
30
31
32
33
34
35 public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, AsyncEndPoint, ConnectedEndPoint
36 {
37 private final SelectorManager.SelectSet _selectSet;
38 private final SelectorManager _manager;
39 private volatile Connection _connection;
40 private boolean _dispatched = false;
41 private boolean _redispatched = false;
42 private volatile boolean _writable = true;
43
44 private SelectionKey _key;
45 private int _interestOps;
46 private boolean _readBlocked;
47 private boolean _writeBlocked;
48 private boolean _open;
49 private volatile long _idleTimestamp;
50
51
52 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
53 throws IOException
54 {
55 super(channel);
56
57 _manager = selectSet.getManager();
58 _selectSet = selectSet;
59 _dispatched = false;
60 _redispatched = false;
61 _open=true;
62 _key = key;
63
64 _connection = _manager.newConnection(channel,this);
65
66 scheduleIdle();
67 }
68
69
70 public SelectionKey getSelectionKey()
71 {
72 synchronized (this)
73 {
74 return _key;
75 }
76 }
77
78
79 public SelectorManager getSelectManager()
80 {
81 return _manager;
82 }
83
84
85 public Connection getConnection()
86 {
87 return _connection;
88 }
89
90
91 public void setConnection(Connection connection)
92 {
93 Connection old=_connection;
94 _connection=connection;
95 _manager.endPointUpgraded(this,old);
96 }
97
98
99
100
101
102 public void schedule()
103 {
104 synchronized (this)
105 {
106
107 if (_key == null || !_key.isValid())
108 {
109 _readBlocked=false;
110 _writeBlocked=false;
111 this.notifyAll();
112 return;
113 }
114
115
116 if (_readBlocked || _writeBlocked)
117 {
118
119 if (_readBlocked && _key.isReadable())
120 _readBlocked=false;
121 if (_writeBlocked && _key.isWritable())
122 _writeBlocked=false;
123
124
125 this.notifyAll();
126
127
128 _key.interestOps(0);
129 return;
130 }
131
132
133 if (!isReadyForDispatch())
134 {
135
136 _key.interestOps(0);
137 return;
138 }
139
140
141
142 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
143 {
144
145 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
146 _key.interestOps(_interestOps);
147 _writable = true;
148 }
149
150 if (_dispatched)
151 _key.interestOps(0);
152 else
153 dispatch();
154 }
155 }
156
157
158 public void dispatch()
159 {
160 synchronized(this)
161 {
162 if (_dispatched)
163 _redispatched=true;
164 else
165 {
166 _dispatched = _manager.dispatch(this);
167 if(!_dispatched)
168 {
169 Log.warn("Dispatched Failed!");
170 updateKey();
171 }
172 }
173 }
174 }
175
176
177
178
179
180
181
182
183 private boolean undispatch()
184 {
185 synchronized (this)
186 {
187 if (_redispatched)
188 {
189 _redispatched=false;
190 return false;
191 }
192 _dispatched = false;
193 updateKey();
194 }
195 return true;
196 }
197
198
199 public void scheduleIdle()
200 {
201 _idleTimestamp=System.currentTimeMillis();
202 }
203
204
205 public void cancelIdle()
206 {
207 _idleTimestamp=0;
208 }
209
210
211 public void checkIdleTimestamp(long now)
212 {
213 if (_idleTimestamp!=0 && _maxIdleTime!=0 && now>(_idleTimestamp+_maxIdleTime))
214 {
215 idleExpired();
216 }
217 }
218
219
220 protected void idleExpired()
221 {
222 try
223 {
224 close();
225 }
226 catch (IOException e)
227 {
228 Log.ignore(e);
229 }
230 }
231
232
233
234
235 @Override
236 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
237 {
238 int l = super.flush(header, buffer, trailer);
239 if (!(_writable=l!=0))
240 {
241 synchronized (this)
242 {
243 if (!_dispatched)
244 updateKey();
245 }
246 }
247 return l;
248 }
249
250
251
252
253 @Override
254 public int flush(Buffer buffer) throws IOException
255 {
256 int l = super.flush(buffer);
257 if (!(_writable=l!=0))
258 {
259 synchronized (this)
260 {
261 if (!_dispatched)
262 updateKey();
263 }
264 }
265 return l;
266 }
267
268
269 public boolean isReadyForDispatch()
270 {
271 synchronized (this)
272 {
273 return !(_dispatched || getConnection().isSuspended());
274 }
275 }
276
277
278
279
280
281 @Override
282 public boolean blockReadable(long timeoutMs) throws IOException
283 {
284 synchronized (this)
285 {
286 long start=_selectSet.getNow();
287 try
288 {
289 _readBlocked=true;
290 while (isOpen() && _readBlocked)
291 {
292 try
293 {
294 updateKey();
295 this.wait(timeoutMs);
296
297 timeoutMs -= _selectSet.getNow()-start;
298 if (_readBlocked && timeoutMs<=0)
299 return false;
300 }
301 catch (InterruptedException e)
302 {
303 Log.warn(e);
304 }
305 }
306 }
307 finally
308 {
309 _readBlocked=false;
310 }
311 }
312 return true;
313 }
314
315
316
317
318
319 @Override
320 public boolean blockWritable(long timeoutMs) throws IOException
321 {
322 synchronized (this)
323 {
324 long start=_selectSet.getNow();
325 try
326 {
327 _writeBlocked=true;
328 while (isOpen() && _writeBlocked)
329 {
330 try
331 {
332 updateKey();
333 this.wait(timeoutMs);
334
335 timeoutMs -= _selectSet.getNow()-start;
336 if (_writeBlocked && timeoutMs<=0)
337 return false;
338 }
339 catch (InterruptedException e)
340 {
341 Log.warn(e);
342 }
343 }
344 }
345 finally
346 {
347 _writeBlocked=false;
348 if (_idleTimestamp!=-1)
349 scheduleIdle();
350 }
351 }
352 return true;
353 }
354
355
356 public void setWritable(boolean writable)
357 {
358 _writable=writable;
359 }
360
361
362 public void scheduleWrite()
363 {
364 _writable=false;
365 updateKey();
366 }
367
368
369
370
371
372
373
374 private void updateKey()
375 {
376 synchronized (this)
377 {
378 int ops=-1;
379 if (getChannel().isOpen())
380 {
381 _interestOps =
382 ((!_dispatched || _readBlocked) ? SelectionKey.OP_READ : 0)
383 | ((!_writable || _writeBlocked) ? SelectionKey.OP_WRITE : 0);
384 try
385 {
386 ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
387 }
388 catch(Exception e)
389 {
390 _key=null;
391 Log.ignore(e);
392 }
393 }
394
395 if(_interestOps == ops && getChannel().isOpen())
396 return;
397
398 }
399 _selectSet.addChange(this);
400 _selectSet.wakeup();
401 }
402
403
404
405
406
407 void doUpdateKey()
408 {
409 synchronized (this)
410 {
411 if (getChannel().isOpen())
412 {
413 if (_interestOps>0)
414 {
415 if (_key==null || !_key.isValid())
416 {
417 SelectableChannel sc = (SelectableChannel)getChannel();
418 if (sc.isRegistered())
419 {
420 updateKey();
421 }
422 else
423 {
424 try
425 {
426 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
427 }
428 catch (Exception e)
429 {
430 Log.ignore(e);
431 if (_key!=null && _key.isValid())
432 {
433 _key.cancel();
434 }
435 cancelIdle();
436
437 if (_open)
438 {
439 _selectSet.destroyEndPoint(this);
440 }
441 _open=false;
442 _key = null;
443 }
444 }
445 }
446 else
447 {
448 _key.interestOps(_interestOps);
449 }
450 }
451 else
452 {
453 if (_key.isValid())
454 _key.interestOps(0);
455 else
456 _key=null;
457 }
458 }
459 else
460 {
461 if (_key!=null && _key.isValid())
462 _key.cancel();
463
464 cancelIdle();
465 if (_open)
466 {
467 _selectSet.destroyEndPoint(this);
468 }
469 _open=false;
470 _key = null;
471 }
472 }
473 }
474
475
476
477
478 public void run()
479 {
480 boolean dispatched=true;
481 try
482 {
483 while(dispatched)
484 {
485 try
486 {
487 while(true)
488 {
489 final Connection next = _connection.handle();
490 if (next!=_connection)
491 {
492 _connection=next;
493 continue;
494 }
495 break;
496 }
497 }
498 catch (ClosedChannelException e)
499 {
500 Log.ignore(e);
501 }
502 catch (EofException e)
503 {
504 Log.debug("EOF", e);
505 try{close();}
506 catch(IOException e2){Log.ignore(e2);}
507 }
508 catch (IOException e)
509 {
510 Log.warn(e.toString());
511 Log.debug(e);
512 try{close();}
513 catch(IOException e2){Log.ignore(e2);}
514 }
515 catch (Throwable e)
516 {
517 Log.warn("handle failed", e);
518 try{close();}
519 catch(IOException e2){Log.ignore(e2);}
520 }
521 dispatched=!undispatch();
522 }
523 }
524 finally
525 {
526 if (dispatched)
527 {
528 dispatched=!undispatch();
529 while (dispatched)
530 {
531 Log.warn("SCEP.run() finally DISPATCHED");
532 dispatched=!undispatch();
533 }
534 }
535 }
536 }
537
538
539
540
541
542 @Override
543 public void close() throws IOException
544 {
545 try
546 {
547 super.close();
548 }
549 catch (IOException e)
550 {
551 Log.ignore(e);
552 }
553 finally
554 {
555 updateKey();
556 }
557 }
558
559
560 @Override
561 public String toString()
562 {
563 synchronized(this)
564 {
565 return "SCEP@" + hashCode() + "\t[d=" + _dispatched + ",io=" + _interestOps+
566 ",w=" + _writable + ",rb=" + _readBlocked + ",wb=" + _writeBlocked + "]";
567 }
568 }
569
570
571 public SelectSet getSelectSet()
572 {
573 return _selectSet;
574 }
575
576
577
578
579
580
581 @Override
582 public void setMaxIdleTime(int timeMs) throws IOException
583 {
584 _maxIdleTime=timeMs;
585 }
586
587
588
589 }