1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.CancelledKeyException;
18 import java.nio.channels.ClosedSelectorException;
19 import java.nio.channels.SelectableChannel;
20 import java.nio.channels.SelectionKey;
21 import java.nio.channels.Selector;
22 import java.nio.channels.ServerSocketChannel;
23 import java.nio.channels.SocketChannel;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.ConcurrentMap;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.TimeUnit;
31
32 import org.eclipse.jetty.io.ConnectedEndPoint;
33 import org.eclipse.jetty.io.Connection;
34 import org.eclipse.jetty.io.EndPoint;
35 import org.eclipse.jetty.util.TypeUtil;
36 import org.eclipse.jetty.util.component.AbstractLifeCycle;
37 import org.eclipse.jetty.util.component.AggregateLifeCycle;
38 import org.eclipse.jetty.util.component.Dumpable;
39 import org.eclipse.jetty.util.log.Log;
40 import org.eclipse.jetty.util.thread.Timeout;
41 import org.eclipse.jetty.util.thread.Timeout.Task;
42
43
44
45
46
47
48
49
50
51
52 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
53 {
54
55 private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.eclipse.jetty.io.nio.JVMBUG_THRESHHOLD",0).intValue();
56 private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
57 private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",25000).intValue();
58 private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
59 private static final int __BUSY_KEY=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_KEY",-1).intValue();
60 private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
61
62 private int _maxIdleTime;
63 private int _lowResourcesMaxIdleTime;
64 private long _lowResourcesConnections;
65 private SelectSet[] _selectSet;
66 private int _selectSets=1;
67 private volatile int _set;
68 private boolean _deferringInterestedOps0=true;
69
70
71
72
73
74
75 public void setMaxIdleTime(long maxIdleTime)
76 {
77 _maxIdleTime=(int)maxIdleTime;
78 }
79
80
81
82
83
84 public void setSelectSets(int selectSets)
85 {
86 long lrc = _lowResourcesConnections * _selectSets;
87 _selectSets=selectSets;
88 _lowResourcesConnections=lrc/_selectSets;
89 }
90
91
92
93
94
95 public long getMaxIdleTime()
96 {
97 return _maxIdleTime;
98 }
99
100
101
102
103
104 public int getSelectSets()
105 {
106 return _selectSets;
107 }
108
109
110
111
112
113
114 public SelectSet getSelectSet(int i)
115 {
116 return _selectSet[i];
117 }
118
119
120
121
122
123
124 public void register(SocketChannel channel, Object att)
125 {
126
127
128
129
130 int s=_set++;
131 s=s%_selectSets;
132 SelectSet[] sets=_selectSet;
133 if (sets!=null)
134 {
135 SelectSet set=sets[s];
136 set.addChange(channel,att);
137 set.wakeup();
138 }
139 }
140
141
142
143
144
145
146 public void register(SocketChannel channel)
147 {
148
149
150
151
152 int s=_set++;
153 s=s%_selectSets;
154 SelectSet[] sets=_selectSet;
155 if (sets!=null)
156 {
157 SelectSet set=sets[s];
158 set.addChange(channel);
159 set.wakeup();
160 }
161 }
162
163
164
165
166
167 public void register(ServerSocketChannel acceptChannel)
168 {
169 int s=_set++;
170 s=s%_selectSets;
171 SelectSet set=_selectSet[s];
172 set.addChange(acceptChannel);
173 set.wakeup();
174 }
175
176
177
178
179
180 public long getLowResourcesConnections()
181 {
182 return _lowResourcesConnections*_selectSets;
183 }
184
185
186
187
188
189
190
191
192 public void setLowResourcesConnections(long lowResourcesConnections)
193 {
194 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
195 }
196
197
198
199
200
201 public long getLowResourcesMaxIdleTime()
202 {
203 return _lowResourcesMaxIdleTime;
204 }
205
206
207
208
209
210
211 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
212 {
213 _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
214 }
215
216
217
218
219
220
221 public void doSelect(int acceptorID) throws IOException
222 {
223 SelectSet[] sets= _selectSet;
224 if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
225 sets[acceptorID].doSelect();
226 }
227
228
229 public abstract boolean dispatch(Runnable task);
230
231
232
233
234
235 @Override
236 protected void doStart() throws Exception
237 {
238 _selectSet = new SelectSet[_selectSets];
239 for (int i=0;i<_selectSet.length;i++)
240 _selectSet[i]= new SelectSet(i);
241
242 super.doStart();
243 }
244
245
246
247 @Override
248 protected void doStop() throws Exception
249 {
250 SelectSet[] sets= _selectSet;
251 _selectSet=null;
252 if (sets!=null)
253 {
254 for (SelectSet set : sets)
255 {
256 if (set!=null)
257 set.stop();
258 }
259 }
260 super.doStop();
261 }
262
263
264
265
266
267 protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
268
269
270
271
272
273 protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
274
275
276 protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
277
278
279 protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
280
281
282
283
284
285
286
287
288
289
290 protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
291
292
293 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
294 {
295 Log.warn(ex+","+channel+","+attachment);
296 Log.debug(ex);
297 }
298
299
300 public String dump()
301 {
302 return AggregateLifeCycle.dump(this);
303 }
304
305
306 public void dump(Appendable out, String indent) throws IOException
307 {
308 out.append(String.valueOf(this)).append("\n");
309 AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet));
310 }
311
312
313
314
315
316 public class SelectSet implements Dumpable
317 {
318 private final int _setID;
319 private final Timeout _timeout;
320
321 private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
322
323 private Selector _selector;
324
325 private volatile Thread _selecting;
326 private int _jvmBug;
327 private int _selects;
328 private long _monitorStart;
329 private long _monitorNext;
330 private boolean _pausing;
331 private SelectionKey _busyKey;
332 private int _busyKeyCount;
333 private long _log;
334 private int _paused;
335 private int _jvmFix0;
336 private int _jvmFix1;
337 private int _jvmFix2;
338 private volatile long _idleTick;
339 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
340
341
342 SelectSet(int acceptorID) throws Exception
343 {
344 _setID=acceptorID;
345
346 _idleTick = System.currentTimeMillis();
347 _timeout = new Timeout(this);
348 _timeout.setDuration(0L);
349
350
351 _selector = Selector.open();
352 _monitorStart=System.currentTimeMillis();
353 _monitorNext=_monitorStart+__MONITOR_PERIOD;
354 _log=_monitorStart+60000;
355 }
356
357
358 public void addChange(Object change)
359 {
360 _changes.add(change);
361 }
362
363
364 public void addChange(SelectableChannel channel, Object att)
365 {
366 if (att==null)
367 addChange(channel);
368 else if (att instanceof EndPoint)
369 addChange(att);
370 else
371 addChange(new ChannelAndAttachment(channel,att));
372 }
373
374
375
376
377
378
379
380 public void doSelect() throws IOException
381 {
382 Log.debug("doSelect "+SelectorManager.this.isRunning());
383 try
384 {
385 _selecting=Thread.currentThread();
386 final Selector selector=_selector;
387
388
389 Object change;
390 int changes=_changes.size();
391 while (changes-->0 && (change=_changes.poll())!=null)
392 {
393 try
394 {
395 if (change instanceof EndPoint)
396 {
397
398 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
399 endpoint.doUpdateKey();
400 }
401 else if (change instanceof ChannelAndAttachment)
402 {
403
404 final ChannelAndAttachment asc = (ChannelAndAttachment)change;
405 final SelectableChannel channel=asc._channel;
406 final Object att = asc._attachment;
407
408 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
409 {
410 SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
411 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
412 key.attach(endpoint);
413 endpoint.schedule();
414 }
415 else if (channel.isOpen())
416 {
417 channel.register(selector,SelectionKey.OP_CONNECT,att);
418 }
419 }
420 else if (change instanceof SocketChannel)
421 {
422
423 final SocketChannel channel=(SocketChannel)change;
424 SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);
425 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
426 key.attach(endpoint);
427 endpoint.schedule();
428 }
429 else if (change instanceof Runnable)
430 {
431 dispatch((Runnable)change);
432 }
433 else
434 throw new IllegalArgumentException(change.toString());
435 }
436 catch (Exception e)
437 {
438 if (isRunning())
439 Log.warn(e);
440 else
441 Log.debug(e);
442 }
443 catch (Error e)
444 {
445 if (isRunning())
446 Log.warn(e);
447 else
448 Log.debug(e);
449 }
450 }
451
452
453
454 int selected=selector.selectNow();
455 _selects++;
456
457 long now=System.currentTimeMillis();
458
459
460 if (selected==0 && selector.selectedKeys().isEmpty())
461 {
462
463 if (_pausing)
464 {
465 try
466 {
467 Thread.sleep(__BUSY_PAUSE);
468 }
469 catch(InterruptedException e)
470 {
471 Log.ignore(e);
472 }
473 now=System.currentTimeMillis();
474 }
475
476
477 _timeout.setNow(now);
478 long to_next_timeout=_timeout.getTimeToNext();
479
480 long wait = _changes.size()==0?__IDLE_TICK:0L;
481 if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
482 wait = to_next_timeout;
483
484
485 if (wait>0)
486 {
487 long before=now;
488 selected=selector.select(wait);
489 _selects++;
490 now = System.currentTimeMillis();
491 _timeout.setNow(now);
492
493 if (__JVMBUG_THRESHHOLD>0)
494 checkJvmBugs(before, now, wait, selected);
495 }
496 }
497
498
499 if (_selector==null || !selector.isOpen())
500 return;
501
502
503 for (SelectionKey key: selector.selectedKeys())
504 {
505 try
506 {
507 if (!key.isValid())
508 {
509 key.cancel();
510 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
511 if (endpoint != null)
512 endpoint.doUpdateKey();
513 continue;
514 }
515
516 Object att = key.attachment();
517 if (att instanceof SelectChannelEndPoint)
518 {
519 if (key.isReadable()||key.isWritable())
520 ((SelectChannelEndPoint)att).schedule();
521 }
522 else if (key.isConnectable())
523 {
524
525 SocketChannel channel = (SocketChannel)key.channel();
526 boolean connected=false;
527 try
528 {
529 connected=channel.finishConnect();
530 }
531 catch(Exception e)
532 {
533 connectionFailed(channel,e,att);
534 }
535 finally
536 {
537 if (connected)
538 {
539 key.interestOps(SelectionKey.OP_READ);
540 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
541 key.attach(endpoint);
542 endpoint.schedule();
543 }
544 else
545 {
546 key.cancel();
547 }
548 }
549 }
550 else
551 {
552
553 SocketChannel channel = (SocketChannel)key.channel();
554 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
555 key.attach(endpoint);
556 if (key.isReadable())
557 endpoint.schedule();
558 }
559 key = null;
560 }
561 catch (CancelledKeyException e)
562 {
563 Log.ignore(e);
564 }
565 catch (Exception e)
566 {
567 if (isRunning())
568 Log.warn(e);
569 else
570 Log.ignore(e);
571
572 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
573 key.cancel();
574 }
575 }
576
577
578 selector.selectedKeys().clear();
579
580 now=System.currentTimeMillis();
581 _timeout.setNow(now);
582 Task task = _timeout.expired();
583 while (task!=null)
584 {
585 if (task instanceof Runnable)
586 dispatch((Runnable)task);
587 task = _timeout.expired();
588 }
589
590
591 if (now-_idleTick>__IDLE_TICK)
592 {
593 _idleTick=now;
594
595 final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
596 ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
597 :now;
598
599 dispatch(new Runnable()
600 {
601 public void run()
602 {
603 for (SelectChannelEndPoint endp:_endPoints.keySet())
604 {
605 endp.checkIdleTimestamp(idle_now);
606 }
607 }
608 });
609 }
610 }
611 catch (ClosedSelectorException e)
612 {
613 if (isRunning())
614 Log.warn(e);
615 else
616 Log.ignore(e);
617 }
618 catch (CancelledKeyException e)
619 {
620 Log.ignore(e);
621 }
622 finally
623 {
624 _selecting=null;
625 }
626 }
627
628
629 private void checkJvmBugs(long before, long now, long wait, int selected)
630 throws IOException
631 {
632 Selector selector = _selector;
633 if (selector==null)
634 return;
635
636
637
638
639 if (now>_monitorNext)
640 {
641 _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
642 _pausing=_selects>__MAX_SELECTS;
643 if (_pausing)
644 _paused++;
645
646 _selects=0;
647 _jvmBug=0;
648 _monitorStart=now;
649 _monitorNext=now+__MONITOR_PERIOD;
650 }
651
652 if (now>_log)
653 {
654 if (_paused>0)
655 Log.debug(this+" Busy selector - injecting delay "+_paused+" times");
656
657 if (_jvmFix2>0)
658 Log.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
659
660 if (_jvmFix1>0)
661 Log.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times");
662
663 else if(Log.isDebugEnabled() && _jvmFix0>0)
664 Log.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");
665 _paused=0;
666 _jvmFix2=0;
667 _jvmFix1=0;
668 _jvmFix0=0;
669 _log=now+60000;
670 }
671
672
673 if (selected==0 && wait>10 && (now-before)<(wait/2))
674 {
675
676 _jvmBug++;
677 if (_jvmBug>(__JVMBUG_THRESHHOLD))
678 {
679 try
680 {
681 if (_jvmBug==__JVMBUG_THRESHHOLD+1)
682 _jvmFix2++;
683
684 Thread.sleep(__BUSY_PAUSE);
685 }
686 catch(InterruptedException e)
687 {
688 Log.ignore(e);
689 }
690 }
691 else if (_jvmBug==__JVMBUG_THRESHHOLD)
692 {
693 synchronized (this)
694 {
695
696 final Selector new_selector = Selector.open();
697 for (SelectionKey k: selector.keys())
698 {
699 if (!k.isValid() || k.interestOps()==0)
700 continue;
701
702 final SelectableChannel channel = k.channel();
703 final Object attachment = k.attachment();
704
705 if (attachment==null)
706 addChange(channel);
707 else
708 addChange(channel,attachment);
709 }
710 _selector.close();
711 _selector=new_selector;
712 return;
713 }
714 }
715 else if (_jvmBug%32==31)
716 {
717
718 int cancelled=0;
719 for (SelectionKey k: selector.keys())
720 {
721 if (k.isValid()&&k.interestOps()==0)
722 {
723 k.cancel();
724 cancelled++;
725 }
726 }
727 if (cancelled>0)
728 _jvmFix0++;
729
730 return;
731 }
732 }
733 else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
734 {
735
736 SelectionKey busy = selector.selectedKeys().iterator().next();
737 if (busy==_busyKey)
738 {
739 if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
740 {
741 final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
742 Log.warn("Busy Key "+busy.channel()+" "+endpoint);
743 busy.cancel();
744 if (endpoint!=null)
745 {
746 dispatch(new Runnable()
747 {
748 public void run()
749 {
750 try
751 {
752 endpoint.close();
753 }
754 catch (IOException e)
755 {
756 Log.ignore(e);
757 }
758 }
759 });
760 }
761 }
762 }
763 else
764 _busyKeyCount=0;
765 _busyKey=busy;
766 }
767 }
768
769
770 public SelectorManager getManager()
771 {
772 return SelectorManager.this;
773 }
774
775
776 public long getNow()
777 {
778 return _timeout.getNow();
779 }
780
781
782
783
784
785
786
787
788 public void scheduleTimeout(Timeout.Task task, long timeoutMs)
789 {
790 if (!(task instanceof Runnable))
791 throw new IllegalArgumentException("!Runnable");
792 _timeout.schedule(task, timeoutMs);
793 }
794
795
796 public void cancelTimeout(Timeout.Task task)
797 {
798 task.cancel();
799 }
800
801
802 public void wakeup()
803 {
804 Selector selector = _selector;
805 if (selector!=null)
806 selector.wakeup();
807 }
808
809
810 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
811 {
812 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
813 endPointOpened(endp);
814 _endPoints.put(endp,this);
815 return endp;
816 }
817
818
819 public void destroyEndPoint(SelectChannelEndPoint endp)
820 {
821 _endPoints.remove(endp);
822 endPointClosed(endp);
823 }
824
825
826 Selector getSelector()
827 {
828 return _selector;
829 }
830
831
832 void stop() throws Exception
833 {
834
835
836 try
837 {
838 for (int i=0;i<100 && _selecting!=null;i++)
839 {
840 wakeup();
841 Thread.sleep(10);
842 }
843 }
844 catch(Exception e)
845 {
846 Log.ignore(e);
847 }
848
849
850 synchronized (this)
851 {
852 for (SelectionKey key:_selector.keys())
853 {
854 if (key==null)
855 continue;
856 Object att=key.attachment();
857 if (att instanceof EndPoint)
858 {
859 EndPoint endpoint = (EndPoint)att;
860 try
861 {
862 endpoint.close();
863 }
864 catch(IOException e)
865 {
866 Log.ignore(e);
867 }
868 }
869 }
870
871
872 _timeout.cancelAll();
873 try
874 {
875 if (_selector != null)
876 _selector.close();
877 }
878 catch (IOException e)
879 {
880 Log.ignore(e);
881 }
882 _selector=null;
883 }
884 }
885
886
887 public String dump()
888 {
889 return AggregateLifeCycle.dump(this);
890 }
891
892
893 public void dump(Appendable out, String indent) throws IOException
894 {
895 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
896
897 Thread selecting = _selecting;
898
899 Object where = "not selecting";
900 StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
901 if (trace!=null)
902 {
903 for (StackTraceElement t:trace)
904 if (t.getClassName().startsWith("org.eclipse.jetty."))
905 {
906 where=t;
907 break;
908 }
909 }
910
911 Selector selector=_selector;
912 final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
913 dump.add(where);
914
915 final CountDownLatch latch = new CountDownLatch(1);
916
917 addChange(new Runnable(){
918 public void run()
919 {
920 dumpKeyState(dump);
921 latch.countDown();
922 }
923 });
924
925 try
926 {
927 latch.await(5,TimeUnit.SECONDS);
928 }
929 catch(InterruptedException e)
930 {
931 Log.ignore(e);
932 }
933 AggregateLifeCycle.dump(out,indent,dump);
934 }
935
936
937 public void dumpKeyState(List<Object> dumpto)
938 {
939 Selector selector=_selector;
940 dumpto.add(selector+" keys="+selector.keys().size());
941 for (SelectionKey key: selector.keys())
942 {
943 if (key.isValid())
944 dumpto.add(key.attachment()+" "+key.interestOps()+" "+key.readyOps());
945 else
946 dumpto.add(key.attachment()+" - - ");
947 }
948 }
949 }
950
951
952 private static class ChannelAndAttachment
953 {
954 final SelectableChannel _channel;
955 final Object _attachment;
956
957 public ChannelAndAttachment(SelectableChannel channel, Object attachment)
958 {
959 super();
960 _channel = channel;
961 _attachment = attachment;
962 }
963 }
964
965
966 public boolean isDeferringInterestedOps0()
967 {
968 return _deferringInterestedOps0;
969 }
970
971
972 public void setDeferringInterestedOps0(boolean deferringInterestedOps0)
973 {
974 _deferringInterestedOps0 = deferringInterestedOps0;
975 }
976
977 }