1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.CancelledKeyException;
18 import java.nio.channels.Channel;
19 import java.nio.channels.ClosedSelectorException;
20 import java.nio.channels.SelectableChannel;
21 import java.nio.channels.SelectionKey;
22 import java.nio.channels.Selector;
23 import java.nio.channels.ServerSocketChannel;
24 import java.nio.channels.SocketChannel;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentLinkedQueue;
29 import java.util.concurrent.ConcurrentMap;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.TimeUnit;
32
33 import javax.management.RuntimeErrorException;
34
35 import org.eclipse.jetty.io.ConnectedEndPoint;
36 import org.eclipse.jetty.io.Connection;
37 import org.eclipse.jetty.io.EndPoint;
38 import org.eclipse.jetty.util.TypeUtil;
39 import org.eclipse.jetty.util.component.AbstractLifeCycle;
40 import org.eclipse.jetty.util.component.AggregateLifeCycle;
41 import org.eclipse.jetty.util.component.Dumpable;
42 import org.eclipse.jetty.util.log.Log;
43 import org.eclipse.jetty.util.thread.Timeout;
44 import org.eclipse.jetty.util.thread.Timeout.Task;
45
46
47
48
49
50
51
52
53
54
55 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
56 {
57
58 private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.eclipse.jetty.io.nio.JVMBUG_THRESHHOLD",0).intValue();
59 private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
60 private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",25000).intValue();
61 private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
62 private static final int __BUSY_KEY=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_KEY",-1).intValue();
63 private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
64
65 private int _maxIdleTime;
66 private int _lowResourcesMaxIdleTime;
67 private long _lowResourcesConnections;
68 private SelectSet[] _selectSet;
69 private int _selectSets=1;
70 private volatile int _set;
71 private boolean _deferringInterestedOps0=true;
72
73
74
75
76
77
78 public void setMaxIdleTime(long maxIdleTime)
79 {
80 _maxIdleTime=(int)maxIdleTime;
81 }
82
83
84
85
86
87 public void setSelectSets(int selectSets)
88 {
89 long lrc = _lowResourcesConnections * _selectSets;
90 _selectSets=selectSets;
91 _lowResourcesConnections=lrc/_selectSets;
92 }
93
94
95
96
97
98 public long getMaxIdleTime()
99 {
100 return _maxIdleTime;
101 }
102
103
104
105
106
107 public int getSelectSets()
108 {
109 return _selectSets;
110 }
111
112
113
114
115
116
117 public SelectSet getSelectSet(int i)
118 {
119 return _selectSet[i];
120 }
121
122
123
124
125
126
127 public void register(SocketChannel channel, Object att)
128 {
129
130
131
132
133 int s=_set++;
134 s=s%_selectSets;
135 SelectSet[] sets=_selectSet;
136 if (sets!=null)
137 {
138 SelectSet set=sets[s];
139 set.addChange(channel,att);
140 set.wakeup();
141 }
142 }
143
144
145
146
147
148
149 public void register(SocketChannel channel)
150 {
151
152
153
154
155 int s=_set++;
156 s=s%_selectSets;
157 SelectSet[] sets=_selectSet;
158 if (sets!=null)
159 {
160 SelectSet set=sets[s];
161 set.addChange(channel);
162 set.wakeup();
163 }
164 }
165
166
167
168
169
170 public void register(ServerSocketChannel acceptChannel)
171 {
172 int s=_set++;
173 s=s%_selectSets;
174 SelectSet set=_selectSet[s];
175 set.addChange(acceptChannel);
176 set.wakeup();
177 }
178
179
180
181
182
183 public long getLowResourcesConnections()
184 {
185 return _lowResourcesConnections*_selectSets;
186 }
187
188
189
190
191
192
193
194
195 public void setLowResourcesConnections(long lowResourcesConnections)
196 {
197 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
198 }
199
200
201
202
203
204 public long getLowResourcesMaxIdleTime()
205 {
206 return _lowResourcesMaxIdleTime;
207 }
208
209
210
211
212
213
214 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
215 {
216 _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
217 }
218
219
220
221
222
223
224 public void doSelect(int acceptorID) throws IOException
225 {
226 SelectSet[] sets= _selectSet;
227 if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
228 sets[acceptorID].doSelect();
229 }
230
231
232 public abstract boolean dispatch(Runnable task);
233
234
235
236
237
238 @Override
239 protected void doStart() throws Exception
240 {
241 _selectSet = new SelectSet[_selectSets];
242 for (int i=0;i<_selectSet.length;i++)
243 _selectSet[i]= new SelectSet(i);
244
245 super.doStart();
246 }
247
248
249
250 @Override
251 protected void doStop() throws Exception
252 {
253 SelectSet[] sets= _selectSet;
254 _selectSet=null;
255 if (sets!=null)
256 {
257 for (SelectSet set : sets)
258 {
259 if (set!=null)
260 set.stop();
261 }
262 }
263 super.doStop();
264 }
265
266
267
268
269
270 protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
271
272
273
274
275
276 protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
277
278
279 protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
280
281
282 protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
283
284
285
286
287
288
289
290
291
292
293 protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
294
295
296 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
297 {
298 Log.warn(ex+","+channel+","+attachment);
299 Log.debug(ex);
300 }
301
302
303 public String dump()
304 {
305 return AggregateLifeCycle.dump(this);
306 }
307
308
309 public void dump(Appendable out, String indent) throws IOException
310 {
311 out.append(String.valueOf(this)).append("\n");
312 AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet));
313 }
314
315
316
317
318
319 public class SelectSet implements Dumpable
320 {
321 private final int _setID;
322 private final Timeout _timeout;
323
324 private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
325
326 private Selector _selector;
327
328 private volatile Thread _selecting;
329 private int _jvmBug;
330 private int _selects;
331 private long _monitorStart;
332 private long _monitorNext;
333 private boolean _pausing;
334 private SelectionKey _busyKey;
335 private int _busyKeyCount;
336 private long _log;
337 private int _paused;
338 private int _jvmFix0;
339 private int _jvmFix1;
340 private int _jvmFix2;
341 private volatile long _idleTick;
342 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
343
344
345 SelectSet(int acceptorID) throws Exception
346 {
347 _setID=acceptorID;
348
349 _idleTick = System.currentTimeMillis();
350 _timeout = new Timeout(this);
351 _timeout.setDuration(0L);
352
353
354 _selector = Selector.open();
355 _monitorStart=System.currentTimeMillis();
356 _monitorNext=_monitorStart+__MONITOR_PERIOD;
357 _log=_monitorStart+60000;
358 }
359
360
361 public void addChange(Object change)
362 {
363 _changes.add(change);
364 }
365
366
367 public void addChange(SelectableChannel channel, Object att)
368 {
369 if (att==null)
370 addChange(channel);
371 else if (att instanceof EndPoint)
372 addChange(att);
373 else
374 addChange(new ChannelAndAttachment(channel,att));
375 }
376
377
378
379
380
381
382
383 public void doSelect() throws IOException
384 {
385 try
386 {
387 _selecting=Thread.currentThread();
388 final Selector selector=_selector;
389
390
391 Object change;
392 int changes=_changes.size();
393 while (changes-->0 && (change=_changes.poll())!=null)
394 {
395 Channel ch=null;
396 SelectionKey key=null;
397
398 try
399 {
400 if (change instanceof EndPoint)
401 {
402
403 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
404 ch=endpoint.getChannel();
405 endpoint.doUpdateKey();
406 }
407 else if (change instanceof ChannelAndAttachment)
408 {
409
410 final ChannelAndAttachment asc = (ChannelAndAttachment)change;
411 final SelectableChannel channel=asc._channel;
412 ch=channel;
413 final Object att = asc._attachment;
414
415 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
416 {
417 key = channel.register(selector,SelectionKey.OP_READ,att);
418 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
419 key.attach(endpoint);
420 endpoint.schedule();
421 }
422 else if (channel.isOpen())
423 {
424 key = channel.register(selector,SelectionKey.OP_CONNECT,att);
425 }
426 }
427 else if (change instanceof SocketChannel)
428 {
429
430 final SocketChannel channel=(SocketChannel)change;
431 ch=channel;
432 key = channel.register(selector,SelectionKey.OP_READ,null);
433 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
434 key.attach(endpoint);
435 endpoint.schedule();
436 }
437 else if (change instanceof ChangeTask)
438 {
439 ((Runnable)change).run();
440 }
441 else if (change instanceof Runnable)
442 {
443 dispatch((Runnable)change);
444 }
445 else
446 throw new IllegalArgumentException(change.toString());
447 }
448 catch (CancelledKeyException e)
449 {
450 Log.ignore(e);
451 }
452 catch (Throwable e)
453 {
454 if (e instanceof ThreadDeath)
455 throw (ThreadDeath)e;
456
457 if (isRunning())
458 Log.warn(e);
459 else
460 Log.debug(e);
461
462 try
463 {
464 ch.close();
465 }
466 catch(IOException e2)
467 {
468 Log.debug(e2);
469 }
470 }
471 }
472
473
474
475 int selected=selector.selectNow();
476 _selects++;
477
478 long now=System.currentTimeMillis();
479
480
481 if (selected==0 && selector.selectedKeys().isEmpty())
482 {
483
484 if (_pausing)
485 {
486 try
487 {
488 Thread.sleep(__BUSY_PAUSE);
489 }
490 catch(InterruptedException e)
491 {
492 Log.ignore(e);
493 }
494 now=System.currentTimeMillis();
495 }
496
497
498 _timeout.setNow(now);
499 long to_next_timeout=_timeout.getTimeToNext();
500
501 long wait = _changes.size()==0?__IDLE_TICK:0L;
502 if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
503 wait = to_next_timeout;
504
505
506 if (wait>0)
507 {
508 long before=now;
509 selected=selector.select(wait);
510 _selects++;
511 now = System.currentTimeMillis();
512 _timeout.setNow(now);
513
514 if (__JVMBUG_THRESHHOLD>0)
515 checkJvmBugs(before, now, wait, selected);
516 }
517 }
518
519
520 if (_selector==null || !selector.isOpen())
521 return;
522
523
524 for (SelectionKey key: selector.selectedKeys())
525 {
526 SocketChannel channel=null;
527
528 try
529 {
530 if (!key.isValid())
531 {
532 key.cancel();
533 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
534 if (endpoint != null)
535 endpoint.doUpdateKey();
536 continue;
537 }
538
539 Object att = key.attachment();
540 if (att instanceof SelectChannelEndPoint)
541 {
542 if (key.isReadable()||key.isWritable())
543 ((SelectChannelEndPoint)att).schedule();
544 }
545 else if (key.isConnectable())
546 {
547
548 channel = (SocketChannel)key.channel();
549 boolean connected=false;
550 try
551 {
552 connected=channel.finishConnect();
553 }
554 catch(Exception e)
555 {
556 connectionFailed(channel,e,att);
557 }
558 finally
559 {
560 if (connected)
561 {
562 key.interestOps(SelectionKey.OP_READ);
563 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
564 key.attach(endpoint);
565 endpoint.schedule();
566 }
567 else
568 {
569 key.cancel();
570 }
571 }
572 }
573 else
574 {
575
576 channel = (SocketChannel)key.channel();
577 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
578 key.attach(endpoint);
579 if (key.isReadable())
580 endpoint.schedule();
581 }
582 key = null;
583 }
584 catch (CancelledKeyException e)
585 {
586 Log.ignore(e);
587 }
588 catch (Exception e)
589 {
590 if (isRunning())
591 Log.warn(e);
592 else
593 Log.ignore(e);
594
595 try
596 {
597 if (channel!=null)
598 channel.close();
599 }
600 catch(IOException e2)
601 {
602 Log.debug(e2);
603 }
604
605 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
606 key.cancel();
607 }
608 }
609
610
611 selector.selectedKeys().clear();
612
613 now=System.currentTimeMillis();
614 _timeout.setNow(now);
615 Task task = _timeout.expired();
616 while (task!=null)
617 {
618 if (task instanceof Runnable)
619 dispatch((Runnable)task);
620 task = _timeout.expired();
621 }
622
623
624 if (now-_idleTick>__IDLE_TICK)
625 {
626 _idleTick=now;
627
628 final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
629 ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
630 :now;
631
632 dispatch(new Runnable()
633 {
634 public void run()
635 {
636 for (SelectChannelEndPoint endp:_endPoints.keySet())
637 {
638 endp.checkIdleTimestamp(idle_now);
639 }
640 }
641 });
642 }
643 }
644 catch (ClosedSelectorException e)
645 {
646 if (isRunning())
647 Log.warn(e);
648 else
649 Log.ignore(e);
650 }
651 catch (CancelledKeyException e)
652 {
653 Log.ignore(e);
654 }
655 finally
656 {
657 _selecting=null;
658 }
659 }
660
661
662 private void checkJvmBugs(long before, long now, long wait, int selected)
663 throws IOException
664 {
665 Selector selector = _selector;
666 if (selector==null)
667 return;
668
669
670
671
672 if (now>_monitorNext)
673 {
674 _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
675 _pausing=_selects>__MAX_SELECTS;
676 if (_pausing)
677 _paused++;
678
679 _selects=0;
680 _jvmBug=0;
681 _monitorStart=now;
682 _monitorNext=now+__MONITOR_PERIOD;
683 }
684
685 if (now>_log)
686 {
687 if (_paused>0)
688 Log.debug(this+" Busy selector - injecting delay "+_paused+" times");
689
690 if (_jvmFix2>0)
691 Log.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
692
693 if (_jvmFix1>0)
694 Log.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times");
695
696 else if(Log.isDebugEnabled() && _jvmFix0>0)
697 Log.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");
698 _paused=0;
699 _jvmFix2=0;
700 _jvmFix1=0;
701 _jvmFix0=0;
702 _log=now+60000;
703 }
704
705
706 if (selected==0 && wait>10 && (now-before)<(wait/2))
707 {
708
709 _jvmBug++;
710 if (_jvmBug>(__JVMBUG_THRESHHOLD))
711 {
712 try
713 {
714 if (_jvmBug==__JVMBUG_THRESHHOLD+1)
715 _jvmFix2++;
716
717 Thread.sleep(__BUSY_PAUSE);
718 }
719 catch(InterruptedException e)
720 {
721 Log.ignore(e);
722 }
723 }
724 else if (_jvmBug==__JVMBUG_THRESHHOLD)
725 {
726 renewSelector();
727 }
728 else if (_jvmBug%32==31)
729 {
730
731 int cancelled=0;
732 for (SelectionKey k: selector.keys())
733 {
734 if (k.isValid()&&k.interestOps()==0)
735 {
736 k.cancel();
737 cancelled++;
738 }
739 }
740 if (cancelled>0)
741 _jvmFix0++;
742
743 return;
744 }
745 }
746 else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
747 {
748
749 SelectionKey busy = selector.selectedKeys().iterator().next();
750 if (busy==_busyKey)
751 {
752 if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
753 {
754 final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
755 Log.warn("Busy Key "+busy.channel()+" "+endpoint);
756 busy.cancel();
757 if (endpoint!=null)
758 {
759 dispatch(new Runnable()
760 {
761 public void run()
762 {
763 try
764 {
765 endpoint.close();
766 }
767 catch (IOException e)
768 {
769 Log.ignore(e);
770 }
771 }
772 });
773 }
774 }
775 }
776 else
777 _busyKeyCount=0;
778 _busyKey=busy;
779 }
780 }
781
782
783 private void renewSelector()
784 {
785 try
786 {
787 synchronized (this)
788 {
789 Selector selector=_selector;
790 if (selector==null)
791 return;
792 final Selector new_selector = Selector.open();
793 for (SelectionKey k: selector.keys())
794 {
795 if (!k.isValid() || k.interestOps()==0)
796 continue;
797
798 final SelectableChannel channel = k.channel();
799 final Object attachment = k.attachment();
800
801 if (attachment==null)
802 addChange(channel);
803 else
804 addChange(channel,attachment);
805 }
806 _selector.close();
807 _selector=new_selector;
808 }
809 }
810 catch(IOException e)
811 {
812 throw new RuntimeException("recreating selector",e);
813 }
814 }
815
816
817 public SelectorManager getManager()
818 {
819 return SelectorManager.this;
820 }
821
822
823 public long getNow()
824 {
825 return _timeout.getNow();
826 }
827
828
829
830
831
832
833
834
835 public void scheduleTimeout(Timeout.Task task, long timeoutMs)
836 {
837 if (!(task instanceof Runnable))
838 throw new IllegalArgumentException("!Runnable");
839 _timeout.schedule(task, timeoutMs);
840 }
841
842
843 public void cancelTimeout(Timeout.Task task)
844 {
845 task.cancel();
846 }
847
848
849 public void wakeup()
850 {
851 try
852 {
853 Selector selector = _selector;
854 if (selector!=null)
855 selector.wakeup();
856 }
857 catch(Exception e)
858 {
859 addChange(new ChangeTask()
860 {
861 public void run()
862 {
863 renewSelector();
864 }
865 });
866
867 renewSelector();
868 }
869 }
870
871
872 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
873 {
874 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
875 endPointOpened(endp);
876 _endPoints.put(endp,this);
877 return endp;
878 }
879
880
881 public void destroyEndPoint(SelectChannelEndPoint endp)
882 {
883 _endPoints.remove(endp);
884 endPointClosed(endp);
885 }
886
887
888 Selector getSelector()
889 {
890 return _selector;
891 }
892
893
894 void stop() throws Exception
895 {
896
897
898 try
899 {
900 for (int i=0;i<100 && _selecting!=null;i++)
901 {
902 wakeup();
903 Thread.sleep(10);
904 }
905 }
906 catch(Exception e)
907 {
908 Log.ignore(e);
909 }
910
911
912 synchronized (this)
913 {
914 for (SelectionKey key:_selector.keys())
915 {
916 if (key==null)
917 continue;
918 Object att=key.attachment();
919 if (att instanceof EndPoint)
920 {
921 EndPoint endpoint = (EndPoint)att;
922 try
923 {
924 endpoint.close();
925 }
926 catch(IOException e)
927 {
928 Log.ignore(e);
929 }
930 }
931 }
932
933
934 _timeout.cancelAll();
935 try
936 {
937 if (_selector != null)
938 _selector.close();
939 }
940 catch (IOException e)
941 {
942 Log.ignore(e);
943 }
944 _selector=null;
945 }
946 }
947
948
949 public String dump()
950 {
951 return AggregateLifeCycle.dump(this);
952 }
953
954
955 public void dump(Appendable out, String indent) throws IOException
956 {
957 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
958
959 Thread selecting = _selecting;
960
961 Object where = "not selecting";
962 StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
963 if (trace!=null)
964 {
965 for (StackTraceElement t:trace)
966 if (t.getClassName().startsWith("org.eclipse.jetty."))
967 {
968 where=t;
969 break;
970 }
971 }
972
973 Selector selector=_selector;
974 final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
975 dump.add(where);
976
977 final CountDownLatch latch = new CountDownLatch(1);
978
979 addChange(new Runnable(){
980 public void run()
981 {
982 dumpKeyState(dump);
983 latch.countDown();
984 }
985 });
986
987 try
988 {
989 latch.await(5,TimeUnit.SECONDS);
990 }
991 catch(InterruptedException e)
992 {
993 Log.ignore(e);
994 }
995 AggregateLifeCycle.dump(out,indent,dump);
996 }
997
998
999 public void dumpKeyState(List<Object> dumpto)
1000 {
1001 Selector selector=_selector;
1002 dumpto.add(selector+" keys="+selector.keys().size());
1003 for (SelectionKey key: selector.keys())
1004 {
1005 if (key.isValid())
1006 dumpto.add(key.attachment()+" "+key.interestOps()+" "+key.readyOps());
1007 else
1008 dumpto.add(key.attachment()+" - - ");
1009 }
1010 }
1011 }
1012
1013
1014 private static class ChannelAndAttachment
1015 {
1016 final SelectableChannel _channel;
1017 final Object _attachment;
1018
1019 public ChannelAndAttachment(SelectableChannel channel, Object attachment)
1020 {
1021 super();
1022 _channel = channel;
1023 _attachment = attachment;
1024 }
1025 }
1026
1027
1028 public boolean isDeferringInterestedOps0()
1029 {
1030 return _deferringInterestedOps0;
1031 }
1032
1033
1034 public void setDeferringInterestedOps0(boolean deferringInterestedOps0)
1035 {
1036 _deferringInterestedOps0 = deferringInterestedOps0;
1037 }
1038
1039
1040
1041
1042
1043 private interface ChangeTask extends Runnable
1044 {}
1045
1046 }