1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.CancelledKeyException;
18 import java.nio.channels.Channel;
19 import java.nio.channels.ClosedSelectorException;
20 import java.nio.channels.SelectableChannel;
21 import java.nio.channels.SelectionKey;
22 import java.nio.channels.Selector;
23 import java.nio.channels.ServerSocketChannel;
24 import java.nio.channels.SocketChannel;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30 import java.util.concurrent.ConcurrentMap;
31 import java.util.concurrent.CountDownLatch;
32 import java.util.concurrent.TimeUnit;
33
34 import org.eclipse.jetty.io.AsyncEndPoint;
35 import org.eclipse.jetty.io.ConnectedEndPoint;
36 import org.eclipse.jetty.io.Connection;
37 import org.eclipse.jetty.io.EndPoint;
38 import org.eclipse.jetty.util.TypeUtil;
39 import org.eclipse.jetty.util.component.AbstractLifeCycle;
40 import org.eclipse.jetty.util.component.AggregateLifeCycle;
41 import org.eclipse.jetty.util.component.Dumpable;
42 import org.eclipse.jetty.util.log.Log;
43 import org.eclipse.jetty.util.log.Logger;
44 import org.eclipse.jetty.util.thread.Timeout;
45 import org.eclipse.jetty.util.thread.Timeout.Task;
46
47
48
49
50
51
52
53
54 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
55 {
56 public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
57
58 private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
59 private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue();
60 private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
61 private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
62
63 private int _maxIdleTime;
64 private int _lowResourcesMaxIdleTime;
65 private long _lowResourcesConnections;
66 private SelectSet[] _selectSet;
67 private int _selectSets=1;
68 private volatile int _set=0;
69 private boolean _deferringInterestedOps0=true;
70 private int _selectorPriorityDelta=0;
71
72
73
74
75
76
77 public void setMaxIdleTime(long maxIdleTime)
78 {
79 _maxIdleTime=(int)maxIdleTime;
80 }
81
82
83
84
85
86 public void setSelectSets(int selectSets)
87 {
88 long lrc = _lowResourcesConnections * _selectSets;
89 _selectSets=selectSets;
90 _lowResourcesConnections=lrc/_selectSets;
91 }
92
93
94
95
96
97 public long getMaxIdleTime()
98 {
99 return _maxIdleTime;
100 }
101
102
103
104
105
106 public int getSelectSets()
107 {
108 return _selectSets;
109 }
110
111
112
113
114
115
116 public SelectSet getSelectSet(int i)
117 {
118 return _selectSet[i];
119 }
120
121
122
123
124
125
126 public void register(SocketChannel channel, Object att)
127 {
128
129
130
131
132 int s=_set++;
133 if (s<0)
134 s=-s;
135 s=s%_selectSets;
136 SelectSet[] sets=_selectSet;
137 if (sets!=null)
138 {
139 SelectSet set=sets[s];
140 set.addChange(channel,att);
141 set.wakeup();
142 }
143 }
144
145
146
147
148
149
150 public void register(SocketChannel channel)
151 {
152
153
154
155
156 int s=_set++;
157 if (s<0)
158 s=-s;
159 s=s%_selectSets;
160 SelectSet[] sets=_selectSet;
161 if (sets!=null)
162 {
163 SelectSet set=sets[s];
164 set.addChange(channel);
165 set.wakeup();
166 }
167 }
168
169
170
171
172
173 public void register(ServerSocketChannel acceptChannel)
174 {
175 int s=_set++;
176 if (s<0)
177 s=-s;
178 s=s%_selectSets;
179 SelectSet set=_selectSet[s];
180 set.addChange(acceptChannel);
181 set.wakeup();
182 }
183
184
185
186
187
188 public int getSelectorPriorityDelta()
189 {
190 return _selectorPriorityDelta;
191 }
192
193
194
195
196
197 public void setSelectorPriorityDelta(int delta)
198 {
199 _selectorPriorityDelta=delta;
200 }
201
202
203
204
205
206
207 public long getLowResourcesConnections()
208 {
209 return _lowResourcesConnections*_selectSets;
210 }
211
212
213
214
215
216
217
218
219 public void setLowResourcesConnections(long lowResourcesConnections)
220 {
221 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
222 }
223
224
225
226
227
228 public long getLowResourcesMaxIdleTime()
229 {
230 return _lowResourcesMaxIdleTime;
231 }
232
233
234
235
236
237
238 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
239 {
240 _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
241 }
242
243
244
245 public abstract boolean dispatch(Runnable task);
246
247
248
249
250
251 @Override
252 protected void doStart() throws Exception
253 {
254 _selectSet = new SelectSet[_selectSets];
255 for (int i=0;i<_selectSet.length;i++)
256 _selectSet[i]= new SelectSet(i);
257
258 super.doStart();
259
260
261 for (int i=0;i<getSelectSets();i++)
262 {
263 final int id=i;
264 boolean selecting=dispatch(new Runnable()
265 {
266 public void run()
267 {
268 String name=Thread.currentThread().getName();
269 int priority=Thread.currentThread().getPriority();
270 try
271 {
272 SelectSet[] sets=_selectSet;
273 if (sets==null)
274 return;
275 SelectSet set=sets[id];
276
277 Thread.currentThread().setName(name+" Selector"+id);
278 if (getSelectorPriorityDelta()!=0)
279 Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta());
280 LOG.debug("Starting {} on {}",Thread.currentThread(),this);
281 while (isRunning())
282 {
283 try
284 {
285 set.doSelect();
286 }
287 catch(IOException e)
288 {
289 LOG.ignore(e);
290 }
291 catch(Exception e)
292 {
293 LOG.warn(e);
294 }
295 }
296 }
297 finally
298 {
299 LOG.debug("Stopped {} on {}",Thread.currentThread(),this);
300 Thread.currentThread().setName(name);
301 if (getSelectorPriorityDelta()!=0)
302 Thread.currentThread().setPriority(priority);
303 }
304 }
305
306 });
307
308 if (!selecting)
309 throw new IllegalStateException("!Selecting");
310 }
311 }
312
313
314
315 @Override
316 protected void doStop() throws Exception
317 {
318 SelectSet[] sets= _selectSet;
319 _selectSet=null;
320 if (sets!=null)
321 {
322 for (SelectSet set : sets)
323 {
324 if (set!=null)
325 set.stop();
326 }
327 }
328 super.doStop();
329 }
330
331
332
333
334
335 protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
336
337
338
339
340
341 protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
342
343
344 protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
345
346
347 public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment);
348
349
350
351
352
353
354
355
356
357
358 protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
359
360
361 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
362 {
363 LOG.warn(ex+","+channel+","+attachment);
364 LOG.debug(ex);
365 }
366
367
368 public String dump()
369 {
370 return AggregateLifeCycle.dump(this);
371 }
372
373
374 public void dump(Appendable out, String indent) throws IOException
375 {
376 AggregateLifeCycle.dumpObject(out,this);
377 AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet));
378 }
379
380
381
382
383
384 public class SelectSet implements Dumpable
385 {
386 private final int _setID;
387 private final Timeout _timeout;
388
389 private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
390
391 private volatile Selector _selector;
392
393 private volatile Thread _selecting;
394 private int _busySelects;
395 private long _monitorNext;
396 private boolean _pausing;
397 private boolean _paused;
398 private volatile long _idleTick;
399 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
400
401
402 SelectSet(int acceptorID) throws Exception
403 {
404 _setID=acceptorID;
405
406 _idleTick = System.currentTimeMillis();
407 _timeout = new Timeout(this);
408 _timeout.setDuration(0L);
409
410
411 _selector = Selector.open();
412 _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD;
413 }
414
415
416 public void addChange(Object change)
417 {
418 _changes.add(change);
419 }
420
421
422 public void addChange(SelectableChannel channel, Object att)
423 {
424 if (att==null)
425 addChange(channel);
426 else if (att instanceof EndPoint)
427 addChange(att);
428 else
429 addChange(new ChannelAndAttachment(channel,att));
430 }
431
432
433
434
435
436
437
438 public void doSelect() throws IOException
439 {
440 try
441 {
442 _selecting=Thread.currentThread();
443 final Selector selector=_selector;
444
445 if (selector == null)
446 return;
447
448
449 Object change;
450 int changes=_changes.size();
451 while (changes-->0 && (change=_changes.poll())!=null)
452 {
453 Channel ch=null;
454 SelectionKey key=null;
455
456 try
457 {
458 if (change instanceof EndPoint)
459 {
460
461 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
462 ch=endpoint.getChannel();
463 endpoint.doUpdateKey();
464 }
465 else if (change instanceof ChannelAndAttachment)
466 {
467
468 final ChannelAndAttachment asc = (ChannelAndAttachment)change;
469 final SelectableChannel channel=asc._channel;
470 ch=channel;
471 final Object att = asc._attachment;
472
473 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
474 {
475 key = channel.register(selector,SelectionKey.OP_READ,att);
476 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
477 key.attach(endpoint);
478 endpoint.schedule();
479 }
480 else if (channel.isOpen())
481 {
482 key = channel.register(selector,SelectionKey.OP_CONNECT,att);
483 }
484 }
485 else if (change instanceof SocketChannel)
486 {
487
488 final SocketChannel channel=(SocketChannel)change;
489 ch=channel;
490 key = channel.register(selector,SelectionKey.OP_READ,null);
491 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
492 key.attach(endpoint);
493 endpoint.schedule();
494 }
495 else if (change instanceof ChangeTask)
496 {
497 ((Runnable)change).run();
498 }
499 else if (change instanceof Runnable)
500 {
501 dispatch((Runnable)change);
502 }
503 else
504 throw new IllegalArgumentException(change.toString());
505 }
506 catch (CancelledKeyException e)
507 {
508 LOG.ignore(e);
509 }
510 catch (Throwable e)
511 {
512 if (isRunning())
513 LOG.warn(e);
514 else
515 LOG.debug(e);
516
517 try
518 {
519 if (ch!=null)
520 ch.close();
521 }
522 catch(IOException e2)
523 {
524 LOG.debug(e2);
525 }
526 }
527 }
528
529
530
531 int selected=selector.selectNow();
532
533 long now=System.currentTimeMillis();
534
535
536 if (selected==0 && selector.selectedKeys().isEmpty())
537 {
538
539 if (_pausing)
540 {
541 try
542 {
543 Thread.sleep(__BUSY_PAUSE);
544 }
545 catch(InterruptedException e)
546 {
547 LOG.ignore(e);
548 }
549 now=System.currentTimeMillis();
550 }
551
552
553 _timeout.setNow(now);
554 long to_next_timeout=_timeout.getTimeToNext();
555
556 long wait = _changes.size()==0?__IDLE_TICK:0L;
557 if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
558 wait = to_next_timeout;
559
560
561 if (wait>0)
562 {
563 long before=now;
564 selected=selector.select(wait);
565 now = System.currentTimeMillis();
566 _timeout.setNow(now);
567
568
569
570 if (__MONITOR_PERIOD>0 && now-before <=1)
571 {
572
573 if (++_busySelects>__MAX_SELECTS)
574 {
575
576 _pausing=true;
577
578
579 if (!_paused)
580 {
581
582 _paused=true;
583 LOG.warn("Selector {} is too busy, pausing!",this);
584 }
585 }
586 }
587 }
588 }
589
590
591 if (_selector==null || !selector.isOpen())
592 return;
593
594
595 for (SelectionKey key: selector.selectedKeys())
596 {
597 SocketChannel channel=null;
598
599 try
600 {
601 if (!key.isValid())
602 {
603 key.cancel();
604 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
605 if (endpoint != null)
606 endpoint.doUpdateKey();
607 continue;
608 }
609
610 Object att = key.attachment();
611 if (att instanceof SelectChannelEndPoint)
612 {
613 if (key.isReadable()||key.isWritable())
614 ((SelectChannelEndPoint)att).schedule();
615 }
616 else if (key.isConnectable())
617 {
618
619 channel = (SocketChannel)key.channel();
620 boolean connected=false;
621 try
622 {
623 connected=channel.finishConnect();
624 }
625 catch(Exception e)
626 {
627 connectionFailed(channel,e,att);
628 }
629 finally
630 {
631 if (connected)
632 {
633 key.interestOps(SelectionKey.OP_READ);
634 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
635 key.attach(endpoint);
636 endpoint.schedule();
637 }
638 else
639 {
640 key.cancel();
641 }
642 }
643 }
644 else
645 {
646
647 channel = (SocketChannel)key.channel();
648 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
649 key.attach(endpoint);
650 if (key.isReadable())
651 endpoint.schedule();
652 }
653 key = null;
654 }
655 catch (CancelledKeyException e)
656 {
657 LOG.ignore(e);
658 }
659 catch (Exception e)
660 {
661 if (isRunning())
662 LOG.warn(e);
663 else
664 LOG.ignore(e);
665
666 try
667 {
668 if (channel!=null)
669 channel.close();
670 }
671 catch(IOException e2)
672 {
673 LOG.debug(e2);
674 }
675
676 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
677 key.cancel();
678 }
679 }
680
681
682 selector.selectedKeys().clear();
683
684 now=System.currentTimeMillis();
685 _timeout.setNow(now);
686 Task task = _timeout.expired();
687 while (task!=null)
688 {
689 if (task instanceof Runnable)
690 dispatch((Runnable)task);
691 task = _timeout.expired();
692 }
693
694
695 if (now-_idleTick>__IDLE_TICK)
696 {
697 _idleTick=now;
698
699 final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
700 ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
701 :now;
702
703 dispatch(new Runnable()
704 {
705 public void run()
706 {
707 for (SelectChannelEndPoint endp:_endPoints.keySet())
708 {
709 endp.checkIdleTimestamp(idle_now);
710 }
711 }
712 public String toString() {return "Idle-"+super.toString();}
713 });
714
715 }
716
717
718 if (__MONITOR_PERIOD>0 && now>_monitorNext)
719 {
720 _busySelects=0;
721 _pausing=false;
722 _monitorNext=now+__MONITOR_PERIOD;
723
724 }
725 }
726 catch (ClosedSelectorException e)
727 {
728 if (isRunning())
729 LOG.warn(e);
730 else
731 LOG.ignore(e);
732 }
733 catch (CancelledKeyException e)
734 {
735 LOG.ignore(e);
736 }
737 finally
738 {
739 _selecting=null;
740 }
741 }
742
743
744
745 private void renewSelector()
746 {
747 try
748 {
749 synchronized (this)
750 {
751 Selector selector=_selector;
752 if (selector==null)
753 return;
754 final Selector new_selector = Selector.open();
755 for (SelectionKey k: selector.keys())
756 {
757 if (!k.isValid() || k.interestOps()==0)
758 continue;
759
760 final SelectableChannel channel = k.channel();
761 final Object attachment = k.attachment();
762
763 if (attachment==null)
764 addChange(channel);
765 else
766 addChange(channel,attachment);
767 }
768 _selector.close();
769 _selector=new_selector;
770 }
771 }
772 catch(IOException e)
773 {
774 throw new RuntimeException("recreating selector",e);
775 }
776 }
777
778
779 public SelectorManager getManager()
780 {
781 return SelectorManager.this;
782 }
783
784
785 public long getNow()
786 {
787 return _timeout.getNow();
788 }
789
790
791
792
793
794
795
796
797 public void scheduleTimeout(Timeout.Task task, long timeoutMs)
798 {
799 if (!(task instanceof Runnable))
800 throw new IllegalArgumentException("!Runnable");
801 _timeout.schedule(task, timeoutMs);
802 }
803
804
805 public void cancelTimeout(Timeout.Task task)
806 {
807 task.cancel();
808 }
809
810
811 public void wakeup()
812 {
813 try
814 {
815 Selector selector = _selector;
816 if (selector!=null)
817 selector.wakeup();
818 }
819 catch(Exception e)
820 {
821 addChange(new ChangeTask()
822 {
823 public void run()
824 {
825 renewSelector();
826 }
827 });
828
829 renewSelector();
830 }
831 }
832
833
834 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
835 {
836 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
837 LOG.debug("created {}",endp);
838 endPointOpened(endp);
839 _endPoints.put(endp,this);
840 return endp;
841 }
842
843
844 public void destroyEndPoint(SelectChannelEndPoint endp)
845 {
846 LOG.debug("destroyEndPoint {}",endp);
847 _endPoints.remove(endp);
848 endPointClosed(endp);
849 }
850
851
852 Selector getSelector()
853 {
854 return _selector;
855 }
856
857
858 void stop() throws Exception
859 {
860
861
862 try
863 {
864 for (int i=0;i<100 && _selecting!=null;i++)
865 {
866 wakeup();
867 Thread.sleep(10);
868 }
869 }
870 catch(Exception e)
871 {
872 LOG.ignore(e);
873 }
874
875
876 synchronized (this)
877 {
878 Selector selector=_selector;
879 for (SelectionKey key:selector.keys())
880 {
881 if (key==null)
882 continue;
883 Object att=key.attachment();
884 if (att instanceof EndPoint)
885 {
886 EndPoint endpoint = (EndPoint)att;
887 try
888 {
889 endpoint.close();
890 }
891 catch(IOException e)
892 {
893 LOG.ignore(e);
894 }
895 }
896 }
897
898
899 _timeout.cancelAll();
900 try
901 {
902 selector=_selector;
903 if (selector != null)
904 selector.close();
905 }
906 catch (IOException e)
907 {
908 LOG.ignore(e);
909 }
910 _selector=null;
911 }
912 }
913
914
915 public String dump()
916 {
917 return AggregateLifeCycle.dump(this);
918 }
919
920
921 public void dump(Appendable out, String indent) throws IOException
922 {
923 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
924
925 Thread selecting = _selecting;
926
927 Object where = "not selecting";
928 StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
929 if (trace!=null)
930 {
931 for (StackTraceElement t:trace)
932 if (t.getClassName().startsWith("org.eclipse.jetty."))
933 {
934 where=t;
935 break;
936 }
937 }
938
939 Selector selector=_selector;
940 if (selector!=null)
941 {
942 final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
943 dump.add(where);
944
945 final CountDownLatch latch = new CountDownLatch(1);
946
947 addChange(new ChangeTask()
948 {
949 public void run()
950 {
951 dumpKeyState(dump);
952 latch.countDown();
953 }
954 });
955
956 try
957 {
958 latch.await(5,TimeUnit.SECONDS);
959 }
960 catch(InterruptedException e)
961 {
962 LOG.ignore(e);
963 }
964
965 AggregateLifeCycle.dump(out,indent,dump);
966 }
967 }
968
969
970 public void dumpKeyState(List<Object> dumpto)
971 {
972 Selector selector=_selector;
973 Set<SelectionKey> keys = selector.keys();
974 dumpto.add(selector + " keys=" + keys.size());
975 for (SelectionKey key: keys)
976 {
977 if (key.isValid())
978 dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps());
979 else
980 dumpto.add(key.attachment()+" iOps=-1 rOps=-1");
981 }
982 }
983
984
985 public String toString()
986 {
987 Selector selector=_selector;
988 return String.format("%s keys=%d selected=%d",
989 super.toString(),
990 selector != null && selector.isOpen() ? selector.keys().size() : -1,
991 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
992 }
993 }
994
995
996 private static class ChannelAndAttachment
997 {
998 final SelectableChannel _channel;
999 final Object _attachment;
1000
1001 public ChannelAndAttachment(SelectableChannel channel, Object attachment)
1002 {
1003 super();
1004 _channel = channel;
1005 _attachment = attachment;
1006 }
1007 }
1008
1009
1010 public boolean isDeferringInterestedOps0()
1011 {
1012 return _deferringInterestedOps0;
1013 }
1014
1015
1016 public void setDeferringInterestedOps0(boolean deferringInterestedOps0)
1017 {
1018 _deferringInterestedOps0 = deferringInterestedOps0;
1019 }
1020
1021
1022
1023
1024
1025 private interface ChangeTask extends Runnable
1026 {}
1027
1028 }