1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.CancelledKeyException;
18 import java.nio.channels.Channel;
19 import java.nio.channels.ClosedSelectorException;
20 import java.nio.channels.SelectableChannel;
21 import java.nio.channels.SelectionKey;
22 import java.nio.channels.Selector;
23 import java.nio.channels.ServerSocketChannel;
24 import java.nio.channels.SocketChannel;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentLinkedQueue;
29 import java.util.concurrent.ConcurrentMap;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.TimeUnit;
32
33 import org.eclipse.jetty.io.AsyncEndPoint;
34 import org.eclipse.jetty.io.ConnectedEndPoint;
35 import org.eclipse.jetty.io.Connection;
36 import org.eclipse.jetty.io.EndPoint;
37 import org.eclipse.jetty.util.TypeUtil;
38 import org.eclipse.jetty.util.component.AbstractLifeCycle;
39 import org.eclipse.jetty.util.component.AggregateLifeCycle;
40 import org.eclipse.jetty.util.component.Dumpable;
41 import org.eclipse.jetty.util.log.Log;
42 import org.eclipse.jetty.util.log.Logger;
43 import org.eclipse.jetty.util.thread.Timeout;
44 import org.eclipse.jetty.util.thread.Timeout.Task;
45
46
47
48
49
50
51
52
53 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
54 {
55 public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
56
57 private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
58 private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue();
59 private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
60 private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
61
62 private int _maxIdleTime;
63 private int _lowResourcesMaxIdleTime;
64 private long _lowResourcesConnections;
65 private SelectSet[] _selectSet;
66 private int _selectSets=1;
67 private volatile int _set=0;
68 private boolean _deferringInterestedOps0=true;
69 private int _selectorPriorityDelta=0;
70
71
72
73
74
75
76 public void setMaxIdleTime(long maxIdleTime)
77 {
78 _maxIdleTime=(int)maxIdleTime;
79 }
80
81
82
83
84
85 public void setSelectSets(int selectSets)
86 {
87 long lrc = _lowResourcesConnections * _selectSets;
88 _selectSets=selectSets;
89 _lowResourcesConnections=lrc/_selectSets;
90 }
91
92
93
94
95
96 public long getMaxIdleTime()
97 {
98 return _maxIdleTime;
99 }
100
101
102
103
104
105 public int getSelectSets()
106 {
107 return _selectSets;
108 }
109
110
111
112
113
114
115 public SelectSet getSelectSet(int i)
116 {
117 return _selectSet[i];
118 }
119
120
121
122
123
124
125 public void register(SocketChannel channel, Object att)
126 {
127
128
129
130
131 int s=_set++;
132 if (s<0)
133 s=-s;
134 s=s%_selectSets;
135 SelectSet[] sets=_selectSet;
136 if (sets!=null)
137 {
138 SelectSet set=sets[s];
139 set.addChange(channel,att);
140 set.wakeup();
141 }
142 }
143
144
145
146
147
148
149 public void register(SocketChannel channel)
150 {
151
152
153
154
155 int s=_set++;
156 if (s<0)
157 s=-s;
158 s=s%_selectSets;
159 SelectSet[] sets=_selectSet;
160 if (sets!=null)
161 {
162 SelectSet set=sets[s];
163 set.addChange(channel);
164 set.wakeup();
165 }
166 }
167
168
169
170
171
172 public void register(ServerSocketChannel acceptChannel)
173 {
174 int s=_set++;
175 if (s<0)
176 s=-s;
177 s=s%_selectSets;
178 SelectSet set=_selectSet[s];
179 set.addChange(acceptChannel);
180 set.wakeup();
181 }
182
183
184
185
186
187 public int getSelectorPriorityDelta()
188 {
189 return _selectorPriorityDelta;
190 }
191
192
193
194
195
196 public void setSelectorPriorityDelta(int delta)
197 {
198 _selectorPriorityDelta=delta;
199 }
200
201
202
203
204
205
206 public long getLowResourcesConnections()
207 {
208 return _lowResourcesConnections*_selectSets;
209 }
210
211
212
213
214
215
216
217
218 public void setLowResourcesConnections(long lowResourcesConnections)
219 {
220 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
221 }
222
223
224
225
226
227 public long getLowResourcesMaxIdleTime()
228 {
229 return _lowResourcesMaxIdleTime;
230 }
231
232
233
234
235
236
237 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
238 {
239 _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
240 }
241
242
243
/**
 * Hands a task to the implementation for execution.
 * <p>
 * Within this class it is used to start each select loop, and by the select sets to run
 * queued {@code Runnable} changes, expired timeout tasks and the periodic idle check.
 *
 * @param task the task to execute
 * @return presumably whether the task was accepted for execution - confirm against implementations
 */
public abstract boolean dispatch(Runnable task);
245
246
247
248
249
250 @Override
251 protected void doStart() throws Exception
252 {
253 _selectSet = new SelectSet[_selectSets];
254 for (int i=0;i<_selectSet.length;i++)
255 _selectSet[i]= new SelectSet(i);
256
257 super.doStart();
258
259
260 for (int i=0;i<getSelectSets();i++)
261 {
262 final int id=i;
263 dispatch(new Runnable()
264 {
265 public void run()
266 {
267 String name=Thread.currentThread().getName();
268 int priority=Thread.currentThread().getPriority();
269 try
270 {
271 SelectSet[] sets=_selectSet;
272 if (sets==null)
273 return;
274 SelectSet set=sets[id];
275
276 Thread.currentThread().setName(name+" Selector"+id);
277 if (getSelectorPriorityDelta()!=0)
278 Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta());
279 LOG.debug("Starting {} on {}",Thread.currentThread(),this);
280 while (isRunning())
281 {
282 try
283 {
284 set.doSelect();
285 }
286 catch(IOException e)
287 {
288 LOG.ignore(e);
289 }
290 catch(Exception e)
291 {
292 LOG.warn(e);
293 }
294 }
295 }
296 finally
297 {
298 LOG.debug("Stopped {} on {}",Thread.currentThread(),this);
299 Thread.currentThread().setName(name);
300 if (getSelectorPriorityDelta()!=0)
301 Thread.currentThread().setPriority(priority);
302 }
303 }
304
305 });
306 }
307 }
308
309
310
311 @Override
312 protected void doStop() throws Exception
313 {
314 SelectSet[] sets= _selectSet;
315 _selectSet=null;
316 if (sets!=null)
317 {
318 for (SelectSet set : sets)
319 {
320 if (set!=null)
321 set.stop();
322 }
323 }
324 super.doStop();
325 }
326
327
328
329
330
/**
 * Callback invoked by {@code SelectSet.destroyEndPoint} after the end point has been
 * removed from its select set.
 *
 * @param endpoint the end point that was destroyed
 */
protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
332
333
334
335
336
/**
 * Callback invoked by a select set for each end point it creates, before the end point
 * is recorded in the set.
 *
 * @param endpoint the newly created end point
 */
protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
338
339
/**
 * Callback for an end point whose connection has been replaced.
 * NOTE(review): not invoked anywhere in this file; presumably called by the end point
 * implementation when its connection changes - confirm against callers.
 *
 * @param endpoint      the end point concerned
 * @param oldConnection the connection that was replaced
 */
protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
341
342
/**
 * Creates the connection for a channel and end point.
 * NOTE(review): not invoked anywhere in this file; presumably called by the end point
 * implementation - confirm against callers.
 *
 * @param channel    the channel of the end point
 * @param endpoint   the end point the connection is for
 * @param attachment the attachment associated with the channel, if any
 * @return the new connection
 */
public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment);
344
345
346
347
348
349
350
351
352
353
/**
 * Creates the end point for a channel. Called by a select set whenever it needs an end
 * point for a registered or newly connected channel.
 *
 * @param channel   the channel the end point wraps
 * @param selectSet the select set the channel is registered with
 * @param sKey      the selection key of the channel in that set's selector
 * @return the new end point
 * @throws IOException if the end point cannot be created
 */
protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
355
356
357 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
358 {
359 LOG.warn(ex+","+channel+","+attachment);
360 LOG.debug(ex);
361 }
362
363
364 public String dump()
365 {
366 return AggregateLifeCycle.dump(this);
367 }
368
369
370 public void dump(Appendable out, String indent) throws IOException
371 {
372 out.append(String.valueOf(this)).append("\n");
373 AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet));
374 }
375
376
377
378
379
380 public class SelectSet implements Dumpable
381 {
382 private final int _setID;
383 private final Timeout _timeout;
384
385 private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
386
387 private volatile Selector _selector;
388
389 private volatile Thread _selecting;
390 private int _busySelects;
391 private long _monitorNext;
392 private boolean _pausing;
393 private boolean _paused;
394 private volatile long _idleTick;
395 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
396
397
/**
 * Creates a select set with its own timeout queue and an open selector.
 *
 * @param acceptorID the index of this set within the manager
 * @throws Exception if the selector cannot be opened
 */
SelectSet(int acceptorID) throws Exception
{
    this._setID = acceptorID;

    this._idleTick = System.currentTimeMillis();

    // The timeout queue uses this select set as the object passed to its constructor
    this._timeout = new Timeout(this);
    this._timeout.setDuration(0L);

    this._selector = Selector.open();

    // First busy-monitor period starts now
    this._monitorNext = System.currentTimeMillis() + __MONITOR_PERIOD;
}
410
411
412 public void addChange(Object change)
413 {
414 _changes.add(change);
415 }
416
417
418 public void addChange(SelectableChannel channel, Object att)
419 {
420 if (att==null)
421 addChange(channel);
422 else if (att instanceof EndPoint)
423 addChange(att);
424 else
425 addChange(new ChannelAndAttachment(channel,att));
426 }
427
428
429
430
431
432
433
434 public void doSelect() throws IOException
435 {
436 try
437 {
438 _selecting=Thread.currentThread();
439 final Selector selector=_selector;
440
441 if (selector == null)
442 return;
443
444
445 Object change;
446 int changes=_changes.size();
447 while (changes-->0 && (change=_changes.poll())!=null)
448 {
449 Channel ch=null;
450 SelectionKey key=null;
451
452 try
453 {
454 if (change instanceof EndPoint)
455 {
456
457 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
458 ch=endpoint.getChannel();
459 endpoint.doUpdateKey();
460 }
461 else if (change instanceof ChannelAndAttachment)
462 {
463
464 final ChannelAndAttachment asc = (ChannelAndAttachment)change;
465 final SelectableChannel channel=asc._channel;
466 ch=channel;
467 final Object att = asc._attachment;
468
469 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
470 {
471 key = channel.register(selector,SelectionKey.OP_READ,att);
472 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
473 key.attach(endpoint);
474 endpoint.schedule();
475 }
476 else if (channel.isOpen())
477 {
478 key = channel.register(selector,SelectionKey.OP_CONNECT,att);
479 }
480 }
481 else if (change instanceof SocketChannel)
482 {
483
484 final SocketChannel channel=(SocketChannel)change;
485 ch=channel;
486 key = channel.register(selector,SelectionKey.OP_READ,null);
487 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
488 key.attach(endpoint);
489 endpoint.schedule();
490 }
491 else if (change instanceof ChangeTask)
492 {
493 ((Runnable)change).run();
494 }
495 else if (change instanceof Runnable)
496 {
497 dispatch((Runnable)change);
498 }
499 else
500 throw new IllegalArgumentException(change.toString());
501 }
502 catch (CancelledKeyException e)
503 {
504 LOG.ignore(e);
505 }
506 catch (Throwable e)
507 {
508 if (isRunning())
509 LOG.warn(e);
510 else
511 LOG.debug(e);
512
513 try
514 {
515 if (ch!=null)
516 ch.close();
517 }
518 catch(IOException e2)
519 {
520 LOG.debug(e2);
521 }
522 }
523 }
524
525
526
527 int selected=selector.selectNow();
528
529 long now=System.currentTimeMillis();
530
531
532 if (selected==0 && selector.selectedKeys().isEmpty())
533 {
534
535 if (_pausing)
536 {
537 try
538 {
539 Thread.sleep(__BUSY_PAUSE);
540 }
541 catch(InterruptedException e)
542 {
543 LOG.ignore(e);
544 }
545 now=System.currentTimeMillis();
546 }
547
548
549 _timeout.setNow(now);
550 long to_next_timeout=_timeout.getTimeToNext();
551
552 long wait = _changes.size()==0?__IDLE_TICK:0L;
553 if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
554 wait = to_next_timeout;
555
556
557 if (wait>0)
558 {
559 long before=now;
560 selected=selector.select(wait);
561 now = System.currentTimeMillis();
562 _timeout.setNow(now);
563
564
565
566 if (__MONITOR_PERIOD>0 && now-before <=1)
567 {
568
569 if (++_busySelects>__MAX_SELECTS)
570 {
571
572 _pausing=true;
573
574
575 if (!_paused)
576 {
577
578 _paused=true;
579 LOG.warn("Selector {} is too busy, pausing!",this);
580 }
581 }
582 }
583 }
584 }
585
586
587 if (_selector==null || !selector.isOpen())
588 return;
589
590
591 for (SelectionKey key: selector.selectedKeys())
592 {
593 SocketChannel channel=null;
594
595 try
596 {
597 if (!key.isValid())
598 {
599 key.cancel();
600 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
601 if (endpoint != null)
602 endpoint.doUpdateKey();
603 continue;
604 }
605
606 Object att = key.attachment();
607 if (att instanceof SelectChannelEndPoint)
608 {
609 if (key.isReadable()||key.isWritable())
610 ((SelectChannelEndPoint)att).schedule();
611 }
612 else if (key.isConnectable())
613 {
614
615 channel = (SocketChannel)key.channel();
616 boolean connected=false;
617 try
618 {
619 connected=channel.finishConnect();
620 }
621 catch(Exception e)
622 {
623 connectionFailed(channel,e,att);
624 }
625 finally
626 {
627 if (connected)
628 {
629 key.interestOps(SelectionKey.OP_READ);
630 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
631 key.attach(endpoint);
632 endpoint.schedule();
633 }
634 else
635 {
636 key.cancel();
637 }
638 }
639 }
640 else
641 {
642
643 channel = (SocketChannel)key.channel();
644 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
645 key.attach(endpoint);
646 if (key.isReadable())
647 endpoint.schedule();
648 }
649 key = null;
650 }
651 catch (CancelledKeyException e)
652 {
653 LOG.ignore(e);
654 }
655 catch (Exception e)
656 {
657 if (isRunning())
658 LOG.warn(e);
659 else
660 LOG.ignore(e);
661
662 try
663 {
664 if (channel!=null)
665 channel.close();
666 }
667 catch(IOException e2)
668 {
669 LOG.debug(e2);
670 }
671
672 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
673 key.cancel();
674 }
675 }
676
677
678 selector.selectedKeys().clear();
679
680 now=System.currentTimeMillis();
681 _timeout.setNow(now);
682 Task task = _timeout.expired();
683 while (task!=null)
684 {
685 if (task instanceof Runnable)
686 dispatch((Runnable)task);
687 task = _timeout.expired();
688 }
689
690
691 if (now-_idleTick>__IDLE_TICK)
692 {
693 _idleTick=now;
694
695 final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
696 ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
697 :now;
698
699 dispatch(new Runnable()
700 {
701 public void run()
702 {
703 for (SelectChannelEndPoint endp:_endPoints.keySet())
704 {
705 endp.checkIdleTimestamp(idle_now);
706 }
707 }
708 public String toString() {return "Idle-"+super.toString();}
709 });
710
711 }
712
713
714 if (__MONITOR_PERIOD>0 && now>_monitorNext)
715 {
716 _busySelects=0;
717 _pausing=false;
718 _monitorNext=now+__MONITOR_PERIOD;
719
720 }
721 }
722 catch (ClosedSelectorException e)
723 {
724 if (isRunning())
725 LOG.warn(e);
726 else
727 LOG.ignore(e);
728 }
729 catch (CancelledKeyException e)
730 {
731 LOG.ignore(e);
732 }
733 finally
734 {
735 _selecting=null;
736 }
737 }
738
739
740
741 private void renewSelector()
742 {
743 try
744 {
745 synchronized (this)
746 {
747 Selector selector=_selector;
748 if (selector==null)
749 return;
750 final Selector new_selector = Selector.open();
751 for (SelectionKey k: selector.keys())
752 {
753 if (!k.isValid() || k.interestOps()==0)
754 continue;
755
756 final SelectableChannel channel = k.channel();
757 final Object attachment = k.attachment();
758
759 if (attachment==null)
760 addChange(channel);
761 else
762 addChange(channel,attachment);
763 }
764 _selector.close();
765 _selector=new_selector;
766 }
767 }
768 catch(IOException e)
769 {
770 throw new RuntimeException("recreating selector",e);
771 }
772 }
773
774
775 public SelectorManager getManager()
776 {
777 return SelectorManager.this;
778 }
779
780
781 public long getNow()
782 {
783 return _timeout.getNow();
784 }
785
786
787
788
789
790
791
792
793 public void scheduleTimeout(Timeout.Task task, long timeoutMs)
794 {
795 if (!(task instanceof Runnable))
796 throw new IllegalArgumentException("!Runnable");
797 _timeout.schedule(task, timeoutMs);
798 }
799
800
801 public void cancelTimeout(Timeout.Task task)
802 {
803 task.cancel();
804 }
805
806
807 public void wakeup()
808 {
809 try
810 {
811 Selector selector = _selector;
812 if (selector!=null)
813 selector.wakeup();
814 }
815 catch(Exception e)
816 {
817 addChange(new ChangeTask()
818 {
819 public void run()
820 {
821 renewSelector();
822 }
823 });
824
825 renewSelector();
826 }
827 }
828
829
830 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
831 {
832 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
833 LOG.debug("created {}",endp);
834 endPointOpened(endp);
835 _endPoints.put(endp,this);
836 return endp;
837 }
838
839
840 public void destroyEndPoint(SelectChannelEndPoint endp)
841 {
842 LOG.debug("destroyEndPoint {}",endp);
843 _endPoints.remove(endp);
844 endPointClosed(endp);
845 }
846
847
848 Selector getSelector()
849 {
850 return _selector;
851 }
852
853
854 void stop() throws Exception
855 {
856
857
858 try
859 {
860 for (int i=0;i<100 && _selecting!=null;i++)
861 {
862 wakeup();
863 Thread.sleep(10);
864 }
865 }
866 catch(Exception e)
867 {
868 LOG.ignore(e);
869 }
870
871
872 synchronized (this)
873 {
874 Selector selector=_selector;
875 for (SelectionKey key:selector.keys())
876 {
877 if (key==null)
878 continue;
879 Object att=key.attachment();
880 if (att instanceof EndPoint)
881 {
882 EndPoint endpoint = (EndPoint)att;
883 try
884 {
885 endpoint.close();
886 }
887 catch(IOException e)
888 {
889 LOG.ignore(e);
890 }
891 }
892 }
893
894
895 _timeout.cancelAll();
896 try
897 {
898 selector=_selector;
899 if (selector != null)
900 selector.close();
901 }
902 catch (IOException e)
903 {
904 LOG.ignore(e);
905 }
906 _selector=null;
907 }
908 }
909
910
911 public String dump()
912 {
913 return AggregateLifeCycle.dump(this);
914 }
915
916
917 public void dump(Appendable out, String indent) throws IOException
918 {
919 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
920
921 Thread selecting = _selecting;
922
923 Object where = "not selecting";
924 StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
925 if (trace!=null)
926 {
927 for (StackTraceElement t:trace)
928 if (t.getClassName().startsWith("org.eclipse.jetty."))
929 {
930 where=t;
931 break;
932 }
933 }
934
935 Selector selector=_selector;
936 if (selector!=null)
937 {
938 final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
939 dump.add(where);
940
941 final CountDownLatch latch = new CountDownLatch(1);
942
943 addChange(new ChangeTask()
944 {
945 public void run()
946 {
947 dumpKeyState(dump);
948 latch.countDown();
949 }
950 });
951
952 try
953 {
954 latch.await(5,TimeUnit.SECONDS);
955 }
956 catch(InterruptedException e)
957 {
958 LOG.ignore(e);
959 }
960 AggregateLifeCycle.dump(out,indent,dump);
961 }
962 }
963
964
965 public void dumpKeyState(List<Object> dumpto)
966 {
967 Selector selector=_selector;
968 dumpto.add(selector+" keys="+selector.keys().size());
969 for (SelectionKey key: selector.keys())
970 {
971 if (key.isValid())
972 dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps());
973 else
974 dumpto.add(key.attachment()+" iOps=-1 rOps=-1");
975 }
976 }
977
978
979 public String toString()
980 {
981 Selector selector=_selector;
982 return String.format("%s %s keys=%d selected=%d",
983 super.toString(),
984 SelectorManager.this.getState(),
985 selector != null && selector.isOpen() ? selector.keys().size() : -1,
986 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
987 }
988 }
989
990
991 private static class ChannelAndAttachment
992 {
993 final SelectableChannel _channel;
994 final Object _attachment;
995
996 public ChannelAndAttachment(SelectableChannel channel, Object attachment)
997 {
998 super();
999 _channel = channel;
1000 _attachment = attachment;
1001 }
1002 }
1003
1004
1005 public boolean isDeferringInterestedOps0()
1006 {
1007 return _deferringInterestedOps0;
1008 }
1009
1010
1011 public void setDeferringInterestedOps0(boolean deferringInterestedOps0)
1012 {
1013 _deferringInterestedOps0 = deferringInterestedOps0;
1014 }
1015
1016
1017
1018
1019
/**
 * Marker for a queued change that {@code SelectSet.doSelect()} runs directly on the
 * selecting thread; any other {@code Runnable} change is handed to
 * {@code dispatch(Runnable)} instead.
 */
private interface ChangeTask extends Runnable
{}
1022
1023 }