1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io.nio;
20
21 import java.io.IOException;
22 import java.nio.channels.ClosedChannelException;
23 import java.nio.channels.SelectableChannel;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.SocketChannel;
26 import java.util.Locale;
27
28 import org.eclipse.jetty.io.AsyncEndPoint;
29 import org.eclipse.jetty.io.Buffer;
30 import org.eclipse.jetty.io.ConnectedEndPoint;
31 import org.eclipse.jetty.io.Connection;
32 import org.eclipse.jetty.io.EofException;
33 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
34 import org.eclipse.jetty.util.log.Log;
35 import org.eclipse.jetty.util.log.Logger;
36 import org.eclipse.jetty.util.thread.Timeout.Task;
37
38
39
40
41
42 public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
43 {
44 public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
45
46 private final boolean WORK_AROUND_JVM_BUG_6346658 = System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win");
47 private final SelectorManager.SelectSet _selectSet;
48 private final SelectorManager _manager;
49 private SelectionKey _key;
50 private final Runnable _handler = new Runnable()
51 {
52 public void run() { handle(); }
53 };
54
55
56 private int _interestOps;
57
58
59
60
61
62
63
64 private volatile AsyncConnection _connection;
65
66 private static final int STATE_NEEDS_DISPATCH=-1;
67 private static final int STATE_UNDISPATCHED=0;
68 private static final int STATE_DISPATCHED=1;
69 private static final int STATE_ASYNC=2;
70 private int _state;
71
72 private boolean _onIdle;
73
74
75 private volatile boolean _writable = true;
76
77
78
79 private boolean _readBlocked;
80
81
82 private boolean _writeBlocked;
83
84
85 private boolean _open;
86
87 private volatile long _idleTimestamp;
88 private volatile boolean _checkIdle;
89
90 private boolean _ishut;
91
92
93 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
94 throws IOException
95 {
96 super(channel, maxIdleTime);
97
98 _manager = selectSet.getManager();
99 _selectSet = selectSet;
100 _state=STATE_UNDISPATCHED;
101 _onIdle=false;
102 _open=true;
103 _key = key;
104
105 setCheckForIdle(true);
106 }
107
108
109 public SelectionKey getSelectionKey()
110 {
111 synchronized (this)
112 {
113 return _key;
114 }
115 }
116
117
118 public SelectorManager getSelectManager()
119 {
120 return _manager;
121 }
122
123
124 public Connection getConnection()
125 {
126 return _connection;
127 }
128
129
130 public void setConnection(Connection connection)
131 {
132 Connection old=_connection;
133 _connection=(AsyncConnection)connection;
134 if (old!=null && old!=_connection)
135 _manager.endPointUpgraded(this,old);
136 }
137
138
139 public long getIdleTimestamp()
140 {
141 return _idleTimestamp;
142 }
143
144
145
146
147
148 public void schedule()
149 {
150 synchronized (this)
151 {
152
153 if (_key == null || !_key.isValid())
154 {
155 _readBlocked=false;
156 _writeBlocked=false;
157 this.notifyAll();
158 return;
159 }
160
161
162 if (_readBlocked || _writeBlocked)
163 {
164
165 if (_readBlocked && _key.isReadable())
166 _readBlocked=false;
167 if (_writeBlocked && _key.isWritable())
168 _writeBlocked=false;
169
170
171 this.notifyAll();
172
173
174 _key.interestOps(0);
175 if (_state<STATE_DISPATCHED)
176 updateKey();
177 return;
178 }
179
180
181 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
182 {
183
184 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
185 _key.interestOps(_interestOps);
186 _writable = true;
187 }
188
189
190 if (_state>=STATE_DISPATCHED)
191 _key.interestOps(0);
192 else
193 {
194
195 dispatch();
196 if (_state>=STATE_DISPATCHED && !_selectSet.getManager().isDeferringInterestedOps0())
197 {
198 _key.interestOps(0);
199 }
200 }
201 }
202 }
203
204
205 public void asyncDispatch()
206 {
207 synchronized(this)
208 {
209 switch(_state)
210 {
211 case STATE_NEEDS_DISPATCH:
212 case STATE_UNDISPATCHED:
213 dispatch();
214 break;
215
216 case STATE_DISPATCHED:
217 case STATE_ASYNC:
218 _state=STATE_ASYNC;
219 break;
220 }
221 }
222 }
223
224
225 public void dispatch()
226 {
227 synchronized(this)
228 {
229 if (_state<=STATE_UNDISPATCHED)
230 {
231 if (_onIdle)
232 _state = STATE_NEEDS_DISPATCH;
233 else
234 {
235 _state = STATE_DISPATCHED;
236 boolean dispatched = _manager.dispatch(_handler);
237 if(!dispatched)
238 {
239 _state = STATE_NEEDS_DISPATCH;
240 LOG.warn("Dispatched Failed! "+this+" to "+_manager);
241 updateKey();
242 }
243 }
244 }
245 }
246 }
247
248
249
250
251
252
253
254
255 protected boolean undispatch()
256 {
257 synchronized (this)
258 {
259 switch(_state)
260 {
261 case STATE_ASYNC:
262 _state=STATE_DISPATCHED;
263 return false;
264
265 default:
266 _state=STATE_UNDISPATCHED;
267 updateKey();
268 return true;
269 }
270 }
271 }
272
273
274 public void cancelTimeout(Task task)
275 {
276 getSelectSet().cancelTimeout(task);
277 }
278
279
280 public void scheduleTimeout(Task task, long timeoutMs)
281 {
282 getSelectSet().scheduleTimeout(task,timeoutMs);
283 }
284
285
286 public void setCheckForIdle(boolean check)
287 {
288 if (check)
289 {
290 _idleTimestamp=System.currentTimeMillis();
291 _checkIdle=true;
292 }
293 else
294 _checkIdle=false;
295 }
296
297
298 public boolean isCheckForIdle()
299 {
300 return _checkIdle;
301 }
302
303
304 protected void notIdle()
305 {
306 _idleTimestamp=System.currentTimeMillis();
307 }
308
309
310 public void checkIdleTimestamp(long now)
311 {
312 if (isCheckForIdle() && _maxIdleTime>0)
313 {
314 final long idleForMs=now-_idleTimestamp;
315
316 if (idleForMs>_maxIdleTime)
317 {
318
319 setCheckForIdle(false);
320 _manager.dispatch(new Runnable()
321 {
322 public void run()
323 {
324 try
325 {
326 onIdleExpired(idleForMs);
327 }
328 finally
329 {
330 setCheckForIdle(true);
331 }
332 }
333 });
334 }
335 }
336 }
337
338
339 public void onIdleExpired(long idleForMs)
340 {
341 try
342 {
343 synchronized (this)
344 {
345 _onIdle=true;
346 }
347
348 if (_maxIdleTime>0 && (System.currentTimeMillis()-_idleTimestamp)>_maxIdleTime)
349 _connection.onIdleExpired(idleForMs);
350 }
351 finally
352 {
353 synchronized (this)
354 {
355 _onIdle=false;
356 if (_state==STATE_NEEDS_DISPATCH)
357 dispatch();
358 }
359 }
360 }
361
362
363 @Override
364 public int fill(Buffer buffer) throws IOException
365 {
366 int fill=super.fill(buffer);
367 if (fill>0)
368 notIdle();
369 return fill;
370 }
371
372
373 @Override
374 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
375 {
376 int l = super.flush(header, buffer, trailer);
377
378
379 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
380 {
381 synchronized (this)
382 {
383 _writable=false;
384 if (_state<STATE_DISPATCHED)
385 updateKey();
386 }
387 }
388 else if (l>0)
389 {
390 _writable=true;
391 notIdle();
392 }
393 return l;
394 }
395
396
397
398
399 @Override
400 public int flush(Buffer buffer) throws IOException
401 {
402 int l = super.flush(buffer);
403
404
405 if (l==0 && buffer!=null && buffer.hasContent())
406 {
407 synchronized (this)
408 {
409 _writable=false;
410 if (_state<STATE_DISPATCHED)
411 updateKey();
412 }
413 }
414 else if (l>0)
415 {
416 _writable=true;
417 notIdle();
418 }
419
420 return l;
421 }
422
423
424
425
426
427 @Override
428 public boolean blockReadable(long timeoutMs) throws IOException
429 {
430 synchronized (this)
431 {
432 if (isInputShutdown())
433 throw new EofException();
434
435 long now=_selectSet.getNow();
436 long end=now+timeoutMs;
437 boolean check=isCheckForIdle();
438 setCheckForIdle(true);
439 try
440 {
441 _readBlocked=true;
442 while (!isInputShutdown() && _readBlocked)
443 {
444 try
445 {
446 updateKey();
447 this.wait(timeoutMs>0?(end-now):10000);
448 }
449 catch (InterruptedException e)
450 {
451 LOG.warn(e);
452 }
453 finally
454 {
455 now=_selectSet.getNow();
456 }
457
458 if (_readBlocked && timeoutMs>0 && now>=end)
459 return false;
460 }
461 }
462 finally
463 {
464 _readBlocked=false;
465 setCheckForIdle(check);
466 }
467 }
468 return true;
469 }
470
471
472
473
474
475 @Override
476 public boolean blockWritable(long timeoutMs) throws IOException
477 {
478 synchronized (this)
479 {
480 if (isOutputShutdown())
481 throw new EofException();
482
483 long now=_selectSet.getNow();
484 long end=now+timeoutMs;
485 boolean check=isCheckForIdle();
486 setCheckForIdle(true);
487 try
488 {
489 _writeBlocked=true;
490 while (_writeBlocked && !isOutputShutdown())
491 {
492 try
493 {
494 updateKey();
495 this.wait(timeoutMs>0?(end-now):10000);
496 }
497 catch (InterruptedException e)
498 {
499 LOG.warn(e);
500 }
501 finally
502 {
503 now=_selectSet.getNow();
504 }
505 if (_writeBlocked && timeoutMs>0 && now>=end)
506 return false;
507 }
508 }
509 finally
510 {
511 _writeBlocked=false;
512 setCheckForIdle(check);
513 }
514 }
515 return true;
516 }
517
518
519
520
521
522 public void scheduleWrite()
523 {
524 if (_writable)
525 LOG.debug("Required scheduleWrite {}",this);
526
527 _writable=false;
528 updateKey();
529 }
530
531
532 public boolean isWritable()
533 {
534 return _writable;
535 }
536
537
538 public boolean hasProgressed()
539 {
540 return false;
541 }
542
543
544
545
546
547
548
549 private void updateKey()
550 {
551 final boolean changed;
552 synchronized (this)
553 {
554 int current_ops=-1;
555 if (getChannel().isOpen())
556 {
557 boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended());
558 boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable);
559
560 _interestOps =
561 ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0)
562 | ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
563 try
564 {
565 current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
566 }
567 catch(Exception e)
568 {
569 _key=null;
570 LOG.ignore(e);
571 }
572 }
573 changed=_interestOps!=current_ops;
574 }
575
576 if(changed)
577 {
578 _selectSet.addChange(this);
579 _selectSet.wakeup();
580 }
581 }
582
583
584
585
586
587
588 void doUpdateKey()
589 {
590 synchronized (this)
591 {
592 if (getChannel().isOpen())
593 {
594 if (_interestOps>0)
595 {
596 if (_key==null || !_key.isValid())
597 {
598 SelectableChannel sc = (SelectableChannel)getChannel();
599 if (sc.isRegistered())
600 {
601 updateKey();
602 }
603 else
604 {
605 try
606 {
607 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
608 }
609 catch (Exception e)
610 {
611 LOG.ignore(e);
612 if (_key!=null && _key.isValid())
613 {
614 _key.cancel();
615 }
616
617 if (_open)
618 {
619 _selectSet.destroyEndPoint(this);
620 }
621 _open=false;
622 _key = null;
623 }
624 }
625 }
626 else
627 {
628 _key.interestOps(_interestOps);
629 }
630 }
631 else
632 {
633 if (_key!=null && _key.isValid())
634 _key.interestOps(0);
635 else
636 _key=null;
637 }
638 }
639 else
640 {
641 if (_key!=null && _key.isValid())
642 _key.cancel();
643
644 if (_open)
645 {
646 _open=false;
647 _selectSet.destroyEndPoint(this);
648 }
649 _key = null;
650 }
651 }
652 }
653
654
655
656
657 protected void handle()
658 {
659 boolean dispatched=true;
660 try
661 {
662 while(dispatched)
663 {
664 try
665 {
666 while(true)
667 {
668 final AsyncConnection next = (AsyncConnection)_connection.handle();
669 if (next!=_connection)
670 {
671 LOG.debug("{} replaced {}",next,_connection);
672 Connection old=_connection;
673 _connection=next;
674 _manager.endPointUpgraded(this,old);
675 continue;
676 }
677 break;
678 }
679 }
680 catch (ClosedChannelException e)
681 {
682 LOG.ignore(e);
683 }
684 catch (EofException e)
685 {
686 LOG.debug("EOF", e);
687 try{close();}
688 catch(IOException e2){LOG.ignore(e2);}
689 }
690 catch (IOException e)
691 {
692 LOG.warn(e.toString());
693 try{close();}
694 catch(IOException e2){LOG.ignore(e2);}
695 }
696 catch (Throwable e)
697 {
698 LOG.warn("handle failed", e);
699 try{close();}
700 catch(IOException e2){LOG.ignore(e2);}
701 }
702 finally
703 {
704 if (!_ishut && isInputShutdown() && isOpen())
705 {
706 _ishut=true;
707 try
708 {
709 _connection.onInputShutdown();
710 }
711 catch(Throwable x)
712 {
713 LOG.warn("onInputShutdown failed", x);
714 try{close();}
715 catch(IOException e2){LOG.ignore(e2);}
716 }
717 finally
718 {
719 updateKey();
720 }
721 }
722 dispatched=!undispatch();
723 }
724 }
725 }
726 finally
727 {
728 if (dispatched)
729 {
730 dispatched=!undispatch();
731 while (dispatched)
732 {
733 LOG.warn("SCEP.run() finally DISPATCHED");
734 dispatched=!undispatch();
735 }
736 }
737 }
738 }
739
740
741
742
743
744 @Override
745 public void close() throws IOException
746 {
747
748
749
750
751
752 if (WORK_AROUND_JVM_BUG_6346658)
753 {
754 try
755 {
756 SelectionKey key = _key;
757 if (key!=null)
758 key.cancel();
759 }
760 catch (Throwable e)
761 {
762 LOG.ignore(e);
763 }
764 }
765
766 try
767 {
768 super.close();
769 }
770 catch (IOException e)
771 {
772 LOG.ignore(e);
773 }
774 finally
775 {
776 updateKey();
777 }
778 }
779
780
781 @Override
782 public String toString()
783 {
784
785
786
787 SelectionKey key = _key;
788 String keyString = "";
789 if (key != null)
790 {
791 if (key.isValid())
792 {
793 if (key.isReadable())
794 keyString += "r";
795 if (key.isWritable())
796 keyString += "w";
797 }
798 else
799 {
800 keyString += "!";
801 }
802 }
803 else
804 {
805 keyString += "-";
806 }
807 return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
808 hashCode(),
809 _socket.getRemoteSocketAddress(),
810 _socket.getLocalSocketAddress(),
811 _state,
812 isOpen(),
813 isInputShutdown(),
814 isOutputShutdown(),
815 _readBlocked,
816 _writeBlocked,
817 _writable,
818 _interestOps,
819 keyString,
820 _connection);
821 }
822
823
824 public SelectSet getSelectSet()
825 {
826 return _selectSet;
827 }
828
829
830
831
832
833
834 @Override
835 public void setMaxIdleTime(int timeMs) throws IOException
836 {
837 _maxIdleTime=timeMs;
838 }
839
840 }