1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io.nio;
20
21 import java.io.IOException;
22 import java.nio.channels.CancelledKeyException;
23 import java.nio.channels.Channel;
24 import java.nio.channels.ClosedSelectorException;
25 import java.nio.channels.SelectableChannel;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.Selector;
28 import java.nio.channels.ServerSocketChannel;
29 import java.nio.channels.SocketChannel;
30 import java.util.ArrayList;
31 import java.util.List;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.ConcurrentMap;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.TimeUnit;
38
39 import org.eclipse.jetty.io.AsyncEndPoint;
40 import org.eclipse.jetty.io.ConnectedEndPoint;
41 import org.eclipse.jetty.io.Connection;
42 import org.eclipse.jetty.io.EndPoint;
43 import org.eclipse.jetty.util.TypeUtil;
44 import org.eclipse.jetty.util.component.AbstractLifeCycle;
45 import org.eclipse.jetty.util.component.AggregateLifeCycle;
46 import org.eclipse.jetty.util.component.Dumpable;
47 import org.eclipse.jetty.util.log.Log;
48 import org.eclipse.jetty.util.log.Logger;
49 import org.eclipse.jetty.util.thread.Timeout;
50 import org.eclipse.jetty.util.thread.Timeout.Task;
51
52
53
54
55
56
57
58
59 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
60 {
61 public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
62
63 private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
64 private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue();
65 private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
66 private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
67
68 private int _maxIdleTime;
69 private int _lowResourcesMaxIdleTime;
70 private long _lowResourcesConnections;
71 private SelectSet[] _selectSet;
72 private int _selectSets=1;
73 private volatile int _set=0;
74 private boolean _deferringInterestedOps0=true;
75 private int _selectorPriorityDelta=0;
76
77
78
79
80
81
82 public void setMaxIdleTime(long maxIdleTime)
83 {
84 _maxIdleTime=(int)maxIdleTime;
85 }
86
87
88
89
90
91 public void setSelectSets(int selectSets)
92 {
93 long lrc = _lowResourcesConnections * _selectSets;
94 _selectSets=selectSets;
95 _lowResourcesConnections=lrc/_selectSets;
96 }
97
98
99
100
101
102 public long getMaxIdleTime()
103 {
104 return _maxIdleTime;
105 }
106
107
108
109
110
111 public int getSelectSets()
112 {
113 return _selectSets;
114 }
115
116
117
118
119
120
121 public SelectSet getSelectSet(int i)
122 {
123 return _selectSet[i];
124 }
125
126
127
128
129
130
131 public void register(SocketChannel channel, Object att)
132 {
133
134
135
136
137 int s=_set++;
138 if (s<0)
139 s=-s;
140 s=s%_selectSets;
141 SelectSet[] sets=_selectSet;
142 if (sets!=null)
143 {
144 SelectSet set=sets[s];
145 set.addChange(channel,att);
146 set.wakeup();
147 }
148 }
149
150
151
152
153
154
155 public void register(SocketChannel channel)
156 {
157
158
159
160
161 int s=_set++;
162 if (s<0)
163 s=-s;
164 s=s%_selectSets;
165 SelectSet[] sets=_selectSet;
166 if (sets!=null)
167 {
168 SelectSet set=sets[s];
169 set.addChange(channel);
170 set.wakeup();
171 }
172 }
173
174
175
176
177
178 public void register(ServerSocketChannel acceptChannel)
179 {
180 int s=_set++;
181 if (s<0)
182 s=-s;
183 s=s%_selectSets;
184 SelectSet set=_selectSet[s];
185 set.addChange(acceptChannel);
186 set.wakeup();
187 }
188
189
190
191
192
193 public int getSelectorPriorityDelta()
194 {
195 return _selectorPriorityDelta;
196 }
197
198
199
200
201
202 public void setSelectorPriorityDelta(int delta)
203 {
204 _selectorPriorityDelta=delta;
205 }
206
207
208
209
210
211
212 public long getLowResourcesConnections()
213 {
214 return _lowResourcesConnections*_selectSets;
215 }
216
217
218
219
220
221
222
223
224 public void setLowResourcesConnections(long lowResourcesConnections)
225 {
226 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
227 }
228
229
230
231
232
233 public long getLowResourcesMaxIdleTime()
234 {
235 return _lowResourcesMaxIdleTime;
236 }
237
238
239
240
241
242
243 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
244 {
245 _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
246 }
247
248
249
250 public abstract boolean dispatch(Runnable task);
251
252
253
254
255
256 @Override
257 protected void doStart() throws Exception
258 {
259 _selectSet = new SelectSet[_selectSets];
260 for (int i=0;i<_selectSet.length;i++)
261 _selectSet[i]= new SelectSet(i);
262
263 super.doStart();
264
265
266 for (int i=0;i<getSelectSets();i++)
267 {
268 final int id=i;
269 boolean selecting=dispatch(new Runnable()
270 {
271 public void run()
272 {
273 String name=Thread.currentThread().getName();
274 int priority=Thread.currentThread().getPriority();
275 try
276 {
277 SelectSet[] sets=_selectSet;
278 if (sets==null)
279 return;
280 SelectSet set=sets[id];
281
282 Thread.currentThread().setName(name+" Selector"+id);
283 if (getSelectorPriorityDelta()!=0)
284 Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta());
285 LOG.debug("Starting {} on {}",Thread.currentThread(),this);
286 while (isRunning())
287 {
288 try
289 {
290 set.doSelect();
291 }
292 catch(IOException e)
293 {
294 LOG.ignore(e);
295 }
296 catch(Exception e)
297 {
298 LOG.warn(e);
299 }
300 }
301 }
302 finally
303 {
304 LOG.debug("Stopped {} on {}",Thread.currentThread(),this);
305 Thread.currentThread().setName(name);
306 if (getSelectorPriorityDelta()!=0)
307 Thread.currentThread().setPriority(priority);
308 }
309 }
310
311 });
312
313 if (!selecting)
314 throw new IllegalStateException("!Selecting");
315 }
316 }
317
318
319
320 @Override
321 protected void doStop() throws Exception
322 {
323 SelectSet[] sets= _selectSet;
324 _selectSet=null;
325 if (sets!=null)
326 {
327 for (SelectSet set : sets)
328 {
329 if (set!=null)
330 set.stop();
331 }
332 }
333 super.doStop();
334 }
335
336
337
338
339
340 protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
341
342
343
344
345
346 protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
347
348
349 protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
350
351
352 public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment);
353
354
355
356
357
358
359
360
361
362
363 protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
364
365
366 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
367 {
368 LOG.warn(ex+","+channel+","+attachment);
369 LOG.debug(ex);
370 }
371
372
373 public String dump()
374 {
375 return AggregateLifeCycle.dump(this);
376 }
377
378
379 public void dump(Appendable out, String indent) throws IOException
380 {
381 AggregateLifeCycle.dumpObject(out,this);
382 AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet));
383 }
384
385
386
387
388
389 public class SelectSet implements Dumpable
390 {
391 private final int _setID;
392 private final Timeout _timeout;
393
394 private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
395
396 private volatile Selector _selector;
397
398 private volatile Thread _selecting;
399 private int _busySelects;
400 private long _monitorNext;
401 private boolean _pausing;
402 private boolean _paused;
403 private volatile long _idleTick;
404 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
405
406
/**
 * Creates a select set with its own timeout queue and a freshly opened selector.
 *
 * @param acceptorID the index of this select set within the manager
 * @throws Exception if the selector cannot be opened
 */
SelectSet(int acceptorID) throws Exception
{
    this._setID = acceptorID;

    this._idleTick = System.currentTimeMillis();
    this._timeout = new Timeout(this);
    this._timeout.setDuration(0L);

    // Create a selector and schedule the first reset of the busy-select monitor.
    this._selector = Selector.open();
    this._monitorNext = System.currentTimeMillis() + __MONITOR_PERIOD;
}
419
420
421 public void addChange(Object change)
422 {
423 _changes.add(change);
424 }
425
426
427 public void addChange(SelectableChannel channel, Object att)
428 {
429 if (att==null)
430 addChange(channel);
431 else if (att instanceof EndPoint)
432 addChange(att);
433 else
434 addChange(new ChannelAndAttachment(channel,att));
435 }
436
437
438
439
440
441
442
443 public void doSelect() throws IOException
444 {
445 try
446 {
447 _selecting=Thread.currentThread();
448 final Selector selector=_selector;
449
450 if (selector == null)
451 return;
452
453
454 Object change;
455 int changes=_changes.size();
456 while (changes-->0 && (change=_changes.poll())!=null)
457 {
458 Channel ch=null;
459 SelectionKey key=null;
460
461 try
462 {
463 if (change instanceof EndPoint)
464 {
465
466 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
467 ch=endpoint.getChannel();
468 endpoint.doUpdateKey();
469 }
470 else if (change instanceof ChannelAndAttachment)
471 {
472
473 final ChannelAndAttachment asc = (ChannelAndAttachment)change;
474 final SelectableChannel channel=asc._channel;
475 ch=channel;
476 final Object att = asc._attachment;
477
478 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
479 {
480 key = channel.register(selector,SelectionKey.OP_READ,att);
481 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
482 key.attach(endpoint);
483 endpoint.schedule();
484 }
485 else if (channel.isOpen())
486 {
487 key = channel.register(selector,SelectionKey.OP_CONNECT,att);
488 }
489 }
490 else if (change instanceof SocketChannel)
491 {
492
493 final SocketChannel channel=(SocketChannel)change;
494 ch=channel;
495 key = channel.register(selector,SelectionKey.OP_READ,null);
496 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
497 key.attach(endpoint);
498 endpoint.schedule();
499 }
500 else if (change instanceof ChangeTask)
501 {
502 ((Runnable)change).run();
503 }
504 else if (change instanceof Runnable)
505 {
506 dispatch((Runnable)change);
507 }
508 else
509 throw new IllegalArgumentException(change.toString());
510 }
511 catch (CancelledKeyException e)
512 {
513 LOG.ignore(e);
514 }
515 catch (Throwable e)
516 {
517 if (isRunning())
518 LOG.warn(e);
519 else
520 LOG.debug(e);
521
522 try
523 {
524 if (ch!=null)
525 ch.close();
526 }
527 catch(IOException e2)
528 {
529 LOG.debug(e2);
530 }
531 }
532 }
533
534
535
536 int selected=selector.selectNow();
537
538 long now=System.currentTimeMillis();
539
540
541 if (selected==0 && selector.selectedKeys().isEmpty())
542 {
543
544 if (_pausing)
545 {
546 try
547 {
548 Thread.sleep(__BUSY_PAUSE);
549 }
550 catch(InterruptedException e)
551 {
552 LOG.ignore(e);
553 }
554 now=System.currentTimeMillis();
555 }
556
557
558 _timeout.setNow(now);
559 long to_next_timeout=_timeout.getTimeToNext();
560
561 long wait = _changes.size()==0?__IDLE_TICK:0L;
562 if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
563 wait = to_next_timeout;
564
565
566 if (wait>0)
567 {
568 long before=now;
569 selector.select(wait);
570 now = System.currentTimeMillis();
571 _timeout.setNow(now);
572
573
574
575 if (__MONITOR_PERIOD>0 && now-before <=1)
576 {
577
578 if (++_busySelects>__MAX_SELECTS)
579 {
580
581 _pausing=true;
582
583
584 if (!_paused)
585 {
586
587 _paused=true;
588 LOG.warn("Selector {} is too busy, pausing!",this);
589 }
590 }
591 }
592 }
593 }
594
595
596 if (_selector==null || !selector.isOpen())
597 return;
598
599
600 for (SelectionKey key: selector.selectedKeys())
601 {
602 SocketChannel channel=null;
603
604 try
605 {
606 if (!key.isValid())
607 {
608 key.cancel();
609 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
610 if (endpoint != null)
611 endpoint.doUpdateKey();
612 continue;
613 }
614
615 Object att = key.attachment();
616 if (att instanceof SelectChannelEndPoint)
617 {
618 if (key.isReadable()||key.isWritable())
619 ((SelectChannelEndPoint)att).schedule();
620 }
621 else if (key.isConnectable())
622 {
623
624 channel = (SocketChannel)key.channel();
625 boolean connected=false;
626 try
627 {
628 connected=channel.finishConnect();
629 }
630 catch(Exception e)
631 {
632 connectionFailed(channel,e,att);
633 }
634 finally
635 {
636 if (connected)
637 {
638 key.interestOps(SelectionKey.OP_READ);
639 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
640 key.attach(endpoint);
641 endpoint.schedule();
642 }
643 else
644 {
645 key.cancel();
646 }
647 }
648 }
649 else
650 {
651
652 channel = (SocketChannel)key.channel();
653 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
654 key.attach(endpoint);
655 if (key.isReadable())
656 endpoint.schedule();
657 }
658 key = null;
659 }
660 catch (CancelledKeyException e)
661 {
662 LOG.ignore(e);
663 }
664 catch (Exception e)
665 {
666 if (isRunning())
667 LOG.warn(e);
668 else
669 LOG.ignore(e);
670
671 try
672 {
673 if (channel!=null)
674 channel.close();
675 }
676 catch(IOException e2)
677 {
678 LOG.debug(e2);
679 }
680
681 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
682 key.cancel();
683 }
684 }
685
686
687 selector.selectedKeys().clear();
688
689 now=System.currentTimeMillis();
690 _timeout.setNow(now);
691 Task task = _timeout.expired();
692 while (task!=null)
693 {
694 if (task instanceof Runnable)
695 dispatch((Runnable)task);
696 task = _timeout.expired();
697 }
698
699
700 if (now-_idleTick>__IDLE_TICK)
701 {
702 _idleTick=now;
703
704 final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
705 ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
706 :now;
707
708 dispatch(new Runnable()
709 {
710 public void run()
711 {
712 for (SelectChannelEndPoint endp:_endPoints.keySet())
713 {
714 endp.checkIdleTimestamp(idle_now);
715 }
716 }
717 public String toString() {return "Idle-"+super.toString();}
718 });
719
720 }
721
722
723 if (__MONITOR_PERIOD>0 && now>_monitorNext)
724 {
725 _busySelects=0;
726 _pausing=false;
727 _monitorNext=now+__MONITOR_PERIOD;
728
729 }
730 }
731 catch (ClosedSelectorException e)
732 {
733 if (isRunning())
734 LOG.warn(e);
735 else
736 LOG.ignore(e);
737 }
738 catch (CancelledKeyException e)
739 {
740 LOG.ignore(e);
741 }
742 finally
743 {
744 _selecting=null;
745 }
746 }
747
748
749
750 private void renewSelector()
751 {
752 try
753 {
754 synchronized (this)
755 {
756 Selector selector=_selector;
757 if (selector==null)
758 return;
759 final Selector new_selector = Selector.open();
760 for (SelectionKey k: selector.keys())
761 {
762 if (!k.isValid() || k.interestOps()==0)
763 continue;
764
765 final SelectableChannel channel = k.channel();
766 final Object attachment = k.attachment();
767
768 if (attachment==null)
769 addChange(channel);
770 else
771 addChange(channel,attachment);
772 }
773 _selector.close();
774 _selector=new_selector;
775 }
776 }
777 catch(IOException e)
778 {
779 throw new RuntimeException("recreating selector",e);
780 }
781 }
782
783
784 public SelectorManager getManager()
785 {
786 return SelectorManager.this;
787 }
788
789
790 public long getNow()
791 {
792 return _timeout.getNow();
793 }
794
795
796
797
798
799
800
801
802 public void scheduleTimeout(Timeout.Task task, long timeoutMs)
803 {
804 if (!(task instanceof Runnable))
805 throw new IllegalArgumentException("!Runnable");
806 _timeout.schedule(task, timeoutMs);
807 }
808
809
810 public void cancelTimeout(Timeout.Task task)
811 {
812 task.cancel();
813 }
814
815
816 public void wakeup()
817 {
818 try
819 {
820 Selector selector = _selector;
821 if (selector!=null)
822 selector.wakeup();
823 }
824 catch(Exception e)
825 {
826 addChange(new ChangeTask()
827 {
828 public void run()
829 {
830 renewSelector();
831 }
832 });
833
834 renewSelector();
835 }
836 }
837
838
839 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
840 {
841 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
842 LOG.debug("created {}",endp);
843 endPointOpened(endp);
844 _endPoints.put(endp,this);
845 return endp;
846 }
847
848
849 public void destroyEndPoint(SelectChannelEndPoint endp)
850 {
851 LOG.debug("destroyEndPoint {}",endp);
852 _endPoints.remove(endp);
853 endPointClosed(endp);
854 }
855
856
857 Selector getSelector()
858 {
859 return _selector;
860 }
861
862
863 void stop() throws Exception
864 {
865
866
867 try
868 {
869 for (int i=0;i<100 && _selecting!=null;i++)
870 {
871 wakeup();
872 Thread.sleep(10);
873 }
874 }
875 catch(Exception e)
876 {
877 LOG.ignore(e);
878 }
879
880
881 synchronized (this)
882 {
883 Selector selector=_selector;
884 for (SelectionKey key:selector.keys())
885 {
886 if (key==null)
887 continue;
888 Object att=key.attachment();
889 if (att instanceof EndPoint)
890 {
891 EndPoint endpoint = (EndPoint)att;
892 try
893 {
894 endpoint.close();
895 }
896 catch(IOException e)
897 {
898 LOG.ignore(e);
899 }
900 }
901 }
902
903
904 _timeout.cancelAll();
905 try
906 {
907 selector=_selector;
908 if (selector != null)
909 selector.close();
910 }
911 catch (IOException e)
912 {
913 LOG.ignore(e);
914 }
915 _selector=null;
916 }
917 }
918
919
920 public String dump()
921 {
922 return AggregateLifeCycle.dump(this);
923 }
924
925
926 public void dump(Appendable out, String indent) throws IOException
927 {
928 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
929
930 Thread selecting = _selecting;
931
932 Object where = "not selecting";
933 StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
934 if (trace!=null)
935 {
936 for (StackTraceElement t:trace)
937 if (t.getClassName().startsWith("org.eclipse.jetty."))
938 {
939 where=t;
940 break;
941 }
942 }
943
944 Selector selector=_selector;
945 if (selector!=null)
946 {
947 final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
948 dump.add(where);
949
950 final CountDownLatch latch = new CountDownLatch(1);
951
952 addChange(new ChangeTask()
953 {
954 public void run()
955 {
956 dumpKeyState(dump);
957 latch.countDown();
958 }
959 });
960
961 try
962 {
963 latch.await(5,TimeUnit.SECONDS);
964 }
965 catch(InterruptedException e)
966 {
967 LOG.ignore(e);
968 }
969
970 AggregateLifeCycle.dump(out,indent,dump);
971 }
972 }
973
974
975 public void dumpKeyState(List<Object> dumpto)
976 {
977 Selector selector=_selector;
978 Set<SelectionKey> keys = selector.keys();
979 dumpto.add(selector + " keys=" + keys.size());
980 for (SelectionKey key: keys)
981 {
982 if (key.isValid())
983 dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps());
984 else
985 dumpto.add(key.attachment()+" iOps=-1 rOps=-1");
986 }
987 }
988
989
990 public String toString()
991 {
992 Selector selector=_selector;
993 return String.format("%s keys=%d selected=%d",
994 super.toString(),
995 selector != null && selector.isOpen() ? selector.keys().size() : -1,
996 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
997 }
998 }
999
1000
1001 private static class ChannelAndAttachment
1002 {
1003 final SelectableChannel _channel;
1004 final Object _attachment;
1005
1006 public ChannelAndAttachment(SelectableChannel channel, Object attachment)
1007 {
1008 super();
1009 _channel = channel;
1010 _attachment = attachment;
1011 }
1012 }
1013
1014
1015 public boolean isDeferringInterestedOps0()
1016 {
1017 return _deferringInterestedOps0;
1018 }
1019
1020
1021 public void setDeferringInterestedOps0(boolean deferringInterestedOps0)
1022 {
1023 _deferringInterestedOps0 = deferringInterestedOps0;
1024 }
1025
1026
1027
1028
1029
1030 private interface ChangeTask extends Runnable
1031 {}
1032
1033 }