1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.CancelledKeyException;
18 import java.nio.channels.SelectableChannel;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.Selector;
21 import java.nio.channels.ServerSocketChannel;
22 import java.nio.channels.SocketChannel;
23 import java.util.ArrayList;
24 import java.util.List;
25
26 import org.eclipse.jetty.io.ConnectedEndPoint;
27 import org.eclipse.jetty.io.Connection;
28 import org.eclipse.jetty.io.EndPoint;
29 import org.eclipse.jetty.util.component.AbstractLifeCycle;
30 import org.eclipse.jetty.util.log.Log;
31 import org.eclipse.jetty.util.thread.Timeout;
32
33
34
35
36
37
38
39
40
41
42 public abstract class SelectorManager extends AbstractLifeCycle
43 {
44
45 private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.mortbay.io.nio.JVMBUG_THRESHHOLD",512).intValue();
46 private static final int __MONITOR_PERIOD=Integer.getInteger("org.mortbay.io.nio.MONITOR_PERIOD",1000).intValue();
47 private static final int __MAX_SELECTS=Integer.getInteger("org.mortbay.io.nio.MAX_SELECTS",15000).intValue();
48 private static final int __BUSY_PAUSE=Integer.getInteger("org.mortbay.io.nio.BUSY_PAUSE",50).intValue();
49 private static final int __BUSY_KEY=Integer.getInteger("org.mortbay.io.nio.BUSY_KEY",-1).intValue();
50
51 private long _maxIdleTime;
52 private long _lowResourcesConnections;
53 private long _lowResourcesMaxIdleTime;
54 private transient SelectSet[] _selectSet;
55 private int _selectSets=1;
56 private volatile int _set;
57
58
59
60
61
62
63 public void setMaxIdleTime(long maxIdleTime)
64 {
65 _maxIdleTime=maxIdleTime;
66 }
67
68
69
70
71
72 public void setSelectSets(int selectSets)
73 {
74 long lrc = _lowResourcesConnections * _selectSets;
75 _selectSets=selectSets;
76 _lowResourcesConnections=lrc/_selectSets;
77 }
78
79
80
81
82
83 public long getMaxIdleTime()
84 {
85 return _maxIdleTime;
86 }
87
88
89
90
91
92 public int getSelectSets()
93 {
94 return _selectSets;
95 }
96
97
98
99
100
101
102
103 public void register(SocketChannel channel, Object att)
104 {
105 int s=_set++;
106 s=s%_selectSets;
107 SelectSet[] sets=_selectSet;
108 if (sets!=null)
109 {
110 SelectSet set=sets[s];
111 set.addChange(channel,att);
112 set.wakeup();
113 }
114 }
115
116
117
118
119
120
121
122 public void register(ServerSocketChannel acceptChannel)
123 {
124 int s=_set++;
125 s=s%_selectSets;
126 SelectSet set=_selectSet[s];
127 set.addChange(acceptChannel);
128 set.wakeup();
129 }
130
131
132
133
134
135 public long getLowResourcesConnections()
136 {
137 return _lowResourcesConnections*_selectSets;
138 }
139
140
141
142
143
144
145
146
147 public void setLowResourcesConnections(long lowResourcesConnections)
148 {
149 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
150 }
151
152
153
154
155
156 public long getLowResourcesMaxIdleTime()
157 {
158 return _lowResourcesMaxIdleTime;
159 }
160
161
162
163
164
165
166 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
167 {
168 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
169 }
170
171
172
173
174
175
176 public void doSelect(int acceptorID) throws IOException
177 {
178 SelectSet[] sets= _selectSet;
179 if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
180 sets[acceptorID].doSelect();
181 }
182
183
184
185
186
187
188
189 protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException;
190
191
192 public abstract boolean dispatch(Runnable task);
193
194
195
196
197
198 @Override
199 protected void doStart() throws Exception
200 {
201 _selectSet = new SelectSet[_selectSets];
202 for (int i=0;i<_selectSet.length;i++)
203 _selectSet[i]= new SelectSet(i);
204
205 super.doStart();
206 }
207
208
209
210 @Override
211 protected void doStop() throws Exception
212 {
213 SelectSet[] sets= _selectSet;
214 _selectSet=null;
215 if (sets!=null)
216 {
217 for (SelectSet set : sets)
218 {
219 if (set!=null)
220 set.stop();
221 }
222 }
223 super.doStop();
224 }
225
226
227
228
229
230 protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
231
232
233
234
235
236 protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
237
238
239 protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
240
241
242 protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
243
244
245
246
247
248
249
250
251
252 protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
253
254
255 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
256 {
257 Log.warn(ex+","+channel+","+attachment);
258 Log.debug(ex);
259 }
260
261
262 public void dump()
263 {
264 for (final SelectSet set :_selectSet)
265 {
266 Thread selecting = set._selecting;
267 Log.info("SelectSet "+set._setID+" : "+selecting);
268 if (selecting!=null)
269 {
270 StackTraceElement[] trace =selecting.getStackTrace();
271 if (trace!=null)
272 {
273 for (StackTraceElement e : trace)
274 {
275 Log.info("\tat "+e.toString());
276 }
277 }
278 }
279
280 set.addChange(new ChangeTask(){
281 public void run()
282 {
283 set.dump();
284 }
285 });
286 }
287 }
288
289
290
291
292
293 public class SelectSet
294 {
295 private final int _setID;
296 private final Timeout _idleTimeout;
297 private final Timeout _timeout;
298 private final List<Object>[] _changes;
299
300 private int _change;
301 private int _nextSet;
302 private Selector _selector;
303 private volatile Thread _selecting;
304 private int _jvmBug;
305 private int _selects;
306 private long _monitorStart;
307 private long _monitorNext;
308 private boolean _pausing;
309 private SelectionKey _busyKey;
310 private int _busyKeyCount;
311 private long _log;
312 private int _paused;
313 private int _jvmFix0;
314 private int _jvmFix1;
315 private int _jvmFix2;
316
317
318 SelectSet(int acceptorID) throws Exception
319 {
320 _setID=acceptorID;
321
322 _idleTimeout = new Timeout(this);
323 _idleTimeout.setDuration(getMaxIdleTime());
324 _timeout = new Timeout(this);
325 _timeout.setDuration(0L);
326 _changes = new List[] {new ArrayList(),new ArrayList()};
327
328
329 _selector = Selector.open();
330 _change=0;
331 _monitorStart=System.currentTimeMillis();
332 _monitorNext=_monitorStart+__MONITOR_PERIOD;
333 _log=_monitorStart+60000;
334 }
335
336
337 public void addChange(Object point)
338 {
339 synchronized (_changes)
340 {
341 _changes[_change].add(point);
342 }
343 }
344
345
346 public void addChange(SelectableChannel channel, Object att)
347 {
348 if (att==null)
349 addChange(channel);
350 else if (att instanceof EndPoint)
351 addChange(att);
352 else
353 addChange(new ChangeSelectableChannel(channel,att));
354 }
355
356
357 public void cancelIdle(Timeout.Task task)
358 {
359 task.cancel();
360 }
361
362
363
364
365
366
367
368 public void doSelect() throws IOException
369 {
370 try
371 {
372 _selecting=Thread.currentThread();
373 List<?> changes;
374 final Selector selector;
375 synchronized (_changes)
376 {
377 changes=_changes[_change];
378 _change=_change==0?1:0;
379 selector=_selector;
380 }
381
382
383 final int size=changes.size();
384 for (int i = 0; i < size; i++)
385 {
386 try
387 {
388 Object o = changes.get(i);
389
390 if (o instanceof EndPoint)
391 {
392
393 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o;
394 endpoint.doUpdateKey();
395 }
396 else if (o instanceof Runnable)
397 {
398 dispatch((Runnable)o);
399 }
400 else if (o instanceof ChangeSelectableChannel)
401 {
402
403 final ChangeSelectableChannel asc = (ChangeSelectableChannel)o;
404 final SelectableChannel channel=asc._channel;
405 final Object att = asc._attachment;
406
407 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
408 {
409 SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
410 SelectChannelEndPoint endpoint = newEndPoint((SocketChannel)channel,this,key);
411 key.attach(endpoint);
412 endpoint.schedule();
413 }
414 else if (channel.isOpen())
415 {
416 channel.register(selector,SelectionKey.OP_CONNECT,att);
417 }
418 }
419 else if (o instanceof SocketChannel)
420 {
421 final SocketChannel channel=(SocketChannel)o;
422
423 if (channel.isConnected())
424 {
425 SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);
426 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
427 key.attach(endpoint);
428 endpoint.schedule();
429 }
430 else if (channel.isOpen())
431 {
432 channel.register(selector,SelectionKey.OP_CONNECT,null);
433 }
434 }
435 else if (o instanceof ServerSocketChannel)
436 {
437 ServerSocketChannel channel = (ServerSocketChannel)o;
438 channel.register(getSelector(),SelectionKey.OP_ACCEPT);
439 }
440 else if (o instanceof ChangeTask)
441 {
442 ((ChangeTask)o).run();
443 }
444 else
445 throw new IllegalArgumentException(o.toString());
446 }
447 catch (Exception e)
448 {
449 if (isRunning())
450 Log.warn(e);
451 else
452 Log.debug(e);
453 }
454 }
455 changes.clear();
456
457 long idle_next;
458 long retry_next;
459 long now=System.currentTimeMillis();
460 synchronized (this)
461 {
462 _idleTimeout.setNow(now);
463 _timeout.setNow(now);
464
465 if (_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)
466 _idleTimeout.setDuration(_lowResourcesMaxIdleTime);
467 else
468 _idleTimeout.setDuration(_maxIdleTime);
469 idle_next=_idleTimeout.getTimeToNext();
470 retry_next=_timeout.getTimeToNext();
471 }
472
473
474 long wait = 1000L;
475 if (idle_next >= 0 && wait > idle_next)
476 wait = idle_next;
477 if (wait > 0 && retry_next >= 0 && wait > retry_next)
478 wait = retry_next;
479
480
481 if (wait > 0)
482 {
483
484 if (_pausing)
485 {
486 try
487 {
488 Thread.sleep(__BUSY_PAUSE);
489 }
490 catch(InterruptedException e)
491 {
492 Log.ignore(e);
493 }
494 }
495
496 long before=now;
497 int selected=selector.select(wait);
498 now = System.currentTimeMillis();
499 _idleTimeout.setNow(now);
500 _timeout.setNow(now);
501 _selects++;
502
503
504
505
506 if (now>_monitorNext)
507 {
508 _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
509 _pausing=_selects>__MAX_SELECTS;
510 if (_pausing)
511 _paused++;
512
513 _selects=0;
514 _jvmBug=0;
515 _monitorStart=now;
516 _monitorNext=now+__MONITOR_PERIOD;
517 }
518
519 if (now>_log)
520 {
521 if (_paused>0)
522 Log.info(this+" Busy selector - injecting delay "+_paused+" times");
523
524 if (_jvmFix2>0)
525 Log.info(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
526
527 if (_jvmFix1>0)
528 Log.info(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, canceled keys "+_jvmFix0+" times");
529
530 else if(Log.isDebugEnabled() && _jvmFix0>0)
531 Log.info(this+" JVM BUG(s) - canceled keys "+_jvmFix0+" times");
532 _paused=0;
533 _jvmFix2=0;
534 _jvmFix1=0;
535 _jvmFix0=0;
536 _log=now+60000;
537 }
538
539
540 if (selected==0 && wait>10 && (now-before)<(wait/2))
541 {
542
543 _jvmBug++;
544 if (_jvmBug>(__JVMBUG_THRESHHOLD))
545 {
546 try
547 {
548 if (_jvmBug==__JVMBUG_THRESHHOLD+1)
549 _jvmFix2++;
550
551 Thread.sleep(__BUSY_PAUSE);
552 }
553 catch(InterruptedException e)
554 {
555 Log.ignore(e);
556 }
557 }
558 else if (_jvmBug==__JVMBUG_THRESHHOLD)
559 {
560 synchronized (this)
561 {
562
563 final Selector new_selector = Selector.open();
564 for (SelectionKey k: selector.keys())
565 {
566 if (!k.isValid() || k.interestOps()==0)
567 continue;
568
569 final SelectableChannel channel = k.channel();
570 final Object attachment = k.attachment();
571
572 if (attachment==null)
573 addChange(channel);
574 else
575 addChange(channel,attachment);
576 }
577 _selector.close();
578 _selector=new_selector;
579 return;
580 }
581 }
582 else if (_jvmBug%32==31)
583 {
584
585 int cancelled=0;
586 for (SelectionKey k: selector.keys())
587 {
588 if (k.isValid()&&k.interestOps()==0)
589 {
590 k.cancel();
591 cancelled++;
592 }
593 }
594 if (cancelled>0)
595 _jvmFix0++;
596
597 return;
598 }
599 }
600 else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
601 {
602
603 SelectionKey busy = selector.selectedKeys().iterator().next();
604 if (busy==_busyKey)
605 {
606 if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
607 {
608 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
609 Log.warn("Busy Key "+busy.channel()+" "+endpoint);
610 busy.cancel();
611 if (endpoint!=null)
612 endpoint.close();
613 }
614 }
615 else
616 _busyKeyCount=0;
617 _busyKey=busy;
618 }
619 }
620 else
621 {
622 selector.selectNow();
623 _selects++;
624 }
625
626
627 if (_selector==null || !selector.isOpen())
628 return;
629
630
631 for (SelectionKey key: selector.selectedKeys())
632 {
633 try
634 {
635 if (!key.isValid())
636 {
637 key.cancel();
638 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
639 if (endpoint != null)
640 endpoint.doUpdateKey();
641 continue;
642 }
643
644 Object att = key.attachment();
645 if (att instanceof SelectChannelEndPoint)
646 {
647 ((SelectChannelEndPoint)att).schedule();
648 }
649 else if (key.isAcceptable())
650 {
651 SocketChannel channel = acceptChannel(key);
652 if (channel==null)
653 continue;
654
655 channel.configureBlocking(false);
656
657
658 _nextSet=++_nextSet%_selectSet.length;
659
660
661 if (_nextSet==_setID)
662 {
663
664 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
665 SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey);
666 cKey.attach(endpoint);
667 if (endpoint != null)
668 endpoint.schedule();
669 }
670 else
671 {
672
673 _selectSet[_nextSet].addChange(channel);
674 _selectSet[_nextSet].wakeup();
675 }
676 }
677 else if (key.isConnectable())
678 {
679
680 SocketChannel channel = (SocketChannel)key.channel();
681 boolean connected=false;
682 try
683 {
684 connected=channel.finishConnect();
685 }
686 catch(Exception e)
687 {
688 connectionFailed(channel,e,att);
689 }
690 finally
691 {
692 if (connected)
693 {
694 key.interestOps(SelectionKey.OP_READ);
695 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
696 key.attach(endpoint);
697 endpoint.schedule();
698 }
699 else
700 {
701 key.cancel();
702 }
703 }
704 }
705 else
706 {
707
708 SocketChannel channel = (SocketChannel)key.channel();
709 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
710 key.attach(endpoint);
711 if (key.isReadable())
712 endpoint.schedule();
713 }
714 key = null;
715 }
716 catch (CancelledKeyException e)
717 {
718 Log.ignore(e);
719 }
720 catch (Exception e)
721 {
722 if (isRunning())
723 Log.warn(e);
724 else
725 Log.ignore(e);
726
727 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
728 key.cancel();
729 }
730 }
731
732
733 selector.selectedKeys().clear();
734
735
736 _idleTimeout.tick(now);
737 _timeout.tick(now);
738 }
739 catch (CancelledKeyException e)
740 {
741 Log.ignore(e);
742 }
743 finally
744 {
745 _selecting=null;
746 }
747 }
748
749
750 public SelectorManager getManager()
751 {
752 return SelectorManager.this;
753 }
754
755
756 public long getNow()
757 {
758 return _idleTimeout.getNow();
759 }
760
761
762 public void scheduleIdle(Timeout.Task task)
763 {
764 if (_idleTimeout.getDuration() <= 0)
765 return;
766 _idleTimeout.schedule(task);
767 }
768
769
770 public void scheduleTimeout(Timeout.Task task, long timeoutMs)
771 {
772 _timeout.schedule(task, timeoutMs);
773 }
774
775
776 public void cancelTimeout(Timeout.Task task)
777 {
778 task.cancel();
779 }
780
781
782 public void wakeup()
783 {
784 Selector selector = _selector;
785 if (selector!=null)
786 selector.wakeup();
787 }
788
789
790 Selector getSelector()
791 {
792 return _selector;
793 }
794
795
796 void stop() throws Exception
797 {
798 boolean selecting=true;
799 while(selecting)
800 {
801 wakeup();
802 selecting=_selecting!=null;
803 }
804
805 for (SelectionKey key:_selector.keys())
806 {
807 if (key==null)
808 continue;
809 Object att=key.attachment();
810 if (att instanceof EndPoint)
811 {
812 EndPoint endpoint = (EndPoint)att;
813 try
814 {
815 endpoint.close();
816 }
817 catch(IOException e)
818 {
819 Log.ignore(e);
820 }
821 }
822 }
823
824 synchronized (this)
825 {
826 selecting=_selecting!=null;
827 while(selecting)
828 {
829 wakeup();
830 selecting=_selecting!=null;
831 }
832
833 _idleTimeout.cancelAll();
834 _timeout.cancelAll();
835 try
836 {
837 if (_selector != null)
838 _selector.close();
839 }
840 catch (IOException e)
841 {
842 Log.ignore(e);
843 }
844 _selector=null;
845 }
846 }
847
848 public void dump()
849 {
850 synchronized (System.err)
851 {
852 Selector selector=_selector;
853 Log.info("SelectSet "+_setID+" "+selector.keys().size());
854 for (SelectionKey key: selector.keys())
855 {
856 if (key.isValid())
857 Log.info(key.channel()+" "+key.interestOps()+" "+key.readyOps()+" "+key.attachment());
858 else
859 Log.info(key.channel()+" - - "+key.attachment());
860 }
861 }
862 }
863 }
864
865
866 private static class ChangeSelectableChannel
867 {
868 final SelectableChannel _channel;
869 final Object _attachment;
870
871 public ChangeSelectableChannel(SelectableChannel channel, Object attachment)
872 {
873 super();
874 _channel = channel;
875 _attachment = attachment;
876 }
877 }
878
879
880 private interface ChangeTask
881 {
882 public void run();
883 }
884 }