1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.CancelledKeyException;
18 import java.nio.channels.Channel;
19 import java.nio.channels.ClosedSelectorException;
20 import java.nio.channels.SelectableChannel;
21 import java.nio.channels.SelectionKey;
22 import java.nio.channels.Selector;
23 import java.nio.channels.ServerSocketChannel;
24 import java.nio.channels.SocketChannel;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentLinkedQueue;
29 import java.util.concurrent.ConcurrentMap;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.TimeUnit;
32
33 import org.eclipse.jetty.io.ConnectedEndPoint;
34 import org.eclipse.jetty.io.Connection;
35 import org.eclipse.jetty.io.EndPoint;
36 import org.eclipse.jetty.util.TypeUtil;
37 import org.eclipse.jetty.util.component.AbstractLifeCycle;
38 import org.eclipse.jetty.util.component.AggregateLifeCycle;
39 import org.eclipse.jetty.util.component.Dumpable;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42 import org.eclipse.jetty.util.thread.Timeout;
43 import org.eclipse.jetty.util.thread.Timeout.Task;
44
45
46
47
48
49
50
51
52 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
53 {
54 public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
55
56 private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
57 private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",25000).intValue();
58 private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
59 private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
60
61 private int _maxIdleTime;
62 private int _lowResourcesMaxIdleTime;
63 private long _lowResourcesConnections;
64 private SelectSet[] _selectSet;
65 private int _selectSets=1;
66 private volatile int _set;
67 private boolean _deferringInterestedOps0=true;
68 private int _selectorPriorityDelta=0;
69
70
71
72
73
74
75 public void setMaxIdleTime(long maxIdleTime)
76 {
77 _maxIdleTime=(int)maxIdleTime;
78 }
79
80
81
82
83
84 public void setSelectSets(int selectSets)
85 {
86 long lrc = _lowResourcesConnections * _selectSets;
87 _selectSets=selectSets;
88 _lowResourcesConnections=lrc/_selectSets;
89 }
90
91
92
93
94
95 public long getMaxIdleTime()
96 {
97 return _maxIdleTime;
98 }
99
100
101
102
103
104 public int getSelectSets()
105 {
106 return _selectSets;
107 }
108
109
110
111
112
113
114 public SelectSet getSelectSet(int i)
115 {
116 return _selectSet[i];
117 }
118
119
120
121
122
123
124 public void register(SocketChannel channel, Object att)
125 {
126
127
128
129
130 int s=_set++;
131 s=s%_selectSets;
132 SelectSet[] sets=_selectSet;
133 if (sets!=null)
134 {
135 SelectSet set=sets[s];
136 set.addChange(channel,att);
137 set.wakeup();
138 }
139 }
140
141
142
143
144
145
146 public void register(SocketChannel channel)
147 {
148
149
150
151
152 int s=_set++;
153 s=s%_selectSets;
154 SelectSet[] sets=_selectSet;
155 if (sets!=null)
156 {
157 SelectSet set=sets[s];
158 set.addChange(channel);
159 set.wakeup();
160 }
161 }
162
163
164
165
166
167 public void register(ServerSocketChannel acceptChannel)
168 {
169 int s=_set++;
170 s=s%_selectSets;
171 SelectSet set=_selectSet[s];
172 set.addChange(acceptChannel);
173 set.wakeup();
174 }
175
176
177
178
179
180 public int getSelectorPriorityDelta()
181 {
182 return _selectorPriorityDelta;
183 }
184
185
186
187
188
189 public void setSelectorPriorityDelta(int delta)
190 {
191 _selectorPriorityDelta=delta;
192 }
193
194
195
196
197
198
199 public long getLowResourcesConnections()
200 {
201 return _lowResourcesConnections*_selectSets;
202 }
203
204
205
206
207
208
209
210
211 public void setLowResourcesConnections(long lowResourcesConnections)
212 {
213 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
214 }
215
216
217
218
219
220 public long getLowResourcesMaxIdleTime()
221 {
222 return _lowResourcesMaxIdleTime;
223 }
224
225
226
227
228
229
230 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
231 {
232 _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
233 }
234
235
236
237 public abstract boolean dispatch(Runnable task);
238
239
240
241
242
243 @Override
244 protected void doStart() throws Exception
245 {
246 _selectSet = new SelectSet[_selectSets];
247 for (int i=0;i<_selectSet.length;i++)
248 _selectSet[i]= new SelectSet(i);
249
250 super.doStart();
251
252
253 for (int i=0;i<getSelectSets();i++)
254 {
255 final int id=i;
256 dispatch(new Runnable()
257 {
258 public void run()
259 {
260 String name=Thread.currentThread().getName();
261 int priority=Thread.currentThread().getPriority();
262 try
263 {
264 SelectSet[] sets=_selectSet;
265 if (sets==null)
266 return;
267 SelectSet set=sets[id];
268
269 Thread.currentThread().setName(name+" Selector"+id);
270 if (getSelectorPriorityDelta()!=0)
271 Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta());
272 LOG.debug("Starting {} on {}",Thread.currentThread(),this);
273 while (isRunning())
274 {
275 try
276 {
277 set.doSelect();
278 }
279 catch(ThreadDeath e)
280 {
281 throw e;
282 }
283 catch(IOException e)
284 {
285 LOG.ignore(e);
286 }
287 catch(Exception e)
288 {
289 LOG.warn(e);
290 }
291 }
292 }
293 finally
294 {
295 LOG.debug("Stopped {} on {}",Thread.currentThread(),this);
296 Thread.currentThread().setName(name);
297 if (getSelectorPriorityDelta()!=0)
298 Thread.currentThread().setPriority(priority);
299 }
300 }
301
302 });
303 }
304 }
305
306
307
308 @Override
309 protected void doStop() throws Exception
310 {
311 SelectSet[] sets= _selectSet;
312 _selectSet=null;
313 if (sets!=null)
314 {
315 for (SelectSet set : sets)
316 {
317 if (set!=null)
318 set.stop();
319 }
320 }
321 super.doStop();
322 }
323
324
325
326
327
328 protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
329
330
331
332
333
334 protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
335
336
337 protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
338
339
340 protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
341
342
343
344
345
346
347
348
349
350
351 protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
352
353
354 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
355 {
356 LOG.warn(ex+","+channel+","+attachment);
357 LOG.debug(ex);
358 }
359
360
361 public String dump()
362 {
363 return AggregateLifeCycle.dump(this);
364 }
365
366
367 public void dump(Appendable out, String indent) throws IOException
368 {
369 out.append(String.valueOf(this)).append("\n");
370 AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet));
371 }
372
373
374
375
376
377 public class SelectSet implements Dumpable
378 {
379 private final int _setID;
380 private final Timeout _timeout;
381
382 private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
383
384 private volatile Selector _selector;
385
386 private volatile Thread _selecting;
387 private int _busySelects;
388 private long _monitorNext;
389 private boolean _pausing;
390 private boolean _paused;
391 private volatile long _idleTick;
392 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
393
394
395 SelectSet(int acceptorID) throws Exception
396 {
397 _setID=acceptorID;
398
399 _idleTick = System.currentTimeMillis();
400 _timeout = new Timeout(this);
401 _timeout.setDuration(0L);
402
403
404 _selector = Selector.open();
405 _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD;
406 }
407
408
409 public void addChange(Object change)
410 {
411 _changes.add(change);
412 }
413
414
415 public void addChange(SelectableChannel channel, Object att)
416 {
417 if (att==null)
418 addChange(channel);
419 else if (att instanceof EndPoint)
420 addChange(att);
421 else
422 addChange(new ChannelAndAttachment(channel,att));
423 }
424
425
426
427
428
429
430
431 public void doSelect() throws IOException
432 {
433 try
434 {
435 _selecting=Thread.currentThread();
436 final Selector selector=_selector;
437
438 if (selector == null)
439 return;
440
441
442 Object change;
443 int changes=_changes.size();
444 while (changes-->0 && (change=_changes.poll())!=null)
445 {
446 Channel ch=null;
447 SelectionKey key=null;
448
449 try
450 {
451 if (change instanceof EndPoint)
452 {
453
454 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
455 ch=endpoint.getChannel();
456 endpoint.doUpdateKey();
457 }
458 else if (change instanceof ChannelAndAttachment)
459 {
460
461 final ChannelAndAttachment asc = (ChannelAndAttachment)change;
462 final SelectableChannel channel=asc._channel;
463 ch=channel;
464 final Object att = asc._attachment;
465
466 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
467 {
468 key = channel.register(selector,SelectionKey.OP_READ,att);
469 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
470 key.attach(endpoint);
471 endpoint.schedule();
472 }
473 else if (channel.isOpen())
474 {
475 key = channel.register(selector,SelectionKey.OP_CONNECT,att);
476 }
477 }
478 else if (change instanceof SocketChannel)
479 {
480
481 final SocketChannel channel=(SocketChannel)change;
482 ch=channel;
483 key = channel.register(selector,SelectionKey.OP_READ,null);
484 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
485 key.attach(endpoint);
486 endpoint.schedule();
487 }
488 else if (change instanceof ChangeTask)
489 {
490 ((Runnable)change).run();
491 }
492 else if (change instanceof Runnable)
493 {
494 dispatch((Runnable)change);
495 }
496 else
497 throw new IllegalArgumentException(change.toString());
498 }
499 catch (CancelledKeyException e)
500 {
501 LOG.ignore(e);
502 }
503 catch (Throwable e)
504 {
505 if (e instanceof ThreadDeath)
506 throw (ThreadDeath)e;
507
508 if (isRunning())
509 LOG.warn(e);
510 else
511 LOG.debug(e);
512
513 try
514 {
515 if (ch!=null)
516 ch.close();
517 }
518 catch(IOException e2)
519 {
520 LOG.debug(e2);
521 }
522 }
523 }
524
525
526
527 int selected=selector.selectNow();
528
529 long now=System.currentTimeMillis();
530
531
532 if (selected==0 && selector.selectedKeys().isEmpty())
533 {
534
535 if (_pausing)
536 {
537 try
538 {
539 Thread.sleep(__BUSY_PAUSE);
540 }
541 catch(InterruptedException e)
542 {
543 LOG.ignore(e);
544 }
545 now=System.currentTimeMillis();
546 }
547
548
549 _timeout.setNow(now);
550 long to_next_timeout=_timeout.getTimeToNext();
551
552 long wait = _changes.size()==0?__IDLE_TICK:0L;
553 if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
554 wait = to_next_timeout;
555
556
557 if (wait>0)
558 {
559 long before=now;
560 selected=selector.select(wait);
561 now = System.currentTimeMillis();
562 _timeout.setNow(now);
563
564
565
566 if (__MONITOR_PERIOD>0 && now-before <=1)
567 {
568
569 if (++_busySelects>__MAX_SELECTS)
570 {
571
572 _pausing=true;
573
574
575 if (!_paused)
576 {
577
578 _paused=true;
579 LOG.warn("Selector {} is too busy, pausing!",this);
580 final SelectSet set = this;
581 SelectorManager.this.dispatch(
582 new Runnable(){
583 public void run()
584 {
585 System.err.println(set+":\n"+set.dump());
586 }
587 });
588 }
589 }
590 }
591 }
592 }
593
594
595 if (_selector==null || !selector.isOpen())
596 return;
597
598
599 for (SelectionKey key: selector.selectedKeys())
600 {
601 SocketChannel channel=null;
602
603 try
604 {
605 if (!key.isValid())
606 {
607 key.cancel();
608 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
609 if (endpoint != null)
610 endpoint.doUpdateKey();
611 continue;
612 }
613
614 Object att = key.attachment();
615 if (att instanceof SelectChannelEndPoint)
616 {
617 if (key.isReadable()||key.isWritable())
618 ((SelectChannelEndPoint)att).schedule();
619 }
620 else if (key.isConnectable())
621 {
622
623 channel = (SocketChannel)key.channel();
624 boolean connected=false;
625 try
626 {
627 connected=channel.finishConnect();
628 }
629 catch(Exception e)
630 {
631 connectionFailed(channel,e,att);
632 }
633 finally
634 {
635 if (connected)
636 {
637 key.interestOps(SelectionKey.OP_READ);
638 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
639 key.attach(endpoint);
640 endpoint.schedule();
641 }
642 else
643 {
644 key.cancel();
645 }
646 }
647 }
648 else
649 {
650
651 channel = (SocketChannel)key.channel();
652 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
653 key.attach(endpoint);
654 if (key.isReadable())
655 endpoint.schedule();
656 }
657 key = null;
658 }
659 catch (CancelledKeyException e)
660 {
661 LOG.ignore(e);
662 }
663 catch (Exception e)
664 {
665 if (isRunning())
666 LOG.warn(e);
667 else
668 LOG.ignore(e);
669
670 try
671 {
672 if (channel!=null)
673 channel.close();
674 }
675 catch(IOException e2)
676 {
677 LOG.debug(e2);
678 }
679
680 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
681 key.cancel();
682 }
683 }
684
685
686 selector.selectedKeys().clear();
687
688 now=System.currentTimeMillis();
689 _timeout.setNow(now);
690 Task task = _timeout.expired();
691 while (task!=null)
692 {
693 if (task instanceof Runnable)
694 dispatch((Runnable)task);
695 task = _timeout.expired();
696 }
697
698
699 if (now-_idleTick>__IDLE_TICK)
700 {
701 _idleTick=now;
702
703 final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
704 ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
705 :now;
706
707 dispatch(new Runnable()
708 {
709 public void run()
710 {
711 for (SelectChannelEndPoint endp:_endPoints.keySet())
712 {
713 endp.checkIdleTimestamp(idle_now);
714 }
715 }
716 });
717
718 }
719
720
721 if (__MONITOR_PERIOD>0 && now>_monitorNext)
722 {
723 _busySelects=0;
724 _pausing=false;
725 _monitorNext=now+__MONITOR_PERIOD;
726
727 }
728 }
729 catch (ClosedSelectorException e)
730 {
731 if (isRunning())
732 LOG.warn(e);
733 else
734 LOG.ignore(e);
735 }
736 catch (CancelledKeyException e)
737 {
738 LOG.ignore(e);
739 }
740 finally
741 {
742 _selecting=null;
743 }
744 }
745
746
747
748 private void renewSelector()
749 {
750 try
751 {
752 synchronized (this)
753 {
754 Selector selector=_selector;
755 if (selector==null)
756 return;
757 final Selector new_selector = Selector.open();
758 for (SelectionKey k: selector.keys())
759 {
760 if (!k.isValid() || k.interestOps()==0)
761 continue;
762
763 final SelectableChannel channel = k.channel();
764 final Object attachment = k.attachment();
765
766 if (attachment==null)
767 addChange(channel);
768 else
769 addChange(channel,attachment);
770 }
771 _selector.close();
772 _selector=new_selector;
773 }
774 }
775 catch(IOException e)
776 {
777 throw new RuntimeException("recreating selector",e);
778 }
779 }
780
781
782 public SelectorManager getManager()
783 {
784 return SelectorManager.this;
785 }
786
787
788 public long getNow()
789 {
790 return _timeout.getNow();
791 }
792
793
794
795
796
797
798
799
800 public void scheduleTimeout(Timeout.Task task, long timeoutMs)
801 {
802 if (!(task instanceof Runnable))
803 throw new IllegalArgumentException("!Runnable");
804 _timeout.schedule(task, timeoutMs);
805 }
806
807
808 public void cancelTimeout(Timeout.Task task)
809 {
810 task.cancel();
811 }
812
813
814 public void wakeup()
815 {
816 try
817 {
818 Selector selector = _selector;
819 if (selector!=null)
820 selector.wakeup();
821 }
822 catch(Exception e)
823 {
824 addChange(new ChangeTask()
825 {
826 public void run()
827 {
828 renewSelector();
829 }
830 });
831
832 renewSelector();
833 }
834 }
835
836
837 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
838 {
839 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
840 endPointOpened(endp);
841 _endPoints.put(endp,this);
842 return endp;
843 }
844
845
846 public void destroyEndPoint(SelectChannelEndPoint endp)
847 {
848 LOG.debug("destroyEndPoint {}",endp);
849 _endPoints.remove(endp);
850 endPointClosed(endp);
851 }
852
853
854 Selector getSelector()
855 {
856 return _selector;
857 }
858
859
860 void stop() throws Exception
861 {
862
863
864 try
865 {
866 for (int i=0;i<100 && _selecting!=null;i++)
867 {
868 wakeup();
869 Thread.sleep(10);
870 }
871 }
872 catch(Exception e)
873 {
874 LOG.ignore(e);
875 }
876
877
878 synchronized (this)
879 {
880 Selector selector=_selector;
881 for (SelectionKey key:selector.keys())
882 {
883 if (key==null)
884 continue;
885 Object att=key.attachment();
886 if (att instanceof EndPoint)
887 {
888 EndPoint endpoint = (EndPoint)att;
889 try
890 {
891 endpoint.close();
892 }
893 catch(IOException e)
894 {
895 LOG.ignore(e);
896 }
897 }
898 }
899
900
901 _timeout.cancelAll();
902 try
903 {
904 selector=_selector;
905 if (selector != null)
906 selector.close();
907 }
908 catch (IOException e)
909 {
910 LOG.ignore(e);
911 }
912 _selector=null;
913 }
914 }
915
916
917 public String dump()
918 {
919 return AggregateLifeCycle.dump(this);
920 }
921
922
923 public void dump(Appendable out, String indent) throws IOException
924 {
925 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
926
927 Thread selecting = _selecting;
928
929 Object where = "not selecting";
930 StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
931 if (trace!=null)
932 {
933 for (StackTraceElement t:trace)
934 if (t.getClassName().startsWith("org.eclipse.jetty."))
935 {
936 where=t;
937 break;
938 }
939 }
940
941 Selector selector=_selector;
942 if (selector!=null)
943 {
944 final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
945 dump.add(where);
946
947 final CountDownLatch latch = new CountDownLatch(1);
948
949 addChange(new ChangeTask()
950 {
951 public void run()
952 {
953 dumpKeyState(dump);
954 latch.countDown();
955 }
956 });
957
958 try
959 {
960 latch.await(5,TimeUnit.SECONDS);
961 }
962 catch(InterruptedException e)
963 {
964 LOG.ignore(e);
965 }
966 AggregateLifeCycle.dump(out,indent,dump);
967 }
968 }
969
970
971 public void dumpKeyState(List<Object> dumpto)
972 {
973 Selector selector=_selector;
974 dumpto.add(selector+" keys="+selector.keys().size());
975 for (SelectionKey key: selector.keys())
976 {
977 if (key.isValid())
978 dumpto.add(key.attachment()+" "+key.interestOps()+" "+key.readyOps());
979 else
980 dumpto.add(key.attachment()+" - - ");
981 }
982 }
983 }
984
985
986 private static class ChannelAndAttachment
987 {
988 final SelectableChannel _channel;
989 final Object _attachment;
990
991 public ChannelAndAttachment(SelectableChannel channel, Object attachment)
992 {
993 super();
994 _channel = channel;
995 _attachment = attachment;
996 }
997 }
998
999
1000 public boolean isDeferringInterestedOps0()
1001 {
1002 return _deferringInterestedOps0;
1003 }
1004
1005
1006 public void setDeferringInterestedOps0(boolean deferringInterestedOps0)
1007 {
1008 _deferringInterestedOps0 = deferringInterestedOps0;
1009 }
1010
1011
1012
1013
1014
1015 private interface ChangeTask extends Runnable
1016 {}
1017
1018 }