1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.ClosedChannelException;
18 import java.nio.channels.SelectableChannel;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.SocketChannel;
21
22 import org.eclipse.jetty.io.AsyncEndPoint;
23 import org.eclipse.jetty.io.Buffer;
24 import org.eclipse.jetty.io.ConnectedEndPoint;
25 import org.eclipse.jetty.io.Connection;
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30 import org.eclipse.jetty.util.thread.Timeout.Task;
31
32
33
34
35
36 public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
37 {
/** Logger obtained under the name "org.eclipse.jetty.io.nio". */
public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");

/** Select set that receives this endpoint's key changes, wake-ups and timeout tasks. */
private final SelectorManager.SelectSet _selectSet;
/** Manager taken from the select set; used to dispatch {@link #_handler} and to report connection upgrades. */
private final SelectorManager _manager;
/** Current selection key; nulled or re-registered by updateKey()/doUpdateKey(). */
private SelectionKey _key;
/** Task handed to the manager by dispatch(); it only calls handle(). */
private final Runnable _handler = new Runnable()
{
    public void run() { handle(); }
};

/** Interest ops computed by updateKey() and applied to the key by doUpdateKey(). */
private int _interestOps;

/** Connection currently driven by handle(); replaced by setConnection() or when handle() returns a different connection. */
private volatile AsyncConnection _connection;

/** True from a successful dispatch() until undispatch() clears it. */
private boolean _dispatched = false;

/** Set by asyncDispatch() while already dispatched; makes the next undispatch() return false so handle() loops again. */
private boolean _asyncDispatch = false;

/** Cleared when a flush writes nothing although content is pending; set again on write readiness or a successful flush. */
private volatile boolean _writable = true;

/** True while a thread is waiting inside blockReadable(). */
private boolean _readBlocked;

/** True while a thread is waiting inside blockWritable(). */
private boolean _writeBlocked;

/** True until doUpdateKey() has asked the select set to destroy this endpoint. */
private boolean _open;

/** Time of last recorded activity in milliseconds; 0 means idle checking is disabled. */
private volatile long _idleTimestamp;

/** Set once handle() has invoked onInputShutdown() on the connection. */
private boolean _ishut;
81
82
/**
 * Creates an endpoint for the given channel that is managed by the given select set.
 * The endpoint starts open, not dispatched, and with idle checking enabled.
 *
 * @param channel     the socket channel passed to the superclass
 * @param selectSet   the select set this endpoint belongs to; its manager is also retained
 * @param key         the selection key initially associated with the channel
 * @param maxIdleTime the maximum idle time passed to the superclass
 * @throws IOException if the superclass constructor fails
 */
public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
    throws IOException
{
    super(channel, maxIdleTime);

    _selectSet = selectSet;
    _manager = selectSet.getManager();
    _key = key;
    _open = true;
    _asyncDispatch = false;
    _dispatched = false;

    // Start the idle clock immediately.
    setCheckForIdle(true);
}
97
98
99 public SelectionKey getSelectionKey()
100 {
101 synchronized (this)
102 {
103 return _key;
104 }
105 }
106
107
108 public SelectorManager getSelectManager()
109 {
110 return _manager;
111 }
112
113
114 public Connection getConnection()
115 {
116 return _connection;
117 }
118
119
120 public void setConnection(Connection connection)
121 {
122 Connection old=_connection;
123 _connection=(AsyncConnection)connection;
124 if (old!=null && old!=_connection)
125 _manager.endPointUpgraded(this,old);
126 }
127
128
129 public long getIdleTimestamp()
130 {
131 return _idleTimestamp;
132 }
133
134
135
136
137
138 public void schedule()
139 {
140 synchronized (this)
141 {
142
143 if (_key == null || !_key.isValid())
144 {
145 _readBlocked=false;
146 _writeBlocked=false;
147 this.notifyAll();
148 return;
149 }
150
151
152 if (_readBlocked || _writeBlocked)
153 {
154
155 if (_readBlocked && _key.isReadable())
156 _readBlocked=false;
157 if (_writeBlocked && _key.isWritable())
158 _writeBlocked=false;
159
160
161 this.notifyAll();
162
163
164 _key.interestOps(0);
165 if (!_dispatched)
166 updateKey();
167 return;
168 }
169
170
171 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
172 {
173
174 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
175 _key.interestOps(_interestOps);
176 _writable = true;
177 }
178
179
180 if (_dispatched)
181 _key.interestOps(0);
182 else
183 {
184
185 dispatch();
186 if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
187 {
188 _key.interestOps(0);
189 }
190 }
191 }
192 }
193
194
195 public void asyncDispatch()
196 {
197 synchronized(this)
198 {
199 if (_dispatched)
200 _asyncDispatch=true;
201 else
202 dispatch();
203 }
204 }
205
206
207 public void dispatch()
208 {
209 synchronized(this)
210 {
211 if (_dispatched)
212 {
213 throw new IllegalStateException("dispatched");
214 }
215 else
216 {
217 _dispatched = true;
218 boolean dispatched = _manager.dispatch(_handler);
219 if(!dispatched)
220 {
221 _dispatched = false;
222 LOG.warn("Dispatched Failed! "+this+" to "+_manager);
223 updateKey();
224 }
225 }
226 }
227 }
228
229
230
231
232
233
234
235
236 protected boolean undispatch()
237 {
238 synchronized (this)
239 {
240 if (_asyncDispatch)
241 {
242 _asyncDispatch=false;
243 return false;
244 }
245 _dispatched = false;
246 updateKey();
247 }
248 return true;
249 }
250
251
252 public void cancelTimeout(Task task)
253 {
254 getSelectSet().cancelTimeout(task);
255 }
256
257
258 public void scheduleTimeout(Task task, long timeoutMs)
259 {
260 getSelectSet().scheduleTimeout(task,timeoutMs);
261 }
262
263
264 public void setCheckForIdle(boolean check)
265 {
266 _idleTimestamp=check?System.currentTimeMillis():0;
267 }
268
269
270 public boolean isCheckForIdle()
271 {
272 return _idleTimestamp!=0;
273 }
274
275
276 protected void notIdle()
277 {
278 if (_idleTimestamp!=0)
279 _idleTimestamp=System.currentTimeMillis();
280 }
281
282
283 public void checkIdleTimestamp(long now)
284 {
285 long idleTimestamp=_idleTimestamp;
286
287 if (idleTimestamp!=0 && _maxIdleTime>0)
288 {
289 long idleForMs=now-idleTimestamp;
290
291 if (idleForMs>_maxIdleTime)
292 {
293 onIdleExpired(idleForMs);
294 _idleTimestamp=now;
295 }
296 }
297 }
298
299
300 public void onIdleExpired(long idleForMs)
301 {
302 _connection.onIdleExpired(idleForMs);
303 }
304
305
306 @Override
307 public int fill(Buffer buffer) throws IOException
308 {
309 int fill=super.fill(buffer);
310 if (fill>0)
311 notIdle();
312 return fill;
313 }
314
315
316 @Override
317 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
318 {
319 int l = super.flush(header, buffer, trailer);
320
321
322 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
323 {
324 synchronized (this)
325 {
326 _writable=false;
327 if (!_dispatched)
328 updateKey();
329 }
330 }
331 else if (l>0)
332 {
333 _writable=true;
334 notIdle();
335 }
336 return l;
337 }
338
339
340
341
342 @Override
343 public int flush(Buffer buffer) throws IOException
344 {
345 int l = super.flush(buffer);
346
347
348 if (l==0 && buffer!=null && buffer.hasContent())
349 {
350 synchronized (this)
351 {
352 _writable=false;
353 if (!_dispatched)
354 updateKey();
355 }
356 }
357 else if (l>0)
358 {
359 _writable=true;
360 notIdle();
361 }
362
363 return l;
364 }
365
366
367
368
369
370 @Override
371 public boolean blockReadable(long timeoutMs) throws IOException
372 {
373 synchronized (this)
374 {
375 if (isInputShutdown())
376 throw new EofException();
377
378 long now=_selectSet.getNow();
379 long end=now+timeoutMs;
380 boolean check=isCheckForIdle();
381 setCheckForIdle(true);
382 try
383 {
384 _readBlocked=true;
385 while (!isInputShutdown() && _readBlocked)
386 {
387 try
388 {
389 updateKey();
390 this.wait(timeoutMs>=0?(end-now):10000);
391 }
392 catch (InterruptedException e)
393 {
394 LOG.warn(e);
395 }
396 finally
397 {
398 now=_selectSet.getNow();
399 }
400
401 if (_readBlocked && timeoutMs>0 && now>=end)
402 return false;
403 }
404 }
405 finally
406 {
407 _readBlocked=false;
408 setCheckForIdle(check);
409 }
410 }
411 return true;
412 }
413
414
415
416
417
418 @Override
419 public boolean blockWritable(long timeoutMs) throws IOException
420 {
421 synchronized (this)
422 {
423 if (isOutputShutdown())
424 throw new EofException();
425
426 long now=_selectSet.getNow();
427 long end=now+timeoutMs;
428 boolean check=isCheckForIdle();
429 setCheckForIdle(true);
430 try
431 {
432 _writeBlocked=true;
433 while (_writeBlocked && !isOutputShutdown())
434 {
435 try
436 {
437 updateKey();
438 this.wait(timeoutMs>=0?(end-now):10000);
439 }
440 catch (InterruptedException e)
441 {
442 LOG.warn(e);
443 }
444 finally
445 {
446 now=_selectSet.getNow();
447 }
448 if (_writeBlocked && timeoutMs>0 && now>=end)
449 return false;
450 }
451 }
452 finally
453 {
454 _writeBlocked=false;
455 setCheckForIdle(check);
456 }
457 }
458 return true;
459 }
460
461
462
463
464
465 public void scheduleWrite()
466 {
467 if (_writable==true)
468 LOG.debug("Required scheduleWrite {}",this);
469
470 _writable=false;
471 updateKey();
472 }
473
474
475 public boolean isWritable()
476 {
477 return _writable;
478 }
479
480
481 public boolean hasProgressed()
482 {
483 return false;
484 }
485
486
487
488
489
490
491
492 private void updateKey()
493 {
494 final boolean changed;
495 synchronized (this)
496 {
497 int current_ops=-1;
498 if (getChannel().isOpen())
499 {
500 boolean read_interest = _readBlocked || (!_dispatched && !_connection.isSuspended());
501 boolean write_interest= _writeBlocked || (!_dispatched && !_writable);
502
503 _interestOps =
504 ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0)
505 | ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
506 try
507 {
508 current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
509 }
510 catch(Exception e)
511 {
512 _key=null;
513 LOG.ignore(e);
514 }
515 }
516 changed=_interestOps!=current_ops;
517 }
518
519 if(changed)
520 {
521 _selectSet.addChange(this);
522 _selectSet.wakeup();
523 }
524 }
525
526
527
528
529
530
531 void doUpdateKey()
532 {
533 synchronized (this)
534 {
535 if (getChannel().isOpen())
536 {
537 if (_interestOps>0)
538 {
539 if (_key==null || !_key.isValid())
540 {
541 SelectableChannel sc = (SelectableChannel)getChannel();
542 if (sc.isRegistered())
543 {
544 updateKey();
545 }
546 else
547 {
548 try
549 {
550 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
551 }
552 catch (Exception e)
553 {
554 LOG.ignore(e);
555 if (_key!=null && _key.isValid())
556 {
557 _key.cancel();
558 }
559
560 if (_open)
561 {
562 _selectSet.destroyEndPoint(this);
563 }
564 _open=false;
565 _key = null;
566 }
567 }
568 }
569 else
570 {
571 _key.interestOps(_interestOps);
572 }
573 }
574 else
575 {
576 if (_key!=null && _key.isValid())
577 _key.interestOps(0);
578 else
579 _key=null;
580 }
581 }
582 else
583 {
584 if (_key!=null && _key.isValid())
585 _key.cancel();
586
587 if (_open)
588 {
589 _open=false;
590 _selectSet.destroyEndPoint(this);
591 }
592 _key = null;
593 }
594 }
595 }
596
597
598
599
600 protected void handle()
601 {
602 boolean dispatched=true;
603 try
604 {
605 while(dispatched)
606 {
607 try
608 {
609 while(true)
610 {
611 final AsyncConnection next = (AsyncConnection)_connection.handle();
612 if (next!=_connection)
613 {
614 LOG.debug("{} replaced {}",next,_connection);
615 Connection old=_connection;
616 _connection=next;
617 _manager.endPointUpgraded(this,old);
618 continue;
619 }
620 break;
621 }
622 }
623 catch (ClosedChannelException e)
624 {
625 LOG.ignore(e);
626 }
627 catch (EofException e)
628 {
629 LOG.debug("EOF", e);
630 try{close();}
631 catch(IOException e2){LOG.ignore(e2);}
632 }
633 catch (IOException e)
634 {
635 LOG.warn(e.toString());
636 try{close();}
637 catch(IOException e2){LOG.ignore(e2);}
638 }
639 catch (Throwable e)
640 {
641 LOG.warn("handle failed", e);
642 try{close();}
643 catch(IOException e2){LOG.ignore(e2);}
644 }
645 finally
646 {
647 if (!_ishut && isInputShutdown() && isOpen())
648 {
649 _ishut=true;
650 try
651 {
652 _connection.onInputShutdown();
653 }
654 catch(Throwable x)
655 {
656 LOG.warn("onInputShutdown failed", x);
657 try{close();}
658 catch(IOException e2){LOG.ignore(e2);}
659 }
660 finally
661 {
662 updateKey();
663 }
664 }
665 dispatched=!undispatch();
666 }
667 }
668 }
669 finally
670 {
671 if (dispatched)
672 {
673 dispatched=!undispatch();
674 while (dispatched)
675 {
676 LOG.warn("SCEP.run() finally DISPATCHED");
677 dispatched=!undispatch();
678 }
679 }
680 }
681 }
682
683
684
685
686
687 @Override
688 public void close() throws IOException
689 {
690 try
691 {
692 super.close();
693 }
694 catch (IOException e)
695 {
696 LOG.ignore(e);
697 }
698 finally
699 {
700 updateKey();
701 }
702 }
703
704
705 @Override
706 public String toString()
707 {
708
709
710
711 SelectionKey key = _key;
712 String keyString = "";
713 if (key != null)
714 {
715 if (key.isValid())
716 {
717 if (key.isReadable())
718 keyString += "r";
719 if (key.isWritable())
720 keyString += "w";
721 }
722 else
723 {
724 keyString += "!";
725 }
726 }
727 else
728 {
729 keyString += "-";
730 }
731 return String.format("SCEP@%x{l(%s)<->r(%s),d=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
732 hashCode(),
733 _socket.getRemoteSocketAddress(),
734 _socket.getLocalSocketAddress(),
735 _dispatched,
736 isOpen(),
737 isInputShutdown(),
738 isOutputShutdown(),
739 _readBlocked,
740 _writeBlocked,
741 _writable,
742 _interestOps,
743 keyString,
744 _connection);
745 }
746
747
748 public SelectSet getSelectSet()
749 {
750 return _selectSet;
751 }
752
753
754
755
756
757
758 @Override
759 public void setMaxIdleTime(int timeMs) throws IOException
760 {
761 _maxIdleTime=timeMs;
762 }
763
764 }