1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.ClosedChannelException;
18 import java.nio.channels.SelectableChannel;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.SocketChannel;
21
22 import org.eclipse.jetty.io.AsyncEndPoint;
23 import org.eclipse.jetty.io.Buffer;
24 import org.eclipse.jetty.io.ConnectedEndPoint;
25 import org.eclipse.jetty.io.Connection;
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30 import org.eclipse.jetty.util.thread.Timeout.Task;
31
32
33
34
35
36 public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
37 {
38 public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
39
40 private final SelectorManager.SelectSet _selectSet;
41 private final SelectorManager _manager;
42 private SelectionKey _key;
43 private final Runnable _handler = new Runnable()
44 {
45 public void run() { handle(); }
46 };
47
48
49 private int _interestOps;
50
51
52
53
54
55
56
57 private volatile AsyncConnection _connection;
58
59
60 private boolean _dispatched = false;
61
62
63 private boolean _asyncDispatch = false;
64
65
66 private volatile boolean _writable = true;
67
68
69
70 private boolean _readBlocked;
71
72
73 private boolean _writeBlocked;
74
75
76 private boolean _open;
77
78 private volatile long _idleTimestamp;
79
80 private boolean _ishut;
81
82
/**
 * Creates an end point bound to the given select set and selection key.
 * Idle checking is switched on as the last construction step.
 *
 * @param channel     the socket channel, passed on to the superclass
 * @param selectSet   the select set this end point belongs to; its manager is also retained
 * @param key         the selection key initially associated with the channel
 * @param maxIdleTime the max idle time, passed on to the superclass
 * @throws IOException declared by the superclass constructor
 */
public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
    throws IOException
{
    super(channel, maxIdleTime);

    _selectSet = selectSet;
    _manager = selectSet.getManager();
    _key = key;
    _open = true;
    _dispatched = false;
    _asyncDispatch = false;

    setCheckForIdle(true);
}
97
98
99 public SelectionKey getSelectionKey()
100 {
101 synchronized (this)
102 {
103 return _key;
104 }
105 }
106
107
108 public SelectorManager getSelectManager()
109 {
110 return _manager;
111 }
112
113
114 public Connection getConnection()
115 {
116 return _connection;
117 }
118
119
120 public void setConnection(Connection connection)
121 {
122 Connection old=_connection;
123 _connection=(AsyncConnection)connection;
124 if (old!=null && old!=_connection)
125 _manager.endPointUpgraded(this,old);
126 }
127
128
129 public long getIdleTimestamp()
130 {
131 return _idleTimestamp;
132 }
133
134
135
136
137
138 public void schedule()
139 {
140 synchronized (this)
141 {
142
143 if (_key == null || !_key.isValid())
144 {
145 _readBlocked=false;
146 _writeBlocked=false;
147 this.notifyAll();
148 return;
149 }
150
151
152 if (_readBlocked || _writeBlocked)
153 {
154
155 if (_readBlocked && _key.isReadable())
156 _readBlocked=false;
157 if (_writeBlocked && _key.isWritable())
158 _writeBlocked=false;
159
160
161 this.notifyAll();
162
163
164 _key.interestOps(0);
165 if (!_dispatched)
166 updateKey();
167 return;
168 }
169
170
171 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
172 {
173
174 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
175 _key.interestOps(_interestOps);
176 _writable = true;
177 }
178
179
180 if (!_dispatched)
181 {
182 dispatch();
183 if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
184 {
185 _key.interestOps(0);
186 }
187 }
188 }
189 }
190
191
192 public void asyncDispatch()
193 {
194 synchronized(this)
195 {
196 if (_dispatched)
197 _asyncDispatch=true;
198 else
199 dispatch();
200 }
201 }
202
203
204 public void dispatch()
205 {
206 synchronized(this)
207 {
208 if (_dispatched)
209 {
210 throw new IllegalStateException("dispatched");
211 }
212 else
213 {
214 _dispatched = true;
215 boolean dispatched = _manager.dispatch(_handler);
216 if(!dispatched)
217 {
218 _dispatched = false;
219 LOG.warn("Dispatched Failed! "+this+" to "+_manager);
220 updateKey();
221 }
222 }
223 }
224 }
225
226
227
228
229
230
231
232
233 protected boolean undispatch()
234 {
235 synchronized (this)
236 {
237 if (_asyncDispatch)
238 {
239 _asyncDispatch=false;
240 return false;
241 }
242 _dispatched = false;
243 updateKey();
244 }
245 return true;
246 }
247
248
249 public void cancelTimeout(Task task)
250 {
251 getSelectSet().cancelTimeout(task);
252 }
253
254
255 public void scheduleTimeout(Task task, long timeoutMs)
256 {
257 getSelectSet().scheduleTimeout(task,timeoutMs);
258 }
259
260
261 public void setCheckForIdle(boolean check)
262 {
263 _idleTimestamp=check?System.currentTimeMillis():0;
264 }
265
266
267 public boolean isCheckForIdle()
268 {
269 return _idleTimestamp!=0;
270 }
271
272
273 protected void notIdle()
274 {
275 if (_idleTimestamp!=0)
276 _idleTimestamp=System.currentTimeMillis();
277 }
278
279
280 public void checkIdleTimestamp(long now)
281 {
282 long idleTimestamp=_idleTimestamp;
283
284 if (idleTimestamp!=0 && _maxIdleTime>0)
285 {
286 long idleForMs=now-idleTimestamp;
287
288 if (idleForMs>_maxIdleTime)
289 {
290 onIdleExpired(idleForMs);
291 _idleTimestamp=now;
292 }
293 }
294 }
295
296
297 public void onIdleExpired(long idleForMs)
298 {
299 _connection.onIdleExpired(idleForMs);
300 }
301
302
303 @Override
304 public int fill(Buffer buffer) throws IOException
305 {
306 int fill=super.fill(buffer);
307 if (fill>0)
308 notIdle();
309 return fill;
310 }
311
312
313 @Override
314 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
315 {
316 int l = super.flush(header, buffer, trailer);
317
318
319 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
320 {
321 synchronized (this)
322 {
323 _writable=false;
324 if (!_dispatched)
325 updateKey();
326 }
327 }
328 else if (l>0)
329 {
330 _writable=true;
331 notIdle();
332 }
333 return l;
334 }
335
336
337
338
339 @Override
340 public int flush(Buffer buffer) throws IOException
341 {
342 int l = super.flush(buffer);
343
344
345 if (l==0 && buffer!=null && buffer.hasContent())
346 {
347 synchronized (this)
348 {
349 _writable=false;
350 if (!_dispatched)
351 updateKey();
352 }
353 }
354 else if (l>0)
355 {
356 _writable=true;
357 notIdle();
358 }
359
360 return l;
361 }
362
363
364
365
366
367 @Override
368 public boolean blockReadable(long timeoutMs) throws IOException
369 {
370 synchronized (this)
371 {
372 if (isInputShutdown())
373 throw new EofException();
374
375 long now=_selectSet.getNow();
376 long end=now+timeoutMs;
377 boolean check=isCheckForIdle();
378 setCheckForIdle(true);
379 try
380 {
381 _readBlocked=true;
382 while (!isInputShutdown() && _readBlocked)
383 {
384 try
385 {
386 updateKey();
387 this.wait(timeoutMs>=0?(end-now):10000);
388 }
389 catch (InterruptedException e)
390 {
391 LOG.warn(e);
392 }
393 finally
394 {
395 now=_selectSet.getNow();
396 }
397
398 if (_readBlocked && timeoutMs>0 && now>=end)
399 return false;
400 }
401 }
402 finally
403 {
404 _readBlocked=false;
405 setCheckForIdle(check);
406 }
407 }
408 return true;
409 }
410
411
412
413
414
415 @Override
416 public boolean blockWritable(long timeoutMs) throws IOException
417 {
418 synchronized (this)
419 {
420 if (isOutputShutdown())
421 throw new EofException();
422
423 long now=_selectSet.getNow();
424 long end=now+timeoutMs;
425 boolean check=isCheckForIdle();
426 setCheckForIdle(true);
427 try
428 {
429 _writeBlocked=true;
430 while (_writeBlocked && !isOutputShutdown())
431 {
432 try
433 {
434 updateKey();
435 this.wait(timeoutMs>=0?(end-now):10000);
436 }
437 catch (InterruptedException e)
438 {
439 LOG.warn(e);
440 }
441 finally
442 {
443 now=_selectSet.getNow();
444 }
445 if (_writeBlocked && timeoutMs>0 && now>=end)
446 return false;
447 }
448 }
449 finally
450 {
451 _writeBlocked=false;
452 setCheckForIdle(check);
453 }
454 }
455 return true;
456 }
457
458
459
460 public void clearWritable()
461 {
462 _writable=false;
463 }
464
465
466
467
468
469 public void scheduleWrite()
470 {
471 if (_writable==true)
472 LOG.debug("Required scheduleWrite {}",this);
473
474 _writable=false;
475 updateKey();
476 }
477
478
479 public boolean isWritable()
480 {
481 return _writable;
482 }
483
484
485 public boolean hasProgressed()
486 {
487 return false;
488 }
489
490
491
492
493
494
495
496 private void updateKey()
497 {
498 final boolean changed;
499 synchronized (this)
500 {
501 int current_ops=-1;
502 if (getChannel().isOpen())
503 {
504 boolean read_interest = _readBlocked || (!_dispatched && !_connection.isSuspended());
505 boolean write_interest= _writeBlocked || (!_dispatched && !_writable);
506
507 _interestOps =
508 ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0)
509 | ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
510 try
511 {
512 current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
513 }
514 catch(Exception e)
515 {
516 _key=null;
517 LOG.ignore(e);
518 }
519 }
520 changed=_interestOps!=current_ops;
521 }
522
523 if(changed)
524 {
525 _selectSet.addChange(this);
526 _selectSet.wakeup();
527 }
528 }
529
530
531
532
533
534
535 void doUpdateKey()
536 {
537 synchronized (this)
538 {
539 if (getChannel().isOpen())
540 {
541 if (_interestOps>0)
542 {
543 if (_key==null || !_key.isValid())
544 {
545 SelectableChannel sc = (SelectableChannel)getChannel();
546 if (sc.isRegistered())
547 {
548 updateKey();
549 }
550 else
551 {
552 try
553 {
554 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
555 }
556 catch (Exception e)
557 {
558 LOG.ignore(e);
559 if (_key!=null && _key.isValid())
560 {
561 _key.cancel();
562 }
563
564 if (_open)
565 {
566 _selectSet.destroyEndPoint(this);
567 }
568 _open=false;
569 _key = null;
570 }
571 }
572 }
573 else
574 {
575 _key.interestOps(_interestOps);
576 }
577 }
578 else
579 {
580 if (_key!=null && _key.isValid())
581 _key.interestOps(0);
582 else
583 _key=null;
584 }
585 }
586 else
587 {
588 if (_key!=null && _key.isValid())
589 _key.cancel();
590
591 if (_open)
592 {
593 _open=false;
594 _selectSet.destroyEndPoint(this);
595 }
596 _key = null;
597 }
598 }
599 }
600
601
602
603
604 protected void handle()
605 {
606 boolean dispatched=true;
607 try
608 {
609 while(dispatched)
610 {
611 try
612 {
613 while(true)
614 {
615 final AsyncConnection next = (AsyncConnection)_connection.handle();
616 if (next!=_connection)
617 {
618 LOG.debug("{} replaced {}",next,_connection);
619 Connection old=_connection;
620 _connection=next;
621 _manager.endPointUpgraded(this,old);
622 continue;
623 }
624 break;
625 }
626 }
627 catch (ClosedChannelException e)
628 {
629 LOG.ignore(e);
630 }
631 catch (EofException e)
632 {
633 LOG.debug("EOF", e);
634 try{close();}
635 catch(IOException e2){LOG.ignore(e2);}
636 }
637 catch (IOException e)
638 {
639 LOG.warn(e.toString());
640 LOG.debug(e);
641 try{close();}
642 catch(IOException e2){LOG.ignore(e2);}
643 }
644 catch (Throwable e)
645 {
646 LOG.warn("handle failed", e);
647 try{close();}
648 catch(IOException e2){LOG.ignore(e2);}
649 }
650 finally
651 {
652 if (!_ishut && isInputShutdown() && isOpen())
653 {
654 _ishut=true;
655 try
656 {
657 _connection.onInputShutdown();
658 }
659 catch(Throwable x)
660 {
661 LOG.warn("onInputShutdown failed", x);
662 try{close();}
663 catch(IOException e2){LOG.ignore(e2);}
664 }
665 finally
666 {
667 updateKey();
668 }
669 }
670 dispatched=!undispatch();
671 }
672 }
673 }
674 finally
675 {
676 if (dispatched)
677 {
678 dispatched=!undispatch();
679 while (dispatched)
680 {
681 LOG.warn("SCEP.run() finally DISPATCHED");
682 dispatched=!undispatch();
683 }
684 }
685 }
686 }
687
688
689
690
691
692 @Override
693 public void close() throws IOException
694 {
695 try
696 {
697 super.close();
698 }
699 catch (IOException e)
700 {
701 LOG.ignore(e);
702 }
703 finally
704 {
705 updateKey();
706 }
707 }
708
709
710 @Override
711 public String toString()
712 {
713
714
715
716 SelectionKey key = _key;
717 String keyString = "";
718 if (key != null)
719 {
720 if (key.isValid())
721 {
722 if (key.isReadable())
723 keyString += "r";
724 if (key.isWritable())
725 keyString += "w";
726 }
727 else
728 {
729 keyString += "!";
730 }
731 }
732 else
733 {
734 keyString += "-";
735 }
736 return String.format("SCEP@%x{l(%s)<->r(%s),d=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
737 hashCode(),
738 _socket.getRemoteSocketAddress(),
739 _socket.getLocalSocketAddress(),
740 _dispatched,
741 isOpen(),
742 isInputShutdown(),
743 isOutputShutdown(),
744 _readBlocked,
745 _writeBlocked,
746 _writable,
747 _interestOps,
748 keyString,
749 _connection);
750 }
751
752
753 public SelectSet getSelectSet()
754 {
755 return _selectSet;
756 }
757
758
759
760
761
762
763 @Override
764 public void setMaxIdleTime(int timeMs) throws IOException
765 {
766 _maxIdleTime=timeMs;
767 }
768
769 }