1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.CancelledKeyException;
18 import java.nio.channels.SelectableChannel;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.Selector;
21 import java.nio.channels.ServerSocketChannel;
22 import java.nio.channels.SocketChannel;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.Set;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.ConcurrentMap;
29
30 import org.eclipse.jetty.io.ConnectedEndPoint;
31 import org.eclipse.jetty.io.Connection;
32 import org.eclipse.jetty.io.EndPoint;
33 import org.eclipse.jetty.util.component.AbstractLifeCycle;
34 import org.eclipse.jetty.util.log.Log;
35 import org.eclipse.jetty.util.thread.Timeout;
36 import org.eclipse.jetty.util.thread.Timeout.Task;
37
38
39
40
41
42
43
44
45
46
47 public abstract class SelectorManager extends AbstractLifeCycle
48 {
49
50 private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.mortbay.io.nio.JVMBUG_THRESHHOLD",512).intValue();
51 private static final int __MONITOR_PERIOD=Integer.getInteger("org.mortbay.io.nio.MONITOR_PERIOD",1000).intValue();
52 private static final int __MAX_SELECTS=Integer.getInteger("org.mortbay.io.nio.MAX_SELECTS",15000).intValue();
53 private static final int __BUSY_PAUSE=Integer.getInteger("org.mortbay.io.nio.BUSY_PAUSE",50).intValue();
54 private static final int __BUSY_KEY=Integer.getInteger("org.mortbay.io.nio.BUSY_KEY",-1).intValue();
55
56 private int _maxIdleTime;
57 private int _lowResourcesMaxIdleTime;
58 private long _lowResourcesConnections;
59 private SelectSet[] _selectSet;
60 private int _selectSets=1;
61 private volatile int _set;
62
63
64
65
66
67
68 public void setMaxIdleTime(long maxIdleTime)
69 {
70 _maxIdleTime=(int)maxIdleTime;
71 }
72
73
74
75
76
77 public void setSelectSets(int selectSets)
78 {
79 long lrc = _lowResourcesConnections * _selectSets;
80 _selectSets=selectSets;
81 _lowResourcesConnections=lrc/_selectSets;
82 }
83
84
85
86
87
88 public long getMaxIdleTime()
89 {
90 return _maxIdleTime;
91 }
92
93
94
95
96
97 public int getSelectSets()
98 {
99 return _selectSets;
100 }
101
102
103
104
105
106
107 public SelectSet getSelectSet(int i)
108 {
109 return _selectSet[i];
110 }
111
112
113
114
115
116 public void register(SocketChannel channel, Object att)
117 {
118
119
120
121
122 int s=_set++;
123 s=s%_selectSets;
124 SelectSet[] sets=_selectSet;
125 if (sets!=null)
126 {
127 SelectSet set=sets[s];
128 set.addChange(channel,att);
129 set.wakeup();
130 }
131 }
132
133
134
135
136
137 public void register(ServerSocketChannel acceptChannel)
138 {
139 int s=_set++;
140 s=s%_selectSets;
141 SelectSet set=_selectSet[s];
142 set.addChange(acceptChannel);
143 set.wakeup();
144 }
145
146
147
148
149
150 public long getLowResourcesConnections()
151 {
152 return _lowResourcesConnections*_selectSets;
153 }
154
155
156
157
158
159
160
161
162 public void setLowResourcesConnections(long lowResourcesConnections)
163 {
164 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
165 }
166
167
168
169
170
171 public long getLowResourcesMaxIdleTime()
172 {
173 return _lowResourcesMaxIdleTime;
174 }
175
176
177
178
179
180
181 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
182 {
183 _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
184 }
185
186
187
188
189
190
191 public void doSelect(int acceptorID) throws IOException
192 {
193 SelectSet[] sets= _selectSet;
194 if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
195 sets[acceptorID].doSelect();
196 }
197
198
199
200
201
202
203
/**
 * Accepts a connection for a selected key. The select loop calls this when a
 * selected key that has no endpoint attached reports itself acceptable; the
 * loop then switches the returned channel to non-blocking mode itself.
 *
 * @param key the acceptable selection key
 * @return the accepted channel, or {@code null} to make the select loop skip this key
 * @throws IOException if accepting fails
 */
protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException;
205
206
/**
 * Hands a task over for execution. Within this class it is used for queued
 * changes that are {@link Runnable}, for expired timeout tasks that are
 * {@link Runnable}, for the periodic idle check and for closing busy endpoints.
 *
 * @param task the task to run
 * @return not inspected by any caller in this class; presumably whether the
 *         task was accepted -- confirm against the implementations
 */
public abstract boolean dispatch(Runnable task);
208
209
210
211
212
213 @Override
214 protected void doStart() throws Exception
215 {
216 _selectSet = new SelectSet[_selectSets];
217 for (int i=0;i<_selectSet.length;i++)
218 _selectSet[i]= new SelectSet(i);
219
220 super.doStart();
221 }
222
223
224
225 @Override
226 protected void doStop() throws Exception
227 {
228 SelectSet[] sets= _selectSet;
229 _selectSet=null;
230 if (sets!=null)
231 {
232 for (SelectSet set : sets)
233 {
234 if (set!=null)
235 set.stop();
236 }
237 }
238 super.doStop();
239 }
240
241
242
243
244
/**
 * Notification that an endpoint has been closed. Called from
 * SelectSet.destroyEndPoint after the endpoint has been removed from the
 * select set's endpoint map.
 *
 * @param endpoint the endpoint that was destroyed
 */
protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
246
247
248
249
250
/**
 * Notification that an endpoint has been opened. Called by a select set
 * right after newEndPoint has returned the endpoint and before the endpoint
 * is recorded in the select set's endpoint map.
 *
 * @param endpoint the newly created endpoint
 */
protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
252
253
/**
 * Notification hook for an endpoint whose connection has been replaced.
 * NOTE(review): this method is not called anywhere in this class; presumably
 * endpoints invoke it when their connection changes -- confirm against the
 * endpoint implementation.
 *
 * @param endpoint the endpoint concerned
 * @param oldConnection the connection that was previously associated with the endpoint
 */
protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
255
256
/**
 * Factory for the connection that handles a channel.
 * NOTE(review): this method is not called anywhere in this class; presumably
 * newEndPoint implementations or the endpoints themselves call it -- confirm.
 *
 * @param channel the channel the connection is for
 * @param endpoint the endpoint wrapping the channel
 * @return the connection for the endpoint
 */
protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
258
259
260
261
262
263
264
265
266
267
/**
 * Factory for the endpoint that wraps a registered channel. Called by a
 * select set whenever it needs an endpoint for a channel: after registering a
 * connected channel, after accepting a channel, after a connect completes, or
 * for a selected key that has no endpoint attached yet.
 *
 * @param channel the channel to wrap
 * @param selectSet the select set creating the endpoint
 * @param sKey the selection key of the channel in that select set
 * @return the new endpoint
 * @throws IOException if the endpoint cannot be created
 */
protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
269
270
271 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
272 {
273 Log.warn(ex+","+channel+","+attachment);
274 Log.debug(ex);
275 }
276
277
278 public void dump()
279 {
280 for (final SelectSet set :_selectSet)
281 {
282 Thread selecting = set._selecting;
283 Log.info("SelectSet "+set._setID+" : "+selecting);
284 if (selecting!=null)
285 {
286 StackTraceElement[] trace =selecting.getStackTrace();
287 if (trace!=null)
288 {
289 for (StackTraceElement e : trace)
290 {
291 Log.info("\tat "+e.toString());
292 }
293 }
294 }
295
296 set.addChange(new ChangeTask(){
297 public void run()
298 {
299 set.dump();
300 }
301 });
302 }
303 }
304
305
306
307
308
309 public class SelectSet
310 {
311 private final int _setID;
312 private final Timeout _timeout;
313
314 private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
315
316 private Selector _selector;
317
318 private int _nextSet;
319 private volatile Thread _selecting;
320 private int _jvmBug;
321 private int _selects;
322 private long _monitorStart;
323 private long _monitorNext;
324 private boolean _pausing;
325 private SelectionKey _busyKey;
326 private int _busyKeyCount;
327 private long _log;
328 private int _paused;
329 private int _jvmFix0;
330 private int _jvmFix1;
331 private int _jvmFix2;
332 private volatile long _idleTick;
333 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
334
335
336 SelectSet(int acceptorID) throws Exception
337 {
338 _setID=acceptorID;
339
340 _idleTick = System.currentTimeMillis();
341 _timeout = new Timeout(this);
342 _timeout.setDuration(0L);
343
344
345 _selector = Selector.open();
346 _monitorStart=System.currentTimeMillis();
347 _monitorNext=_monitorStart+__MONITOR_PERIOD;
348 _log=_monitorStart+60000;
349 }
350
351
352 public void addChange(Object point)
353 {
354 _changes.add(point);
355 }
356
357
358 public void addChange(SelectableChannel channel, Object att)
359 {
360 if (att==null)
361 addChange(channel);
362 else if (att instanceof EndPoint)
363 addChange(att);
364 else
365 addChange(new ChangeSelectableChannel(channel,att));
366 }
367
368
369
370
371
372
373
374 public void doSelect() throws IOException
375 {
376 try
377 {
378 _selecting=Thread.currentThread();
379 final Selector selector=_selector;
380
381
382 Object change;
383 int changes=_changes.size();
384 while (changes-->0 && (change=_changes.poll())!=null)
385 {
386 try
387 {
388 if (change instanceof EndPoint)
389 {
390
391 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
392 endpoint.doUpdateKey();
393 }
394 else if (change instanceof Runnable)
395 {
396 dispatch((Runnable)change);
397 }
398 else if (change instanceof ChangeSelectableChannel)
399 {
400
401 final ChangeSelectableChannel asc = (ChangeSelectableChannel)change;
402 final SelectableChannel channel=asc._channel;
403 final Object att = asc._attachment;
404
405 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
406 {
407 SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
408 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
409 key.attach(endpoint);
410 endpoint.schedule();
411 }
412 else if (channel.isOpen())
413 {
414 channel.register(selector,SelectionKey.OP_CONNECT,att);
415 }
416 }
417 else if (change instanceof SocketChannel)
418 {
419 final SocketChannel channel=(SocketChannel)change;
420
421 if (channel.isConnected())
422 {
423 SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);
424 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
425 key.attach(endpoint);
426 endpoint.schedule();
427 }
428 else if (channel.isOpen())
429 {
430 channel.register(selector,SelectionKey.OP_CONNECT,null);
431 }
432 }
433 else if (change instanceof ServerSocketChannel)
434 {
435 ServerSocketChannel channel = (ServerSocketChannel)change;
436 channel.register(getSelector(),SelectionKey.OP_ACCEPT);
437 }
438 else if (change instanceof ChangeTask)
439 {
440 ((ChangeTask)change).run();
441 }
442 else
443 throw new IllegalArgumentException(change.toString());
444 }
445 catch (Exception e)
446 {
447 if (isRunning())
448 Log.warn(e);
449 else
450 Log.debug(e);
451 }
452 }
453
454 long retry_next;
455 long now=System.currentTimeMillis();
456 _timeout.setNow(now);
457
458 retry_next=_timeout.getTimeToNext();
459
460
461 long wait = 1000L;
462 if (wait > 0 && retry_next >= 0 && wait > retry_next)
463 wait = retry_next;
464
465
466 if (wait > 0)
467 {
468
469 if (_pausing)
470 {
471 try
472 {
473 Thread.sleep(__BUSY_PAUSE);
474 }
475 catch(InterruptedException e)
476 {
477 Log.ignore(e);
478 }
479 }
480
481 long before=now;
482 int selected=selector.select(wait);
483 now = System.currentTimeMillis();
484 _timeout.setNow(now);
485 _selects++;
486
487
488
489
490 if (now>_monitorNext)
491 {
492 _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
493 _pausing=_selects>__MAX_SELECTS;
494 if (_pausing)
495 _paused++;
496
497 _selects=0;
498 _jvmBug=0;
499 _monitorStart=now;
500 _monitorNext=now+__MONITOR_PERIOD;
501 }
502
503 if (now>_log)
504 {
505 if (_paused>0)
506 Log.info(this+" Busy selector - injecting delay "+_paused+" times");
507
508 if (_jvmFix2>0)
509 Log.info(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
510
511 if (_jvmFix1>0)
512 Log.info(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times");
513
514 else if(Log.isDebugEnabled() && _jvmFix0>0)
515 Log.info(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");
516 _paused=0;
517 _jvmFix2=0;
518 _jvmFix1=0;
519 _jvmFix0=0;
520 _log=now+60000;
521 }
522
523
524 if (selected==0 && wait>10 && (now-before)<(wait/2))
525 {
526
527 _jvmBug++;
528 if (_jvmBug>(__JVMBUG_THRESHHOLD))
529 {
530 try
531 {
532 if (_jvmBug==__JVMBUG_THRESHHOLD+1)
533 _jvmFix2++;
534
535 Thread.sleep(__BUSY_PAUSE);
536 }
537 catch(InterruptedException e)
538 {
539 Log.ignore(e);
540 }
541 }
542 else if (_jvmBug==__JVMBUG_THRESHHOLD)
543 {
544 synchronized (this)
545 {
546
547 final Selector new_selector = Selector.open();
548 for (SelectionKey k: selector.keys())
549 {
550 if (!k.isValid() || k.interestOps()==0)
551 continue;
552
553 final SelectableChannel channel = k.channel();
554 final Object attachment = k.attachment();
555
556 if (attachment==null)
557 addChange(channel);
558 else
559 addChange(channel,attachment);
560 }
561 _selector.close();
562 _selector=new_selector;
563 return;
564 }
565 }
566 else if (_jvmBug%32==31)
567 {
568
569 int cancelled=0;
570 for (SelectionKey k: selector.keys())
571 {
572 if (k.isValid()&&k.interestOps()==0)
573 {
574 k.cancel();
575 cancelled++;
576 }
577 }
578 if (cancelled>0)
579 _jvmFix0++;
580
581 return;
582 }
583 }
584 else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
585 {
586
587 SelectionKey busy = selector.selectedKeys().iterator().next();
588 if (busy==_busyKey)
589 {
590 if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
591 {
592 final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
593 Log.warn("Busy Key "+busy.channel()+" "+endpoint);
594 busy.cancel();
595 if (endpoint!=null)
596 {
597 dispatch(new Runnable()
598 {
599 public void run()
600 {
601 try
602 {
603 endpoint.close();
604 }
605 catch (IOException e)
606 {
607 Log.ignore(e);
608 }
609 }
610 });
611 }
612 }
613 }
614 else
615 _busyKeyCount=0;
616 _busyKey=busy;
617 }
618 }
619 else
620 {
621 selector.selectNow();
622 _selects++;
623 }
624
625
626 if (_selector==null || !selector.isOpen())
627 return;
628
629
630 for (SelectionKey key: selector.selectedKeys())
631 {
632 try
633 {
634 if (!key.isValid())
635 {
636 key.cancel();
637 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
638 if (endpoint != null)
639 endpoint.doUpdateKey();
640 continue;
641 }
642
643 Object att = key.attachment();
644 if (att instanceof SelectChannelEndPoint)
645 {
646 ((SelectChannelEndPoint)att).schedule();
647 }
648 else if (key.isAcceptable())
649 {
650 SocketChannel channel = acceptChannel(key);
651 if (channel==null)
652 continue;
653
654 channel.configureBlocking(false);
655
656
657 _nextSet=++_nextSet%_selectSet.length;
658
659
660 if (_nextSet==_setID)
661 {
662
663 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
664
665 SelectChannelEndPoint endpoint=_selectSet[_nextSet].createEndPoint(channel,cKey);
666 cKey.attach(endpoint);
667 if (endpoint != null)
668 endpoint.schedule();
669 }
670 else
671 {
672
673 _selectSet[_nextSet].addChange(channel);
674 _selectSet[_nextSet].wakeup();
675 }
676 }
677 else if (key.isConnectable())
678 {
679
680 SocketChannel channel = (SocketChannel)key.channel();
681 boolean connected=false;
682 try
683 {
684 connected=channel.finishConnect();
685 }
686 catch(Exception e)
687 {
688 connectionFailed(channel,e,att);
689 }
690 finally
691 {
692 if (connected)
693 {
694 key.interestOps(SelectionKey.OP_READ);
695 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
696 key.attach(endpoint);
697 endpoint.schedule();
698 }
699 else
700 {
701 key.cancel();
702 }
703 }
704 }
705 else
706 {
707
708 SocketChannel channel = (SocketChannel)key.channel();
709 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
710 key.attach(endpoint);
711 if (key.isReadable())
712 endpoint.schedule();
713 }
714 key = null;
715 }
716 catch (CancelledKeyException e)
717 {
718 Log.ignore(e);
719 }
720 catch (Exception e)
721 {
722 if (isRunning())
723 Log.warn(e);
724 else
725 Log.ignore(e);
726
727 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
728 key.cancel();
729 }
730 }
731
732
733 selector.selectedKeys().clear();
734
735 _timeout.setNow(now);
736 Task task = _timeout.expired();
737 while (task!=null)
738 {
739 if (task instanceof Runnable)
740 dispatch((Runnable)task);
741 else
742 task.expired();
743
744 task = _timeout.expired();
745 }
746
747
748 if (now-_idleTick>1000)
749 {
750 _idleTick=now;
751
752 final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
753 ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
754 :now;
755
756 dispatch(new Runnable()
757 {
758 public void run()
759 {
760 for (SelectChannelEndPoint endp:_endPoints.keySet())
761 {
762 endp.checkIdleTimestamp(idle_now);
763 }
764 }
765 });
766 }
767 }
768 catch (CancelledKeyException e)
769 {
770 Log.ignore(e);
771 }
772 finally
773 {
774 _selecting=null;
775 }
776 }
777
778
779 public SelectorManager getManager()
780 {
781 return SelectorManager.this;
782 }
783
784
785 public long getNow()
786 {
787 return _timeout.getNow();
788 }
789
790
791
792
793
794
795
796
797 public void scheduleTimeout(Timeout.Task task, long timeoutMs)
798 {
799 _timeout.schedule(task, timeoutMs);
800 }
801
802
803 public void cancelTimeout(Timeout.Task task)
804 {
805 task.cancel();
806 }
807
808
809 public void wakeup()
810 {
811 Selector selector = _selector;
812 if (selector!=null)
813 selector.wakeup();
814 }
815
816
817 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
818 {
819 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
820 endPointOpened(endp);
821 _endPoints.put(endp,this);
822 return endp;
823 }
824
825
826 public void destroyEndPoint(SelectChannelEndPoint endp)
827 {
828 _endPoints.remove(endp);
829 endPointClosed(endp);
830 }
831
832
833 Selector getSelector()
834 {
835 return _selector;
836 }
837
838
839 void stop() throws Exception
840 {
841 boolean selecting=true;
842 while(selecting)
843 {
844 wakeup();
845 selecting=_selecting!=null;
846 }
847
848 for (SelectionKey key:_selector.keys())
849 {
850 if (key==null)
851 continue;
852 Object att=key.attachment();
853 if (att instanceof EndPoint)
854 {
855 EndPoint endpoint = (EndPoint)att;
856 try
857 {
858 endpoint.close();
859 }
860 catch(IOException e)
861 {
862 Log.ignore(e);
863 }
864 }
865 }
866
867 synchronized (this)
868 {
869 selecting=_selecting!=null;
870 while(selecting)
871 {
872 wakeup();
873 selecting=_selecting!=null;
874 }
875
876 _timeout.cancelAll();
877 try
878 {
879 if (_selector != null)
880 _selector.close();
881 }
882 catch (IOException e)
883 {
884 Log.ignore(e);
885 }
886 _selector=null;
887 }
888 }
889
890 public void dump()
891 {
892 synchronized (System.err)
893 {
894 Selector selector=_selector;
895 Log.info("SelectSet "+_setID+" "+selector.keys().size());
896 for (SelectionKey key: selector.keys())
897 {
898 if (key.isValid())
899 Log.info(key.channel()+" "+key.interestOps()+" "+key.readyOps()+" "+key.attachment());
900 else
901 Log.info(key.channel()+" - - "+key.attachment());
902 }
903 }
904 }
905 }
906
907
908 private static class ChangeSelectableChannel
909 {
910 final SelectableChannel _channel;
911 final Object _attachment;
912
913 public ChangeSelectableChannel(SelectableChannel channel, Object attachment)
914 {
915 super();
916 _channel = channel;
917 _attachment = attachment;
918 }
919 }
920
921
922 private interface ChangeTask
923 {
924 public void run();
925 }
926 }