1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.CancelledKeyException;
18 import java.nio.channels.SelectableChannel;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.Selector;
21 import java.nio.channels.ServerSocketChannel;
22 import java.nio.channels.SocketChannel;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.List;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.ConcurrentMap;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.TimeUnit;
31
32 import org.eclipse.jetty.io.ConnectedEndPoint;
33 import org.eclipse.jetty.io.Connection;
34 import org.eclipse.jetty.io.EndPoint;
35 import org.eclipse.jetty.util.component.AbstractLifeCycle;
36 import org.eclipse.jetty.util.component.AggregateLifeCycle;
37 import org.eclipse.jetty.util.component.Dumpable;
38 import org.eclipse.jetty.util.log.Log;
39 import org.eclipse.jetty.util.thread.Timeout;
40 import org.eclipse.jetty.util.thread.Timeout.Task;
41
42
43
44
45
46
47
48
49
50
51 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
52 {
53
54 private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.eclipse.jetty.io.nio.JVMBUG_THRESHHOLD",0).intValue();
55 private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
56 private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",25000).intValue();
57 private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
58 private static final int __BUSY_KEY=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_KEY",-1).intValue();
59 private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
60
61 private int _maxIdleTime;
62 private int _lowResourcesMaxIdleTime;
63 private long _lowResourcesConnections;
64 private SelectSet[] _selectSet;
65 private int _selectSets=1;
66 private volatile int _set;
67 private boolean _deferringInterestedOps0=true;
68
69
70
71
72
73
74 public void setMaxIdleTime(long maxIdleTime)
75 {
76 _maxIdleTime=(int)maxIdleTime;
77 }
78
79
80
81
82
83 public void setSelectSets(int selectSets)
84 {
85 long lrc = _lowResourcesConnections * _selectSets;
86 _selectSets=selectSets;
87 _lowResourcesConnections=lrc/_selectSets;
88 }
89
90
91
92
93
94 public long getMaxIdleTime()
95 {
96 return _maxIdleTime;
97 }
98
99
100
101
102
103 public int getSelectSets()
104 {
105 return _selectSets;
106 }
107
108
109
110
111
112
113 public SelectSet getSelectSet(int i)
114 {
115 return _selectSet[i];
116 }
117
118
119
120
121
122
123 public void register(SocketChannel channel, Object att)
124 {
125
126
127
128
129 int s=_set++;
130 s=s%_selectSets;
131 SelectSet[] sets=_selectSet;
132 if (sets!=null)
133 {
134 SelectSet set=sets[s];
135 set.addChange(channel,att);
136 set.wakeup();
137 }
138 }
139
140
141
142
143
144
145 public void register(SocketChannel channel)
146 {
147
148
149
150
151 int s=_set++;
152 s=s%_selectSets;
153 SelectSet[] sets=_selectSet;
154 if (sets!=null)
155 {
156 SelectSet set=sets[s];
157 set.addChange(channel);
158 set.wakeup();
159 }
160 }
161
162
163
164
165
166 public void register(ServerSocketChannel acceptChannel)
167 {
168 int s=_set++;
169 s=s%_selectSets;
170 SelectSet set=_selectSet[s];
171 set.addChange(acceptChannel);
172 set.wakeup();
173 }
174
175
176
177
178
179 public long getLowResourcesConnections()
180 {
181 return _lowResourcesConnections*_selectSets;
182 }
183
184
185
186
187
188
189
190
191 public void setLowResourcesConnections(long lowResourcesConnections)
192 {
193 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
194 }
195
196
197
198
199
200 public long getLowResourcesMaxIdleTime()
201 {
202 return _lowResourcesMaxIdleTime;
203 }
204
205
206
207
208
209
210 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
211 {
212 _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
213 }
214
215
216
217
218
219
220 public void doSelect(int acceptorID) throws IOException
221 {
222 SelectSet[] sets= _selectSet;
223 if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
224 sets[acceptorID].doSelect();
225 }
226
227
228 public abstract boolean dispatch(Runnable task);
229
230
231
232
233
234 @Override
235 protected void doStart() throws Exception
236 {
237 _selectSet = new SelectSet[_selectSets];
238 for (int i=0;i<_selectSet.length;i++)
239 _selectSet[i]= new SelectSet(i);
240
241 super.doStart();
242 }
243
244
245
246 @Override
247 protected void doStop() throws Exception
248 {
249 SelectSet[] sets= _selectSet;
250 _selectSet=null;
251 if (sets!=null)
252 {
253 for (SelectSet set : sets)
254 {
255 if (set!=null)
256 set.stop();
257 }
258 }
259 super.doStop();
260 }
261
262
263
264
265
266 protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
267
268
269
270
271
272 protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
273
274
275 protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
276
277
278 protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
279
280
281
282
283
284
285
286
287
288
289 protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
290
291
292
293 public String dump()
294 {
295 return AggregateLifeCycle.dump(this);
296 }
297
298
299 public void dump(Appendable out, String indent) throws IOException
300 {
301 out.append(String.valueOf(this)).append("\n");
302 AggregateLifeCycle.dump(out,indent,Arrays.asList(_selectSet));
303 }
304
305
306
307
308
309 public class SelectSet implements Dumpable
310 {
311 private final int _setID;
312 private final Timeout _timeout;
313
314 private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
315
316 private Selector _selector;
317
318 private volatile Thread _selecting;
319 private int _jvmBug;
320 private int _selects;
321 private long _monitorStart;
322 private long _monitorNext;
323 private boolean _pausing;
324 private SelectionKey _busyKey;
325 private int _busyKeyCount;
326 private long _log;
327 private int _paused;
328 private int _jvmFix0;
329 private int _jvmFix1;
330 private int _jvmFix2;
331 private volatile long _idleTick;
332 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
333
334
335 SelectSet(int acceptorID) throws Exception
336 {
337 _setID=acceptorID;
338
339 _idleTick = System.currentTimeMillis();
340 _timeout = new Timeout(this);
341 _timeout.setDuration(0L);
342
343
344 _selector = Selector.open();
345 _monitorStart=System.currentTimeMillis();
346 _monitorNext=_monitorStart+__MONITOR_PERIOD;
347 _log=_monitorStart+60000;
348 }
349
350
351 public void addChange(Object change)
352 {
353 _changes.add(change);
354 }
355
356
357 public void addChange(SelectableChannel channel, Object att)
358 {
359 if (att==null)
360 addChange(channel);
361 else if (att instanceof EndPoint)
362 addChange(att);
363 else
364 addChange(new ChannelAndAttachment(channel,att));
365 }
366
367
368
369
370
371
372
373 public void doSelect() throws IOException
374 {
375 try
376 {
377 _selecting=Thread.currentThread();
378 final Selector selector=_selector;
379
380
381 Object change;
382 int changes=_changes.size();
383 while (changes-->0 && (change=_changes.poll())!=null)
384 {
385 try
386 {
387 if (change instanceof EndPoint)
388 {
389
390 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
391 endpoint.doUpdateKey();
392 }
393 else if (change instanceof ChannelAndAttachment)
394 {
395
396 final ChannelAndAttachment asc = (ChannelAndAttachment)change;
397 final SelectableChannel channel=asc._channel;
398 final Object att = asc._attachment;
399 SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
400 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
401 key.attach(endpoint);
402 endpoint.schedule();
403 }
404 else if (change instanceof SocketChannel)
405 {
406
407 final SocketChannel channel=(SocketChannel)change;
408 SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);
409 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
410 key.attach(endpoint);
411 endpoint.schedule();
412 }
413 else if (change instanceof Runnable)
414 {
415 dispatch((Runnable)change);
416 }
417 else
418 throw new IllegalArgumentException(change.toString());
419 }
420 catch (Exception e)
421 {
422 if (isRunning())
423 Log.warn(e);
424 else
425 Log.debug(e);
426 }
427 catch (Error e)
428 {
429 if (isRunning())
430 Log.warn(e);
431 else
432 Log.debug(e);
433 }
434 }
435
436
437
438 int selected=selector.selectNow();
439 _selects++;
440
441 long now=System.currentTimeMillis();
442
443
444 if (selected==0 && selector.selectedKeys().isEmpty())
445 {
446
447 if (_pausing)
448 {
449 try
450 {
451 Thread.sleep(__BUSY_PAUSE);
452 }
453 catch(InterruptedException e)
454 {
455 Log.ignore(e);
456 }
457 now=System.currentTimeMillis();
458 }
459
460
461 _timeout.setNow(now);
462 long to_next_timeout=_timeout.getTimeToNext();
463
464 long wait = _changes.size()==0?__IDLE_TICK:0L;
465 if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
466 wait = to_next_timeout;
467
468
469 if (wait>0)
470 {
471 long before=now;
472 selected=selector.select(wait);
473 _selects++;
474 now = System.currentTimeMillis();
475 _timeout.setNow(now);
476
477 if (__JVMBUG_THRESHHOLD>0)
478 checkJvmBugs(before, now, wait, selected);
479 }
480 }
481
482
483 if (_selector==null || !selector.isOpen())
484 return;
485
486
487 for (SelectionKey key: selector.selectedKeys())
488 {
489 try
490 {
491 if (!key.isValid())
492 {
493 key.cancel();
494 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
495 if (endpoint != null)
496 endpoint.doUpdateKey();
497 continue;
498 }
499
500 Object att = key.attachment();
501 if (att instanceof SelectChannelEndPoint)
502 {
503 ((SelectChannelEndPoint)att).schedule();
504 }
505 else
506 {
507
508 SocketChannel channel = (SocketChannel)key.channel();
509 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
510 key.attach(endpoint);
511 if (key.isReadable())
512 endpoint.schedule();
513 }
514 key = null;
515 }
516 catch (CancelledKeyException e)
517 {
518 Log.ignore(e);
519 }
520 catch (Exception e)
521 {
522 if (isRunning())
523 Log.warn(e);
524 else
525 Log.ignore(e);
526
527 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
528 key.cancel();
529 }
530 }
531
532
533 selector.selectedKeys().clear();
534
535 now=System.currentTimeMillis();
536 _timeout.setNow(now);
537 Task task = _timeout.expired();
538 while (task!=null)
539 {
540 if (task instanceof Runnable)
541 dispatch((Runnable)task);
542 task = _timeout.expired();
543 }
544
545
546 if (now-_idleTick>__IDLE_TICK)
547 {
548 _idleTick=now;
549
550 final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
551 ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
552 :now;
553
554 dispatch(new Runnable()
555 {
556 public void run()
557 {
558 for (SelectChannelEndPoint endp:_endPoints.keySet())
559 {
560 endp.checkIdleTimestamp(idle_now);
561 }
562 }
563 });
564
565
566
567 for (SelectionKey key: selector.keys())
568 {
569 if (key.isValid() && key.attachment() instanceof SelectChannelEndPoint)
570 {
571 ((SelectChannelEndPoint)key.attachment()).checkKey(key);
572 }
573 }
574 }
575 }
576 catch (CancelledKeyException e)
577 {
578 Log.ignore(e);
579 }
580 finally
581 {
582 _selecting=null;
583 }
584 }
585
586
587 private void checkJvmBugs(long before, long now, long wait, int selected)
588 throws IOException
589 {
590 Selector selector = _selector;
591 if (selector==null)
592 return;
593
594
595
596
597 if (now>_monitorNext)
598 {
599 _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
600 _pausing=_selects>__MAX_SELECTS;
601 if (_pausing)
602 _paused++;
603
604 _selects=0;
605 _jvmBug=0;
606 _monitorStart=now;
607 _monitorNext=now+__MONITOR_PERIOD;
608 }
609
610 if (now>_log)
611 {
612 if (_paused>0)
613 Log.debug(this+" Busy selector - injecting delay "+_paused+" times");
614
615 if (_jvmFix2>0)
616 Log.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
617
618 if (_jvmFix1>0)
619 Log.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times");
620
621 else if(Log.isDebugEnabled() && _jvmFix0>0)
622 Log.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");
623 _paused=0;
624 _jvmFix2=0;
625 _jvmFix1=0;
626 _jvmFix0=0;
627 _log=now+60000;
628 }
629
630
631 if (selected==0 && wait>10 && (now-before)<(wait/2))
632 {
633
634 _jvmBug++;
635 if (_jvmBug>(__JVMBUG_THRESHHOLD))
636 {
637 try
638 {
639 if (_jvmBug==__JVMBUG_THRESHHOLD+1)
640 _jvmFix2++;
641
642 Thread.sleep(__BUSY_PAUSE);
643 }
644 catch(InterruptedException e)
645 {
646 Log.ignore(e);
647 }
648 }
649 else if (_jvmBug==__JVMBUG_THRESHHOLD)
650 {
651 synchronized (this)
652 {
653
654 final Selector new_selector = Selector.open();
655 for (SelectionKey k: selector.keys())
656 {
657 if (!k.isValid() || k.interestOps()==0)
658 continue;
659
660 final SelectableChannel channel = k.channel();
661 final Object attachment = k.attachment();
662
663 if (attachment==null)
664 addChange(channel);
665 else
666 addChange(channel,attachment);
667 }
668 _selector.close();
669 _selector=new_selector;
670 return;
671 }
672 }
673 else if (_jvmBug%32==31)
674 {
675
676 int cancelled=0;
677 for (SelectionKey k: selector.keys())
678 {
679 if (k.isValid()&&k.interestOps()==0)
680 {
681 k.cancel();
682 cancelled++;
683 }
684 }
685 if (cancelled>0)
686 _jvmFix0++;
687
688 return;
689 }
690 }
691 else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
692 {
693
694 SelectionKey busy = selector.selectedKeys().iterator().next();
695 if (busy==_busyKey)
696 {
697 if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
698 {
699 final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
700 Log.warn("Busy Key "+busy.channel()+" "+endpoint);
701 busy.cancel();
702 if (endpoint!=null)
703 {
704 dispatch(new Runnable()
705 {
706 public void run()
707 {
708 try
709 {
710 endpoint.close();
711 }
712 catch (IOException e)
713 {
714 Log.ignore(e);
715 }
716 }
717 });
718 }
719 }
720 }
721 else
722 _busyKeyCount=0;
723 _busyKey=busy;
724 }
725 }
726
727
728 public SelectorManager getManager()
729 {
730 return SelectorManager.this;
731 }
732
733
734 public long getNow()
735 {
736 return _timeout.getNow();
737 }
738
739
740
741
742
743
744
745
746 public void scheduleTimeout(Timeout.Task task, long timeoutMs)
747 {
748 if (!(task instanceof Runnable))
749 throw new IllegalArgumentException("!Runnable");
750 _timeout.schedule(task, timeoutMs);
751 }
752
753
754 public void cancelTimeout(Timeout.Task task)
755 {
756 task.cancel();
757 }
758
759
760 public void wakeup()
761 {
762 Selector selector = _selector;
763 if (selector!=null)
764 selector.wakeup();
765 }
766
767
768 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
769 {
770 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
771 endPointOpened(endp);
772 _endPoints.put(endp,this);
773 return endp;
774 }
775
776
777 public void destroyEndPoint(SelectChannelEndPoint endp)
778 {
779 _endPoints.remove(endp);
780 endPointClosed(endp);
781 }
782
783
784 Selector getSelector()
785 {
786 return _selector;
787 }
788
789
790 void stop() throws Exception
791 {
792
793
794 try
795 {
796 for (int i=0;i<100 && _selecting!=null;i++)
797 {
798 wakeup();
799 Thread.sleep(1);
800 }
801 }
802 catch(Exception e)
803 {
804 Log.ignore(e);
805 }
806
807
808 synchronized (this)
809 {
810 for (SelectionKey key:_selector.keys())
811 {
812 if (key==null)
813 continue;
814 Object att=key.attachment();
815 if (att instanceof EndPoint)
816 {
817 EndPoint endpoint = (EndPoint)att;
818 try
819 {
820 endpoint.close();
821 }
822 catch(IOException e)
823 {
824 Log.ignore(e);
825 }
826 }
827 }
828
829
830 _timeout.cancelAll();
831 try
832 {
833 if (_selector != null)
834 _selector.close();
835 }
836 catch (IOException e)
837 {
838 Log.ignore(e);
839 }
840 _selector=null;
841 }
842 }
843
844
845 public String dump()
846 {
847 return AggregateLifeCycle.dump(this);
848 }
849
850
851 public void dump(Appendable out, String indent) throws IOException
852 {
853 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
854
855 Thread selecting = _selecting;
856
857 Object where = "not selecting";
858 StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
859 if (trace!=null)
860 {
861 for (StackTraceElement t:trace)
862 if (t.getClassName().startsWith("org.eclipse.jetty."))
863 {
864 where=t;
865 break;
866 }
867 }
868
869 Selector selector=_selector;
870 final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
871 dump.add(where);
872
873 final CountDownLatch latch = new CountDownLatch(1);
874
875 addChange(new Runnable(){
876 public void run()
877 {
878 dumpKeyState(dump);
879 latch.countDown();
880 }
881 });
882
883 try
884 {
885 latch.await(5,TimeUnit.SECONDS);
886 }
887 catch(InterruptedException e)
888 {
889 Log.ignore(e);
890 }
891 AggregateLifeCycle.dump(out,indent,dump);
892 }
893
894
895 public void dumpKeyState(List<Object> dumpto)
896 {
897 Selector selector=_selector;
898 dumpto.add(selector+" keys="+selector.keys().size());
899 for (SelectionKey key: selector.keys())
900 {
901 if (key.isValid())
902 dumpto.add(key.attachment()+" "+key.interestOps()+" "+key.readyOps());
903 else
904 dumpto.add(key.attachment()+" - - ");
905 }
906 }
907 }
908
909
910 private static class ChannelAndAttachment
911 {
912 final SelectableChannel _channel;
913 final Object _attachment;
914
915 public ChannelAndAttachment(SelectableChannel channel, Object attachment)
916 {
917 super();
918 _channel = channel;
919 _attachment = attachment;
920 }
921 }
922
923
924 public boolean isDeferringInterestedOps0()
925 {
926 return _deferringInterestedOps0;
927 }
928
929
930 public void setDeferringInterestedOps0(boolean deferringInterestedOps0)
931 {
932 _deferringInterestedOps0 = deferringInterestedOps0;
933 }
934
935 }