1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.ClosedChannelException;
18 import java.nio.channels.SelectableChannel;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.SocketChannel;
21
22 import org.eclipse.jetty.io.AsyncEndPoint;
23 import org.eclipse.jetty.io.Buffer;
24 import org.eclipse.jetty.io.ConnectedEndPoint;
25 import org.eclipse.jetty.io.Connection;
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30 import org.eclipse.jetty.util.thread.Timeout.Task;
31
32
33
34
35
36 public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
37 {
38 public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
39
40 private final SelectorManager.SelectSet _selectSet;
41 private final SelectorManager _manager;
42 private SelectionKey _key;
43 private final Runnable _handler = new Runnable()
44 {
45 public void run() { handle(); }
46 };
47
48
49 private int _interestOps;
50
51
52
53
54
55
56
57 private volatile AsyncConnection _connection;
58
59
60 private boolean _dispatched = false;
61
62
63 private boolean _asyncDispatch = false;
64
65
66 private volatile boolean _writable = true;
67
68
69
70 private boolean _readBlocked;
71
72
73 private boolean _writeBlocked;
74
75
76 private boolean _open;
77
78 private volatile long _idleTimestamp;
79
80 private boolean _ishut;
81
82
83 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
84 throws IOException
85 {
86 super(channel, maxIdleTime);
87
88 _manager = selectSet.getManager();
89 _selectSet = selectSet;
90 _dispatched = false;
91 _asyncDispatch = false;
92 _open=true;
93 _key = key;
94
95 setCheckForIdle(true);
96 }
97
98
99 public SelectionKey getSelectionKey()
100 {
101 synchronized (this)
102 {
103 return _key;
104 }
105 }
106
107
108 public SelectorManager getSelectManager()
109 {
110 return _manager;
111 }
112
113
114 public Connection getConnection()
115 {
116 return _connection;
117 }
118
119
120 public void setConnection(Connection connection)
121 {
122 Connection old=_connection;
123 _connection=(AsyncConnection)connection;
124 if (old!=null && old!=_connection)
125 _manager.endPointUpgraded(this,old);
126 }
127
128
129 public long getIdleTimestamp()
130 {
131 return _idleTimestamp;
132 }
133
134
135
136
137
138 public void schedule()
139 {
140 synchronized (this)
141 {
142
143 if (_key == null || !_key.isValid())
144 {
145 _readBlocked=false;
146 _writeBlocked=false;
147 this.notifyAll();
148 return;
149 }
150
151
152 if (_readBlocked || _writeBlocked)
153 {
154
155 if (_readBlocked && _key.isReadable())
156 _readBlocked=false;
157 if (_writeBlocked && _key.isWritable())
158 _writeBlocked=false;
159
160
161 this.notifyAll();
162
163
164 _key.interestOps(0);
165 if (!_dispatched)
166 updateKey();
167 return;
168 }
169
170
171 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
172 {
173
174 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
175 _key.interestOps(_interestOps);
176 _writable = true;
177 }
178
179
180 if (_dispatched)
181 _key.interestOps(0);
182 else
183 {
184
185 dispatch();
186 if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
187 {
188 _key.interestOps(0);
189 }
190 }
191 }
192 }
193
194
195 public void asyncDispatch()
196 {
197 synchronized(this)
198 {
199 if (_dispatched)
200 _asyncDispatch=true;
201 else
202 dispatch();
203 }
204 }
205
206
207 public void dispatch()
208 {
209 synchronized(this)
210 {
211 if (_dispatched)
212 {
213 throw new IllegalStateException("dispatched");
214 }
215 else
216 {
217 _dispatched = true;
218 boolean dispatched = _manager.dispatch(_handler);
219 if(!dispatched)
220 {
221 _dispatched = false;
222 LOG.warn("Dispatched Failed! "+this+" to "+_manager);
223 updateKey();
224 }
225 }
226 }
227 }
228
229
230
231
232
233
234
235
236 protected boolean undispatch()
237 {
238 synchronized (this)
239 {
240 if (_asyncDispatch)
241 {
242 _asyncDispatch=false;
243 return false;
244 }
245 _dispatched = false;
246 updateKey();
247 }
248 return true;
249 }
250
251
252 public void cancelTimeout(Task task)
253 {
254 getSelectSet().cancelTimeout(task);
255 }
256
257
258 public void scheduleTimeout(Task task, long timeoutMs)
259 {
260 getSelectSet().scheduleTimeout(task,timeoutMs);
261 }
262
263
264 public void setCheckForIdle(boolean check)
265 {
266 _idleTimestamp=check?System.currentTimeMillis():0;
267 }
268
269
270 public boolean isCheckForIdle()
271 {
272 return _idleTimestamp!=0;
273 }
274
275
276 protected void notIdle()
277 {
278 if (_idleTimestamp!=0)
279 _idleTimestamp=System.currentTimeMillis();
280 }
281
282
283 public void checkIdleTimestamp(long now)
284 {
285 long idleTimestamp=_idleTimestamp;
286
287 if (idleTimestamp!=0 && _maxIdleTime>0)
288 {
289 long idleForMs=now-idleTimestamp;
290
291 if (idleForMs>_maxIdleTime)
292 {
293 onIdleExpired(idleForMs);
294 _idleTimestamp=now;
295 }
296 }
297 }
298
299
300 public void onIdleExpired(long idleForMs)
301 {
302 _connection.onIdleExpired(idleForMs);
303 }
304
305
306 @Override
307 public int fill(Buffer buffer) throws IOException
308 {
309 int fill=super.fill(buffer);
310 if (fill>0)
311 notIdle();
312 return fill;
313 }
314
315
316 @Override
317 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
318 {
319 int l = super.flush(header, buffer, trailer);
320
321
322 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
323 {
324 synchronized (this)
325 {
326 if (_dispatched)
327 _writable=false;
328 }
329 }
330 else if (l>0)
331 {
332 _writable=true;
333 notIdle();
334 }
335 return l;
336 }
337
338
339
340
341 @Override
342 public int flush(Buffer buffer) throws IOException
343 {
344 int l = super.flush(buffer);
345
346
347 if (l==0 && buffer!=null && buffer.hasContent())
348 {
349 synchronized (this)
350 {
351 if (_dispatched)
352 _writable=false;
353 }
354 }
355 else if (l>0)
356 {
357 _writable=true;
358 notIdle();
359 }
360
361 return l;
362 }
363
364
365
366
367
368 @Override
369 public boolean blockReadable(long timeoutMs) throws IOException
370 {
371 synchronized (this)
372 {
373 if (isInputShutdown())
374 throw new EofException();
375
376 long now=_selectSet.getNow();
377 long end=now+timeoutMs;
378 boolean check=isCheckForIdle();
379 setCheckForIdle(true);
380 try
381 {
382 _readBlocked=true;
383 while (!isInputShutdown() && _readBlocked)
384 {
385 try
386 {
387 updateKey();
388 this.wait(timeoutMs>=0?(end-now):10000);
389 }
390 catch (InterruptedException e)
391 {
392 LOG.warn(e);
393 }
394 finally
395 {
396 now=_selectSet.getNow();
397 }
398
399 if (_readBlocked && timeoutMs>0 && now>=end)
400 return false;
401 }
402 }
403 finally
404 {
405 _readBlocked=false;
406 setCheckForIdle(check);
407 }
408 }
409 return true;
410 }
411
412
413
414
415
416 @Override
417 public boolean blockWritable(long timeoutMs) throws IOException
418 {
419 synchronized (this)
420 {
421 if (isOutputShutdown())
422 throw new EofException();
423
424 long now=_selectSet.getNow();
425 long end=now+timeoutMs;
426 boolean check=isCheckForIdle();
427 setCheckForIdle(true);
428 try
429 {
430 _writeBlocked=true;
431 while (_writeBlocked && !isOutputShutdown())
432 {
433 try
434 {
435 updateKey();
436 this.wait(timeoutMs>=0?(end-now):10000);
437 }
438 catch (InterruptedException e)
439 {
440 LOG.warn(e);
441 }
442 finally
443 {
444 now=_selectSet.getNow();
445 }
446 if (_writeBlocked && timeoutMs>0 && now>=end)
447 return false;
448 }
449 }
450 finally
451 {
452 _writeBlocked=false;
453 setCheckForIdle(check);
454 }
455 }
456 return true;
457 }
458
459
460
461
462
463 public void scheduleWrite()
464 {
465 if (_writable==true)
466 LOG.debug("Required scheduleWrite {}",this);
467
468 _writable=false;
469 updateKey();
470 }
471
472
473 public boolean isWritable()
474 {
475 return _writable;
476 }
477
478
479 public boolean hasProgressed()
480 {
481 return false;
482 }
483
484
485
486
487
488
489
490 private void updateKey()
491 {
492 final boolean changed;
493 synchronized (this)
494 {
495 int current_ops=-1;
496 if (getChannel().isOpen())
497 {
498 boolean read_interest = _readBlocked || (!_dispatched && !_connection.isSuspended());
499 boolean write_interest= _writeBlocked || (!_dispatched && !_writable);
500
501 _interestOps =
502 ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0)
503 | ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
504 try
505 {
506 current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
507 }
508 catch(Exception e)
509 {
510 _key=null;
511 LOG.ignore(e);
512 }
513 }
514 changed=_interestOps!=current_ops;
515 }
516
517 if(changed)
518 {
519 _selectSet.addChange(this);
520 _selectSet.wakeup();
521 }
522 }
523
524
525
526
527
528
529 void doUpdateKey()
530 {
531 synchronized (this)
532 {
533 if (getChannel().isOpen())
534 {
535 if (_interestOps>0)
536 {
537 if (_key==null || !_key.isValid())
538 {
539 SelectableChannel sc = (SelectableChannel)getChannel();
540 if (sc.isRegistered())
541 {
542 updateKey();
543 }
544 else
545 {
546 try
547 {
548 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
549 }
550 catch (Exception e)
551 {
552 LOG.ignore(e);
553 if (_key!=null && _key.isValid())
554 {
555 _key.cancel();
556 }
557
558 if (_open)
559 {
560 _selectSet.destroyEndPoint(this);
561 }
562 _open=false;
563 _key = null;
564 }
565 }
566 }
567 else
568 {
569 _key.interestOps(_interestOps);
570 }
571 }
572 else
573 {
574 if (_key!=null && _key.isValid())
575 _key.interestOps(0);
576 else
577 _key=null;
578 }
579 }
580 else
581 {
582 if (_key!=null && _key.isValid())
583 _key.cancel();
584
585 if (_open)
586 {
587 _open=false;
588 _selectSet.destroyEndPoint(this);
589 }
590 _key = null;
591 }
592 }
593 }
594
595
596
597
598 protected void handle()
599 {
600 boolean dispatched=true;
601 try
602 {
603 while(dispatched)
604 {
605 try
606 {
607 while(true)
608 {
609 final AsyncConnection next = (AsyncConnection)_connection.handle();
610 if (next!=_connection)
611 {
612 LOG.debug("{} replaced {}",next,_connection);
613 Connection old=_connection;
614 _connection=next;
615 _manager.endPointUpgraded(this,old);
616 continue;
617 }
618 break;
619 }
620 }
621 catch (ClosedChannelException e)
622 {
623 LOG.ignore(e);
624 }
625 catch (EofException e)
626 {
627 LOG.debug("EOF", e);
628 try{close();}
629 catch(IOException e2){LOG.ignore(e2);}
630 }
631 catch (IOException e)
632 {
633 LOG.warn(e.toString());
634 try{close();}
635 catch(IOException e2){LOG.ignore(e2);}
636 }
637 catch (Throwable e)
638 {
639 LOG.warn("handle failed", e);
640 try{close();}
641 catch(IOException e2){LOG.ignore(e2);}
642 }
643 finally
644 {
645 if (!_ishut && isInputShutdown() && isOpen())
646 {
647 _ishut=true;
648 try
649 {
650 _connection.onInputShutdown();
651 }
652 catch(Throwable x)
653 {
654 LOG.warn("onInputShutdown failed", x);
655 try{close();}
656 catch(IOException e2){LOG.ignore(e2);}
657 }
658 finally
659 {
660 updateKey();
661 }
662 }
663 dispatched=!undispatch();
664 }
665 }
666 }
667 finally
668 {
669 if (dispatched)
670 {
671 dispatched=!undispatch();
672 while (dispatched)
673 {
674 LOG.warn("SCEP.run() finally DISPATCHED");
675 dispatched=!undispatch();
676 }
677 }
678 }
679 }
680
681
682
683
684
685 @Override
686 public void close() throws IOException
687 {
688 try
689 {
690 super.close();
691 }
692 catch (IOException e)
693 {
694 LOG.ignore(e);
695 }
696 finally
697 {
698 updateKey();
699 }
700 }
701
702
703 @Override
704 public String toString()
705 {
706
707
708
709 SelectionKey key = _key;
710 String keyString = "";
711 if (key != null)
712 {
713 if (key.isValid())
714 {
715 if (key.isReadable())
716 keyString += "r";
717 if (key.isWritable())
718 keyString += "w";
719 }
720 else
721 {
722 keyString += "!";
723 }
724 }
725 else
726 {
727 keyString += "-";
728 }
729 return String.format("SCEP@%x{l(%s)<->r(%s),d=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
730 hashCode(),
731 _socket.getRemoteSocketAddress(),
732 _socket.getLocalSocketAddress(),
733 _dispatched,
734 isOpen(),
735 isInputShutdown(),
736 isOutputShutdown(),
737 _readBlocked,
738 _writeBlocked,
739 _writable,
740 _interestOps,
741 keyString,
742 _connection);
743 }
744
745
746 public SelectSet getSelectSet()
747 {
748 return _selectSet;
749 }
750
751
752
753
754
755
756 @Override
757 public void setMaxIdleTime(int timeMs) throws IOException
758 {
759 _maxIdleTime=timeMs;
760 }
761
762 }