1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.ClosedChannelException;
18 import java.nio.channels.SelectableChannel;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.SocketChannel;
21
22 import org.eclipse.jetty.io.AsyncEndPoint;
23 import org.eclipse.jetty.io.Buffer;
24 import org.eclipse.jetty.io.ConnectedEndPoint;
25 import org.eclipse.jetty.io.Connection;
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30 import org.eclipse.jetty.util.thread.Timeout.Task;
31
32
33
34
35
36 public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
37 {
38 public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
39
40 private final boolean WORK_AROUND_JVM_BUG_6346658 = System.getProperty("os.name").toLowerCase().contains("win");
41 private final SelectorManager.SelectSet _selectSet;
42 private final SelectorManager _manager;
43 private SelectionKey _key;
44 private final Runnable _handler = new Runnable()
45 {
46 public void run() { handle(); }
47 };
48
49
50 private int _interestOps;
51
52
53
54
55
56
57
58 private volatile AsyncConnection _connection;
59
60
61 private boolean _dispatched = false;
62
63
64 private boolean _asyncDispatch = false;
65
66
67 private volatile boolean _writable = true;
68
69
70
71 private boolean _readBlocked;
72
73
74 private boolean _writeBlocked;
75
76
77 private boolean _open;
78
79 private volatile long _idleTimestamp;
80
81 private boolean _ishut;
82
83
84 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
85 throws IOException
86 {
87 super(channel, maxIdleTime);
88
89 _manager = selectSet.getManager();
90 _selectSet = selectSet;
91 _dispatched = false;
92 _asyncDispatch = false;
93 _open=true;
94 _key = key;
95
96 setCheckForIdle(true);
97 }
98
99
100 public SelectionKey getSelectionKey()
101 {
102 synchronized (this)
103 {
104 return _key;
105 }
106 }
107
108
109 public SelectorManager getSelectManager()
110 {
111 return _manager;
112 }
113
114
115 public Connection getConnection()
116 {
117 return _connection;
118 }
119
120
121 public void setConnection(Connection connection)
122 {
123 Connection old=_connection;
124 _connection=(AsyncConnection)connection;
125 if (old!=null && old!=_connection)
126 _manager.endPointUpgraded(this,old);
127 }
128
129
130 public long getIdleTimestamp()
131 {
132 return _idleTimestamp;
133 }
134
135
136
137
138
139 public void schedule()
140 {
141 synchronized (this)
142 {
143
144 if (_key == null || !_key.isValid())
145 {
146 _readBlocked=false;
147 _writeBlocked=false;
148 this.notifyAll();
149 return;
150 }
151
152
153 if (_readBlocked || _writeBlocked)
154 {
155
156 if (_readBlocked && _key.isReadable())
157 _readBlocked=false;
158 if (_writeBlocked && _key.isWritable())
159 _writeBlocked=false;
160
161
162 this.notifyAll();
163
164
165 _key.interestOps(0);
166 if (!_dispatched)
167 updateKey();
168 return;
169 }
170
171
172 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
173 {
174
175 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
176 _key.interestOps(_interestOps);
177 _writable = true;
178 }
179
180
181 if (_dispatched)
182 _key.interestOps(0);
183 else
184 {
185
186 dispatch();
187 if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
188 {
189 _key.interestOps(0);
190 }
191 }
192 }
193 }
194
195
196 public void asyncDispatch()
197 {
198 synchronized(this)
199 {
200 if (_dispatched)
201 _asyncDispatch=true;
202 else
203 dispatch();
204 }
205 }
206
207
208 public void dispatch()
209 {
210 synchronized(this)
211 {
212 if (!_dispatched)
213 {
214 _dispatched = true;
215 boolean dispatched = _manager.dispatch(_handler);
216 if(!dispatched)
217 {
218 _dispatched = false;
219 LOG.warn("Dispatched Failed! "+this+" to "+_manager);
220 updateKey();
221 }
222 }
223 }
224 }
225
226
227
228
229
230
231
232
233 protected boolean undispatch()
234 {
235 synchronized (this)
236 {
237 if (_asyncDispatch)
238 {
239 _asyncDispatch=false;
240 return false;
241 }
242 _dispatched = false;
243 updateKey();
244 }
245 return true;
246 }
247
248
249 public void cancelTimeout(Task task)
250 {
251 getSelectSet().cancelTimeout(task);
252 }
253
254
255 public void scheduleTimeout(Task task, long timeoutMs)
256 {
257 getSelectSet().scheduleTimeout(task,timeoutMs);
258 }
259
260
261 public void setCheckForIdle(boolean check)
262 {
263 _idleTimestamp=check?System.currentTimeMillis():0;
264 }
265
266
267 public boolean isCheckForIdle()
268 {
269 return _idleTimestamp!=0;
270 }
271
272
273 protected void notIdle()
274 {
275 if (_idleTimestamp!=0)
276 _idleTimestamp=System.currentTimeMillis();
277 }
278
279
280 public void checkIdleTimestamp(long now)
281 {
282 long idleTimestamp=_idleTimestamp;
283
284 if (idleTimestamp!=0 && _maxIdleTime>0)
285 {
286 final long idleForMs=now-idleTimestamp;
287
288 if (idleForMs>_maxIdleTime)
289 {
290
291 setCheckForIdle(false);
292 _manager.dispatch(new Runnable()
293 {
294 public void run()
295 {
296 try
297 {
298 onIdleExpired(idleForMs);
299 }
300 finally
301 {
302 setCheckForIdle(true);
303 }
304 }
305 });
306 }
307 }
308 }
309
310
311 public void onIdleExpired(long idleForMs)
312 {
313 _connection.onIdleExpired(idleForMs);
314 }
315
316
317 @Override
318 public int fill(Buffer buffer) throws IOException
319 {
320 int fill=super.fill(buffer);
321 if (fill>0)
322 notIdle();
323 return fill;
324 }
325
326
327 @Override
328 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
329 {
330 int l = super.flush(header, buffer, trailer);
331
332
333 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
334 {
335 synchronized (this)
336 {
337 if (_dispatched)
338 _writable=false;
339 }
340 }
341 else if (l>0)
342 {
343 _writable=true;
344 notIdle();
345 }
346 return l;
347 }
348
349
350
351
352 @Override
353 public int flush(Buffer buffer) throws IOException
354 {
355 int l = super.flush(buffer);
356
357
358 if (l==0 && buffer!=null && buffer.hasContent())
359 {
360 synchronized (this)
361 {
362 if (_dispatched)
363 _writable=false;
364 }
365 }
366 else if (l>0)
367 {
368 _writable=true;
369 notIdle();
370 }
371
372 return l;
373 }
374
375
376
377
378
379 @Override
380 public boolean blockReadable(long timeoutMs) throws IOException
381 {
382 synchronized (this)
383 {
384 if (isInputShutdown())
385 throw new EofException();
386
387 long now=_selectSet.getNow();
388 long end=now+timeoutMs;
389 boolean check=isCheckForIdle();
390 setCheckForIdle(true);
391 try
392 {
393 _readBlocked=true;
394 while (!isInputShutdown() && _readBlocked)
395 {
396 try
397 {
398 updateKey();
399 this.wait(timeoutMs>0?(end-now):10000);
400 }
401 catch (InterruptedException e)
402 {
403 LOG.warn(e);
404 }
405 finally
406 {
407 now=_selectSet.getNow();
408 }
409
410 if (_readBlocked && timeoutMs>0 && now>=end)
411 return false;
412 }
413 }
414 finally
415 {
416 _readBlocked=false;
417 setCheckForIdle(check);
418 }
419 }
420 return true;
421 }
422
423
424
425
426
427 @Override
428 public boolean blockWritable(long timeoutMs) throws IOException
429 {
430 synchronized (this)
431 {
432 if (isOutputShutdown())
433 throw new EofException();
434
435 long now=_selectSet.getNow();
436 long end=now+timeoutMs;
437 boolean check=isCheckForIdle();
438 setCheckForIdle(true);
439 try
440 {
441 _writeBlocked=true;
442 while (_writeBlocked && !isOutputShutdown())
443 {
444 try
445 {
446 updateKey();
447 this.wait(timeoutMs>0?(end-now):10000);
448 }
449 catch (InterruptedException e)
450 {
451 LOG.warn(e);
452 }
453 finally
454 {
455 now=_selectSet.getNow();
456 }
457 if (_writeBlocked && timeoutMs>0 && now>=end)
458 return false;
459 }
460 }
461 finally
462 {
463 _writeBlocked=false;
464 setCheckForIdle(check);
465 }
466 }
467 return true;
468 }
469
470
471
472
473
474 public void scheduleWrite()
475 {
476 if (_writable)
477 LOG.debug("Required scheduleWrite {}",this);
478
479 _writable=false;
480 updateKey();
481 }
482
483
484 public boolean isWritable()
485 {
486 return _writable;
487 }
488
489
490 public boolean hasProgressed()
491 {
492 return false;
493 }
494
495
496
497
498
499
500
501 private void updateKey()
502 {
503 final boolean changed;
504 synchronized (this)
505 {
506 int current_ops=-1;
507 if (getChannel().isOpen())
508 {
509 boolean read_interest = _readBlocked || (!_dispatched && !_connection.isSuspended());
510 boolean write_interest= _writeBlocked || (!_dispatched && !_writable);
511
512 _interestOps =
513 ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0)
514 | ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
515 try
516 {
517 current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
518 }
519 catch(Exception e)
520 {
521 _key=null;
522 LOG.ignore(e);
523 }
524 }
525 changed=_interestOps!=current_ops;
526 }
527
528 if(changed)
529 {
530 _selectSet.addChange(this);
531 _selectSet.wakeup();
532 }
533 }
534
535
536
537
538
539
540 void doUpdateKey()
541 {
542 synchronized (this)
543 {
544 if (getChannel().isOpen())
545 {
546 if (_interestOps>0)
547 {
548 if (_key==null || !_key.isValid())
549 {
550 SelectableChannel sc = (SelectableChannel)getChannel();
551 if (sc.isRegistered())
552 {
553 updateKey();
554 }
555 else
556 {
557 try
558 {
559 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
560 }
561 catch (Exception e)
562 {
563 LOG.ignore(e);
564 if (_key!=null && _key.isValid())
565 {
566 _key.cancel();
567 }
568
569 if (_open)
570 {
571 _selectSet.destroyEndPoint(this);
572 }
573 _open=false;
574 _key = null;
575 }
576 }
577 }
578 else
579 {
580 _key.interestOps(_interestOps);
581 }
582 }
583 else
584 {
585 if (_key!=null && _key.isValid())
586 _key.interestOps(0);
587 else
588 _key=null;
589 }
590 }
591 else
592 {
593 if (_key!=null && _key.isValid())
594 _key.cancel();
595
596 if (_open)
597 {
598 _open=false;
599 _selectSet.destroyEndPoint(this);
600 }
601 _key = null;
602 }
603 }
604 }
605
606
607
608
609 protected void handle()
610 {
611 boolean dispatched=true;
612 try
613 {
614 while(dispatched)
615 {
616 try
617 {
618 while(true)
619 {
620 final AsyncConnection next = (AsyncConnection)_connection.handle();
621 if (next!=_connection)
622 {
623 LOG.debug("{} replaced {}",next,_connection);
624 Connection old=_connection;
625 _connection=next;
626 _manager.endPointUpgraded(this,old);
627 continue;
628 }
629 break;
630 }
631 }
632 catch (ClosedChannelException e)
633 {
634 LOG.ignore(e);
635 }
636 catch (EofException e)
637 {
638 LOG.debug("EOF", e);
639 try{close();}
640 catch(IOException e2){LOG.ignore(e2);}
641 }
642 catch (IOException e)
643 {
644 LOG.warn(e.toString());
645 try{close();}
646 catch(IOException e2){LOG.ignore(e2);}
647 }
648 catch (Throwable e)
649 {
650 LOG.warn("handle failed", e);
651 try{close();}
652 catch(IOException e2){LOG.ignore(e2);}
653 }
654 finally
655 {
656 if (!_ishut && isInputShutdown() && isOpen())
657 {
658 _ishut=true;
659 try
660 {
661 _connection.onInputShutdown();
662 }
663 catch(Throwable x)
664 {
665 LOG.warn("onInputShutdown failed", x);
666 try{close();}
667 catch(IOException e2){LOG.ignore(e2);}
668 }
669 finally
670 {
671 updateKey();
672 }
673 }
674 dispatched=!undispatch();
675 }
676 }
677 }
678 finally
679 {
680 if (dispatched)
681 {
682 dispatched=!undispatch();
683 while (dispatched)
684 {
685 LOG.warn("SCEP.run() finally DISPATCHED");
686 dispatched=!undispatch();
687 }
688 }
689 }
690 }
691
692
693
694
695
696 @Override
697 public void close() throws IOException
698 {
699
700
701
702
703
704 if (WORK_AROUND_JVM_BUG_6346658)
705 {
706 try
707 {
708 SelectionKey key = _key;
709 if (key!=null)
710 key.cancel();
711 }
712 catch (Throwable e)
713 {
714 LOG.ignore(e);
715 }
716 }
717
718 try
719 {
720 super.close();
721 }
722 catch (IOException e)
723 {
724 LOG.ignore(e);
725 }
726 finally
727 {
728 updateKey();
729 }
730 }
731
732
733 @Override
734 public String toString()
735 {
736
737
738
739 SelectionKey key = _key;
740 String keyString = "";
741 if (key != null)
742 {
743 if (key.isValid())
744 {
745 if (key.isReadable())
746 keyString += "r";
747 if (key.isWritable())
748 keyString += "w";
749 }
750 else
751 {
752 keyString += "!";
753 }
754 }
755 else
756 {
757 keyString += "-";
758 }
759 return String.format("SCEP@%x{l(%s)<->r(%s),d=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
760 hashCode(),
761 _socket.getRemoteSocketAddress(),
762 _socket.getLocalSocketAddress(),
763 _dispatched,
764 isOpen(),
765 isInputShutdown(),
766 isOutputShutdown(),
767 _readBlocked,
768 _writeBlocked,
769 _writable,
770 _interestOps,
771 keyString,
772 _connection);
773 }
774
775
776 public SelectSet getSelectSet()
777 {
778 return _selectSet;
779 }
780
781
782
783
784
785
786 @Override
787 public void setMaxIdleTime(int timeMs) throws IOException
788 {
789 _maxIdleTime=timeMs;
790 }
791
792 }