1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.CancelledKeyException;
18 import java.nio.channels.Channel;
19 import java.nio.channels.ClosedSelectorException;
20 import java.nio.channels.SelectableChannel;
21 import java.nio.channels.SelectionKey;
22 import java.nio.channels.Selector;
23 import java.nio.channels.ServerSocketChannel;
24 import java.nio.channels.SocketChannel;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentLinkedQueue;
29 import java.util.concurrent.ConcurrentMap;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.TimeUnit;
32
33
34 import org.eclipse.jetty.io.ConnectedEndPoint;
35 import org.eclipse.jetty.io.Connection;
36 import org.eclipse.jetty.io.EndPoint;
37 import org.eclipse.jetty.util.TypeUtil;
38 import org.eclipse.jetty.util.component.AbstractLifeCycle;
39 import org.eclipse.jetty.util.component.AggregateLifeCycle;
40 import org.eclipse.jetty.util.component.Dumpable;
41 import org.eclipse.jetty.util.log.Log;
42 import org.eclipse.jetty.util.log.Logger;
43 import org.eclipse.jetty.util.thread.Timeout;
44 import org.eclipse.jetty.util.thread.Timeout.Task;
45
46
47
48
49
50
51
52
53
54
55 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
56 {
57 public static final Logger __log=Log.getLogger("org.eclipse.jetty.io.nio");
58
59
60 private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.eclipse.jetty.io.nio.JVMBUG_THRESHHOLD",0).intValue();
61 private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
62 private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",25000).intValue();
63 private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
64 private static final int __BUSY_KEY=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_KEY",-1).intValue();
65 private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
66
67 private int _maxIdleTime;
68 private int _lowResourcesMaxIdleTime;
69 private long _lowResourcesConnections;
70 private SelectSet[] _selectSet;
71 private int _selectSets=1;
72 private volatile int _set;
73 private boolean _deferringInterestedOps0=true;
74
75
76
77
78
79
80 public void setMaxIdleTime(long maxIdleTime)
81 {
82 _maxIdleTime=(int)maxIdleTime;
83 }
84
85
86
87
88
89 public void setSelectSets(int selectSets)
90 {
91 long lrc = _lowResourcesConnections * _selectSets;
92 _selectSets=selectSets;
93 _lowResourcesConnections=lrc/_selectSets;
94 }
95
96
97
98
99
100 public long getMaxIdleTime()
101 {
102 return _maxIdleTime;
103 }
104
105
106
107
108
109 public int getSelectSets()
110 {
111 return _selectSets;
112 }
113
114
115
116
117
118
119 public SelectSet getSelectSet(int i)
120 {
121 return _selectSet[i];
122 }
123
124
125
126
127
128
129 public void register(SocketChannel channel, Object att)
130 {
131
132
133
134
135 int s=_set++;
136 s=s%_selectSets;
137 SelectSet[] sets=_selectSet;
138 if (sets!=null)
139 {
140 SelectSet set=sets[s];
141 set.addChange(channel,att);
142 set.wakeup();
143 }
144 }
145
146
147
148
149
150
151 public void register(SocketChannel channel)
152 {
153
154
155
156
157 int s=_set++;
158 s=s%_selectSets;
159 SelectSet[] sets=_selectSet;
160 if (sets!=null)
161 {
162 SelectSet set=sets[s];
163 set.addChange(channel);
164 set.wakeup();
165 }
166 }
167
168
169
170
171
172 public void register(ServerSocketChannel acceptChannel)
173 {
174 int s=_set++;
175 s=s%_selectSets;
176 SelectSet set=_selectSet[s];
177 set.addChange(acceptChannel);
178 set.wakeup();
179 }
180
181
182
183
184
185 public long getLowResourcesConnections()
186 {
187 return _lowResourcesConnections*_selectSets;
188 }
189
190
191
192
193
194
195
196
197 public void setLowResourcesConnections(long lowResourcesConnections)
198 {
199 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
200 }
201
202
203
204
205
206 public long getLowResourcesMaxIdleTime()
207 {
208 return _lowResourcesMaxIdleTime;
209 }
210
211
212
213
214
215
216 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
217 {
218 _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
219 }
220
221
222
223
224
225
226 public void doSelect(int acceptorID) throws IOException
227 {
228 SelectSet[] sets= _selectSet;
229 if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
230 sets[acceptorID].doSelect();
231 }
232
233
234 public abstract boolean dispatch(Runnable task);
235
236
237
238
239
240 @Override
241 protected void doStart() throws Exception
242 {
243 _selectSet = new SelectSet[_selectSets];
244 for (int i=0;i<_selectSet.length;i++)
245 _selectSet[i]= new SelectSet(i);
246
247 super.doStart();
248 }
249
250
251
252 @Override
253 protected void doStop() throws Exception
254 {
255 SelectSet[] sets= _selectSet;
256 _selectSet=null;
257 if (sets!=null)
258 {
259 for (SelectSet set : sets)
260 {
261 if (set!=null)
262 set.stop();
263 }
264 }
265 super.doStop();
266 }
267
268
269
270
271
272 protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
273
274
275
276
277
278 protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
279
280
281 protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
282
283
284 protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
285
286
287
288
289
290
291
292
293
294
295 protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
296
297
298 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
299 {
300 __log.warn(ex+","+channel+","+attachment);
301 __log.debug(ex);
302 }
303
304
305 public String dump()
306 {
307 return AggregateLifeCycle.dump(this);
308 }
309
310
311 public void dump(Appendable out, String indent) throws IOException
312 {
313 out.append(String.valueOf(this)).append("\n");
314 AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet));
315 }
316
317
318
319
320
321 public class SelectSet implements Dumpable
322 {
323 private final int _setID;
324 private final Timeout _timeout;
325
326 private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
327
328 private Selector _selector;
329
330 private volatile Thread _selecting;
331 private int _jvmBug;
332 private int _selects;
333 private long _monitorStart;
334 private long _monitorNext;
335 private boolean _pausing;
336 private SelectionKey _busyKey;
337 private int _busyKeyCount;
338 private long _log;
339 private int _paused;
340 private int _jvmFix0;
341 private int _jvmFix1;
342 private int _jvmFix2;
343 private volatile long _idleTick;
344 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
345
346
347 SelectSet(int acceptorID) throws Exception
348 {
349 _setID=acceptorID;
350
351 _idleTick = System.currentTimeMillis();
352 _timeout = new Timeout(this);
353 _timeout.setDuration(0L);
354
355
356 _selector = Selector.open();
357 _monitorStart=System.currentTimeMillis();
358 _monitorNext=_monitorStart+__MONITOR_PERIOD;
359 _log=_monitorStart+60000;
360 }
361
362
363 public void addChange(Object change)
364 {
365 _changes.add(change);
366 }
367
368
369 public void addChange(SelectableChannel channel, Object att)
370 {
371 if (att==null)
372 addChange(channel);
373 else if (att instanceof EndPoint)
374 addChange(att);
375 else
376 addChange(new ChannelAndAttachment(channel,att));
377 }
378
379
380
381
382
383
384
385 public void doSelect() throws IOException
386 {
387 try
388 {
389 _selecting=Thread.currentThread();
390 final Selector selector=_selector;
391
392
393 Object change;
394 int changes=_changes.size();
395 while (changes-->0 && (change=_changes.poll())!=null)
396 {
397 Channel ch=null;
398 SelectionKey key=null;
399
400 try
401 {
402 if (change instanceof EndPoint)
403 {
404
405 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
406 ch=endpoint.getChannel();
407 endpoint.doUpdateKey();
408 }
409 else if (change instanceof ChannelAndAttachment)
410 {
411
412 final ChannelAndAttachment asc = (ChannelAndAttachment)change;
413 final SelectableChannel channel=asc._channel;
414 ch=channel;
415 final Object att = asc._attachment;
416
417 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
418 {
419 key = channel.register(selector,SelectionKey.OP_READ,att);
420 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
421 key.attach(endpoint);
422 endpoint.schedule();
423 }
424 else if (channel.isOpen())
425 {
426 key = channel.register(selector,SelectionKey.OP_CONNECT,att);
427 }
428 }
429 else if (change instanceof SocketChannel)
430 {
431
432 final SocketChannel channel=(SocketChannel)change;
433 ch=channel;
434 key = channel.register(selector,SelectionKey.OP_READ,null);
435 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
436 key.attach(endpoint);
437 endpoint.schedule();
438 }
439 else if (change instanceof ChangeTask)
440 {
441 ((Runnable)change).run();
442 }
443 else if (change instanceof Runnable)
444 {
445 dispatch((Runnable)change);
446 }
447 else
448 throw new IllegalArgumentException(change.toString());
449 }
450 catch (CancelledKeyException e)
451 {
452 __log.ignore(e);
453 }
454 catch (Throwable e)
455 {
456 if (e instanceof ThreadDeath)
457 throw (ThreadDeath)e;
458
459 if (isRunning())
460 __log.warn(e);
461 else
462 __log.debug(e);
463
464 try
465 {
466 ch.close();
467 }
468 catch(IOException e2)
469 {
470 __log.debug(e2);
471 }
472 }
473 }
474
475
476
477 int selected=selector.selectNow();
478 _selects++;
479
480 long now=System.currentTimeMillis();
481
482
483 if (selected==0 && selector.selectedKeys().isEmpty())
484 {
485
486 if (_pausing)
487 {
488 try
489 {
490 Thread.sleep(__BUSY_PAUSE);
491 }
492 catch(InterruptedException e)
493 {
494 __log.ignore(e);
495 }
496 now=System.currentTimeMillis();
497 }
498
499
500 _timeout.setNow(now);
501 long to_next_timeout=_timeout.getTimeToNext();
502
503 long wait = _changes.size()==0?__IDLE_TICK:0L;
504 if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
505 wait = to_next_timeout;
506
507
508 if (wait>0)
509 {
510 long before=now;
511 selected=selector.select(wait);
512 _selects++;
513 now = System.currentTimeMillis();
514 _timeout.setNow(now);
515
516 if (__JVMBUG_THRESHHOLD>0)
517 checkJvmBugs(before, now, wait, selected);
518 }
519 }
520
521
522 if (_selector==null || !selector.isOpen())
523 return;
524
525
526 for (SelectionKey key: selector.selectedKeys())
527 {
528 SocketChannel channel=null;
529
530 try
531 {
532 if (!key.isValid())
533 {
534 key.cancel();
535 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
536 if (endpoint != null)
537 endpoint.doUpdateKey();
538 continue;
539 }
540
541 Object att = key.attachment();
542 if (att instanceof SelectChannelEndPoint)
543 {
544 if (key.isReadable()||key.isWritable())
545 ((SelectChannelEndPoint)att).schedule();
546 }
547 else if (key.isConnectable())
548 {
549
550 channel = (SocketChannel)key.channel();
551 boolean connected=false;
552 try
553 {
554 connected=channel.finishConnect();
555 }
556 catch(Exception e)
557 {
558 connectionFailed(channel,e,att);
559 }
560 finally
561 {
562 if (connected)
563 {
564 key.interestOps(SelectionKey.OP_READ);
565 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
566 key.attach(endpoint);
567 endpoint.schedule();
568 }
569 else
570 {
571 key.cancel();
572 }
573 }
574 }
575 else
576 {
577
578 channel = (SocketChannel)key.channel();
579 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
580 key.attach(endpoint);
581 if (key.isReadable())
582 endpoint.schedule();
583 }
584 key = null;
585 }
586 catch (CancelledKeyException e)
587 {
588 __log.ignore(e);
589 }
590 catch (Exception e)
591 {
592 if (isRunning())
593 __log.warn(e);
594 else
595 __log.ignore(e);
596
597 try
598 {
599 if (channel!=null)
600 channel.close();
601 }
602 catch(IOException e2)
603 {
604 __log.debug(e2);
605 }
606
607 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
608 key.cancel();
609 }
610 }
611
612
613 selector.selectedKeys().clear();
614
615 now=System.currentTimeMillis();
616 _timeout.setNow(now);
617 Task task = _timeout.expired();
618 while (task!=null)
619 {
620 if (task instanceof Runnable)
621 dispatch((Runnable)task);
622 task = _timeout.expired();
623 }
624
625
626 if (now-_idleTick>__IDLE_TICK)
627 {
628 _idleTick=now;
629
630 final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
631 ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
632 :now;
633
634 dispatch(new Runnable()
635 {
636 public void run()
637 {
638 for (SelectChannelEndPoint endp:_endPoints.keySet())
639 {
640 endp.checkIdleTimestamp(idle_now);
641 }
642 }
643 });
644 }
645 }
646 catch (ClosedSelectorException e)
647 {
648 if (isRunning())
649 __log.warn(e);
650 else
651 __log.ignore(e);
652 }
653 catch (CancelledKeyException e)
654 {
655 __log.ignore(e);
656 }
657 finally
658 {
659 _selecting=null;
660 }
661 }
662
663
664 private void checkJvmBugs(long before, long now, long wait, int selected)
665 throws IOException
666 {
667 Selector selector = _selector;
668 if (selector==null)
669 return;
670
671
672
673
674 if (now>_monitorNext)
675 {
676 _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
677 _pausing=_selects>__MAX_SELECTS;
678 if (_pausing)
679 _paused++;
680
681 _selects=0;
682 _jvmBug=0;
683 _monitorStart=now;
684 _monitorNext=now+__MONITOR_PERIOD;
685 }
686
687 if (now>_log)
688 {
689 if (_paused>0)
690 __log.debug(this+" Busy selector - injecting delay "+_paused+" times");
691
692 if (_jvmFix2>0)
693 __log.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
694
695 if (_jvmFix1>0)
696 __log.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times");
697
698 else if(__log.isDebugEnabled() && _jvmFix0>0)
699 __log.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");
700 _paused=0;
701 _jvmFix2=0;
702 _jvmFix1=0;
703 _jvmFix0=0;
704 _log=now+60000;
705 }
706
707
708 if (selected==0 && wait>10 && (now-before)<(wait/2))
709 {
710
711 _jvmBug++;
712 if (_jvmBug>(__JVMBUG_THRESHHOLD))
713 {
714 try
715 {
716 if (_jvmBug==__JVMBUG_THRESHHOLD+1)
717 _jvmFix2++;
718
719 Thread.sleep(__BUSY_PAUSE);
720 }
721 catch(InterruptedException e)
722 {
723 __log.ignore(e);
724 }
725 }
726 else if (_jvmBug==__JVMBUG_THRESHHOLD)
727 {
728 renewSelector();
729 }
730 else if (_jvmBug%32==31)
731 {
732
733 int cancelled=0;
734 for (SelectionKey k: selector.keys())
735 {
736 if (k.isValid()&&k.interestOps()==0)
737 {
738 k.cancel();
739 cancelled++;
740 }
741 }
742 if (cancelled>0)
743 _jvmFix0++;
744
745 return;
746 }
747 }
748 else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
749 {
750
751 SelectionKey busy = selector.selectedKeys().iterator().next();
752 if (busy==_busyKey)
753 {
754 if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
755 {
756 final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
757 __log.warn("Busy Key "+busy.channel()+" "+endpoint);
758 busy.cancel();
759 if (endpoint!=null)
760 {
761 dispatch(new Runnable()
762 {
763 public void run()
764 {
765 try
766 {
767 endpoint.close();
768 }
769 catch (IOException e)
770 {
771 __log.ignore(e);
772 }
773 }
774 });
775 }
776 }
777 }
778 else
779 _busyKeyCount=0;
780 _busyKey=busy;
781 }
782 }
783
784
785 private void renewSelector()
786 {
787 try
788 {
789 synchronized (this)
790 {
791 Selector selector=_selector;
792 if (selector==null)
793 return;
794 final Selector new_selector = Selector.open();
795 for (SelectionKey k: selector.keys())
796 {
797 if (!k.isValid() || k.interestOps()==0)
798 continue;
799
800 final SelectableChannel channel = k.channel();
801 final Object attachment = k.attachment();
802
803 if (attachment==null)
804 addChange(channel);
805 else
806 addChange(channel,attachment);
807 }
808 _selector.close();
809 _selector=new_selector;
810 }
811 }
812 catch(IOException e)
813 {
814 throw new RuntimeException("recreating selector",e);
815 }
816 }
817
818
819 public SelectorManager getManager()
820 {
821 return SelectorManager.this;
822 }
823
824
825 public long getNow()
826 {
827 return _timeout.getNow();
828 }
829
830
831
832
833
834
835
836
837 public void scheduleTimeout(Timeout.Task task, long timeoutMs)
838 {
839 if (!(task instanceof Runnable))
840 throw new IllegalArgumentException("!Runnable");
841 _timeout.schedule(task, timeoutMs);
842 }
843
844
845 public void cancelTimeout(Timeout.Task task)
846 {
847 task.cancel();
848 }
849
850
851 public void wakeup()
852 {
853 try
854 {
855 Selector selector = _selector;
856 if (selector!=null)
857 selector.wakeup();
858 }
859 catch(Exception e)
860 {
861 addChange(new ChangeTask()
862 {
863 public void run()
864 {
865 renewSelector();
866 }
867 });
868
869 renewSelector();
870 }
871 }
872
873
874 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
875 {
876 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
877 endPointOpened(endp);
878 _endPoints.put(endp,this);
879 return endp;
880 }
881
882
883 public void destroyEndPoint(SelectChannelEndPoint endp)
884 {
885 _endPoints.remove(endp);
886 endPointClosed(endp);
887 }
888
889
890 Selector getSelector()
891 {
892 return _selector;
893 }
894
895
896 void stop() throws Exception
897 {
898
899
900 try
901 {
902 for (int i=0;i<100 && _selecting!=null;i++)
903 {
904 wakeup();
905 Thread.sleep(10);
906 }
907 }
908 catch(Exception e)
909 {
910 __log.ignore(e);
911 }
912
913
914 synchronized (this)
915 {
916 for (SelectionKey key:_selector.keys())
917 {
918 if (key==null)
919 continue;
920 Object att=key.attachment();
921 if (att instanceof EndPoint)
922 {
923 EndPoint endpoint = (EndPoint)att;
924 try
925 {
926 endpoint.close();
927 }
928 catch(IOException e)
929 {
930 __log.ignore(e);
931 }
932 }
933 }
934
935
936 _timeout.cancelAll();
937 try
938 {
939 if (_selector != null)
940 _selector.close();
941 }
942 catch (IOException e)
943 {
944 __log.ignore(e);
945 }
946 _selector=null;
947 }
948 }
949
950
951 public String dump()
952 {
953 return AggregateLifeCycle.dump(this);
954 }
955
956
957 public void dump(Appendable out, String indent) throws IOException
958 {
959 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
960
961 Thread selecting = _selecting;
962
963 Object where = "not selecting";
964 StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
965 if (trace!=null)
966 {
967 for (StackTraceElement t:trace)
968 if (t.getClassName().startsWith("org.eclipse.jetty."))
969 {
970 where=t;
971 break;
972 }
973 }
974
975 Selector selector=_selector;
976 final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
977 dump.add(where);
978
979 final CountDownLatch latch = new CountDownLatch(1);
980
981 addChange(new Runnable(){
982 public void run()
983 {
984 dumpKeyState(dump);
985 latch.countDown();
986 }
987 });
988
989 try
990 {
991 latch.await(5,TimeUnit.SECONDS);
992 }
993 catch(InterruptedException e)
994 {
995 __log.ignore(e);
996 }
997 AggregateLifeCycle.dump(out,indent,dump);
998 }
999
1000
1001 public void dumpKeyState(List<Object> dumpto)
1002 {
1003 Selector selector=_selector;
1004 dumpto.add(selector+" keys="+selector.keys().size());
1005 for (SelectionKey key: selector.keys())
1006 {
1007 if (key.isValid())
1008 dumpto.add(key.attachment()+" "+key.interestOps()+" "+key.readyOps());
1009 else
1010 dumpto.add(key.attachment()+" - - ");
1011 }
1012 }
1013 }
1014
1015
1016 private static class ChannelAndAttachment
1017 {
1018 final SelectableChannel _channel;
1019 final Object _attachment;
1020
1021 public ChannelAndAttachment(SelectableChannel channel, Object attachment)
1022 {
1023 super();
1024 _channel = channel;
1025 _attachment = attachment;
1026 }
1027 }
1028
1029
1030 public boolean isDeferringInterestedOps0()
1031 {
1032 return _deferringInterestedOps0;
1033 }
1034
1035
1036 public void setDeferringInterestedOps0(boolean deferringInterestedOps0)
1037 {
1038 _deferringInterestedOps0 = deferringInterestedOps0;
1039 }
1040
1041
1042
1043
1044
1045 private interface ChangeTask extends Runnable
1046 {}
1047
1048 }