1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.CancelledKeyException;
18 import java.nio.channels.SelectableChannel;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.Selector;
21 import java.nio.channels.ServerSocketChannel;
22 import java.nio.channels.SocketChannel;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.Set;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.ConcurrentMap;
29
30 import org.eclipse.jetty.io.ConnectedEndPoint;
31 import org.eclipse.jetty.io.Connection;
32 import org.eclipse.jetty.io.EndPoint;
33 import org.eclipse.jetty.util.component.AbstractLifeCycle;
34 import org.eclipse.jetty.util.log.Log;
35 import org.eclipse.jetty.util.thread.Timeout;
36 import org.eclipse.jetty.util.thread.Timeout.Task;
37
38
39
40
41
42
43
44
45
46
47 public abstract class SelectorManager extends AbstractLifeCycle
48 {
49
50 private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.mortbay.io.nio.JVMBUG_THRESHHOLD",512).intValue();
51 private static final int __MONITOR_PERIOD=Integer.getInteger("org.mortbay.io.nio.MONITOR_PERIOD",1000).intValue();
52 private static final int __MAX_SELECTS=Integer.getInteger("org.mortbay.io.nio.MAX_SELECTS",15000).intValue();
53 private static final int __BUSY_PAUSE=Integer.getInteger("org.mortbay.io.nio.BUSY_PAUSE",50).intValue();
54 private static final int __BUSY_KEY=Integer.getInteger("org.mortbay.io.nio.BUSY_KEY",-1).intValue();
55
56 private int _maxIdleTime;
57 private int _lowResourcesMaxIdleTime;
58 private long _lowResourcesConnections;
59 private transient SelectSet[] _selectSet;
60 private int _selectSets=1;
61 private volatile int _set;
62
63
64
65
66
67
68 public void setMaxIdleTime(long maxIdleTime)
69 {
70 _maxIdleTime=(int)maxIdleTime;
71 }
72
73
74
75
76
77 public void setSelectSets(int selectSets)
78 {
79 long lrc = _lowResourcesConnections * _selectSets;
80 _selectSets=selectSets;
81 _lowResourcesConnections=lrc/_selectSets;
82 }
83
84
85
86
87
88 public long getMaxIdleTime()
89 {
90 return _maxIdleTime;
91 }
92
93
94
95
96
97 public int getSelectSets()
98 {
99 return _selectSets;
100 }
101
102
103
104
105
106
107 public void register(SocketChannel channel, Object att)
108 {
109
110
111
112
113 int s=_set++;
114 s=s%_selectSets;
115 SelectSet[] sets=_selectSet;
116 if (sets!=null)
117 {
118 SelectSet set=sets[s];
119 set.addChange(channel,att);
120 set.wakeup();
121 }
122 }
123
124
125
126
127
128 public void register(ServerSocketChannel acceptChannel)
129 {
130 int s=_set++;
131 s=s%_selectSets;
132 SelectSet set=_selectSet[s];
133 set.addChange(acceptChannel);
134 set.wakeup();
135 }
136
137
138
139
140
141 public long getLowResourcesConnections()
142 {
143 return _lowResourcesConnections*_selectSets;
144 }
145
146
147
148
149
150
151
152
153 public void setLowResourcesConnections(long lowResourcesConnections)
154 {
155 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
156 }
157
158
159
160
161
162 public long getLowResourcesMaxIdleTime()
163 {
164 return _lowResourcesMaxIdleTime;
165 }
166
167
168
169
170
171
172 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
173 {
174 _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
175 }
176
177
178
179
180
181
182 public void doSelect(int acceptorID) throws IOException
183 {
184 SelectSet[] sets= _selectSet;
185 if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
186 sets[acceptorID].doSelect();
187 }
188
189
190
191
192
193
194
195 protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException;
196
197
198 public abstract boolean dispatch(Runnable task);
199
200
201
202
203
204 @Override
205 protected void doStart() throws Exception
206 {
207 _selectSet = new SelectSet[_selectSets];
208 for (int i=0;i<_selectSet.length;i++)
209 _selectSet[i]= new SelectSet(i);
210
211 super.doStart();
212 }
213
214
215
216 @Override
217 protected void doStop() throws Exception
218 {
219 SelectSet[] sets= _selectSet;
220 _selectSet=null;
221 if (sets!=null)
222 {
223 for (SelectSet set : sets)
224 {
225 if (set!=null)
226 set.stop();
227 }
228 }
229 super.doStop();
230 }
231
232
233
234
235
236 protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
237
238
239
240
241
242 protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
243
244
245 protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
246
247
248 protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
249
250
251
252
253
254
255
256
257
258
259 protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
260
261
262 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
263 {
264 Log.warn(ex+","+channel+","+attachment);
265 Log.debug(ex);
266 }
267
268
269 public void dump()
270 {
271 for (final SelectSet set :_selectSet)
272 {
273 Thread selecting = set._selecting;
274 Log.info("SelectSet "+set._setID+" : "+selecting);
275 if (selecting!=null)
276 {
277 StackTraceElement[] trace =selecting.getStackTrace();
278 if (trace!=null)
279 {
280 for (StackTraceElement e : trace)
281 {
282 Log.info("\tat "+e.toString());
283 }
284 }
285 }
286
287 set.addChange(new ChangeTask(){
288 public void run()
289 {
290 set.dump();
291 }
292 });
293 }
294 }
295
296
297
298
299
300 public class SelectSet
301 {
302 private final int _setID;
303 private final Timeout _timeout;
304
305 private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
306
307 private Selector _selector;
308
309 private int _nextSet;
310 private volatile Thread _selecting;
311 private int _jvmBug;
312 private int _selects;
313 private long _monitorStart;
314 private long _monitorNext;
315 private boolean _pausing;
316 private SelectionKey _busyKey;
317 private int _busyKeyCount;
318 private long _log;
319 private int _paused;
320 private int _jvmFix0;
321 private int _jvmFix1;
322 private int _jvmFix2;
323 private volatile long _idleTick;
324 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
325
326
327 SelectSet(int acceptorID) throws Exception
328 {
329 _setID=acceptorID;
330
331 _idleTick = System.currentTimeMillis();
332 _timeout = new Timeout(this);
333 _timeout.setDuration(0L);
334
335
336 _selector = Selector.open();
337 _monitorStart=System.currentTimeMillis();
338 _monitorNext=_monitorStart+__MONITOR_PERIOD;
339 _log=_monitorStart+60000;
340 }
341
342
343 public void addChange(Object point)
344 {
345 _changes.add(point);
346 }
347
348
349 public void addChange(SelectableChannel channel, Object att)
350 {
351 if (att==null)
352 addChange(channel);
353 else if (att instanceof EndPoint)
354 addChange(att);
355 else
356 addChange(new ChangeSelectableChannel(channel,att));
357 }
358
359
360
361
362
363
364
365 public void doSelect() throws IOException
366 {
367 try
368 {
369 _selecting=Thread.currentThread();
370 final Selector selector=_selector;
371
372
373 Object change;
374 int changes=_changes.size();
375 while (changes-->0 && (change=_changes.poll())!=null)
376 {
377 try
378 {
379 if (change instanceof EndPoint)
380 {
381
382 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
383 endpoint.doUpdateKey();
384 }
385 else if (change instanceof Runnable)
386 {
387 dispatch((Runnable)change);
388 }
389 else if (change instanceof ChangeSelectableChannel)
390 {
391
392 final ChangeSelectableChannel asc = (ChangeSelectableChannel)change;
393 final SelectableChannel channel=asc._channel;
394 final Object att = asc._attachment;
395
396 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
397 {
398 SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
399 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
400 key.attach(endpoint);
401 endpoint.schedule();
402 }
403 else if (channel.isOpen())
404 {
405 channel.register(selector,SelectionKey.OP_CONNECT,att);
406 }
407 }
408 else if (change instanceof SocketChannel)
409 {
410 final SocketChannel channel=(SocketChannel)change;
411
412 if (channel.isConnected())
413 {
414 SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);
415 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
416 key.attach(endpoint);
417 endpoint.schedule();
418 }
419 else if (channel.isOpen())
420 {
421 channel.register(selector,SelectionKey.OP_CONNECT,null);
422 }
423 }
424 else if (change instanceof ServerSocketChannel)
425 {
426 ServerSocketChannel channel = (ServerSocketChannel)change;
427 channel.register(getSelector(),SelectionKey.OP_ACCEPT);
428 }
429 else if (change instanceof ChangeTask)
430 {
431 ((ChangeTask)change).run();
432 }
433 else
434 throw new IllegalArgumentException(change.toString());
435 }
436 catch (Exception e)
437 {
438 if (isRunning())
439 Log.warn(e);
440 else
441 Log.debug(e);
442 }
443 }
444
445 long retry_next;
446 long now=System.currentTimeMillis();
447 _timeout.setNow(now);
448
449 retry_next=_timeout.getTimeToNext();
450
451
452 long wait = 1000L;
453 if (wait > 0 && retry_next >= 0 && wait > retry_next)
454 wait = retry_next;
455
456
457 if (wait > 0)
458 {
459
460 if (_pausing)
461 {
462 try
463 {
464 Thread.sleep(__BUSY_PAUSE);
465 }
466 catch(InterruptedException e)
467 {
468 Log.ignore(e);
469 }
470 }
471
472 long before=now;
473 int selected=selector.select(wait);
474 now = System.currentTimeMillis();
475 _timeout.setNow(now);
476 _selects++;
477
478
479
480
481 if (now>_monitorNext)
482 {
483 _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
484 _pausing=_selects>__MAX_SELECTS;
485 if (_pausing)
486 _paused++;
487
488 _selects=0;
489 _jvmBug=0;
490 _monitorStart=now;
491 _monitorNext=now+__MONITOR_PERIOD;
492 }
493
494 if (now>_log)
495 {
496 if (_paused>0)
497 Log.info(this+" Busy selector - injecting delay "+_paused+" times");
498
499 if (_jvmFix2>0)
500 Log.info(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
501
502 if (_jvmFix1>0)
503 Log.info(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times");
504
505 else if(Log.isDebugEnabled() && _jvmFix0>0)
506 Log.info(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");
507 _paused=0;
508 _jvmFix2=0;
509 _jvmFix1=0;
510 _jvmFix0=0;
511 _log=now+60000;
512 }
513
514
515 if (selected==0 && wait>10 && (now-before)<(wait/2))
516 {
517
518 _jvmBug++;
519 if (_jvmBug>(__JVMBUG_THRESHHOLD))
520 {
521 try
522 {
523 if (_jvmBug==__JVMBUG_THRESHHOLD+1)
524 _jvmFix2++;
525
526 Thread.sleep(__BUSY_PAUSE);
527 }
528 catch(InterruptedException e)
529 {
530 Log.ignore(e);
531 }
532 }
533 else if (_jvmBug==__JVMBUG_THRESHHOLD)
534 {
535 synchronized (this)
536 {
537
538 final Selector new_selector = Selector.open();
539 for (SelectionKey k: selector.keys())
540 {
541 if (!k.isValid() || k.interestOps()==0)
542 continue;
543
544 final SelectableChannel channel = k.channel();
545 final Object attachment = k.attachment();
546
547 if (attachment==null)
548 addChange(channel);
549 else
550 addChange(channel,attachment);
551 }
552 _selector.close();
553 _selector=new_selector;
554 return;
555 }
556 }
557 else if (_jvmBug%32==31)
558 {
559
560 int cancelled=0;
561 for (SelectionKey k: selector.keys())
562 {
563 if (k.isValid()&&k.interestOps()==0)
564 {
565 k.cancel();
566 cancelled++;
567 }
568 }
569 if (cancelled>0)
570 _jvmFix0++;
571
572 return;
573 }
574 }
575 else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
576 {
577
578 SelectionKey busy = selector.selectedKeys().iterator().next();
579 if (busy==_busyKey)
580 {
581 if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
582 {
583 final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
584 Log.warn("Busy Key "+busy.channel()+" "+endpoint);
585 busy.cancel();
586 if (endpoint!=null)
587 {
588 dispatch(new Runnable()
589 {
590 public void run()
591 {
592 try
593 {
594 endpoint.close();
595 }
596 catch (IOException e)
597 {
598 Log.ignore(e);
599 }
600 }
601 });
602 }
603 }
604 }
605 else
606 _busyKeyCount=0;
607 _busyKey=busy;
608 }
609 }
610 else
611 {
612 selector.selectNow();
613 _selects++;
614 }
615
616
617 if (_selector==null || !selector.isOpen())
618 return;
619
620
621 for (SelectionKey key: selector.selectedKeys())
622 {
623 try
624 {
625 if (!key.isValid())
626 {
627 key.cancel();
628 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
629 if (endpoint != null)
630 endpoint.doUpdateKey();
631 continue;
632 }
633
634 Object att = key.attachment();
635 if (att instanceof SelectChannelEndPoint)
636 {
637 ((SelectChannelEndPoint)att).schedule();
638 }
639 else if (key.isAcceptable())
640 {
641 SocketChannel channel = acceptChannel(key);
642 if (channel==null)
643 continue;
644
645 channel.configureBlocking(false);
646
647
648 _nextSet=++_nextSet%_selectSet.length;
649
650
651 if (_nextSet==_setID)
652 {
653
654 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
655
656 SelectChannelEndPoint endpoint=_selectSet[_nextSet].createEndPoint(channel,cKey);
657 cKey.attach(endpoint);
658 if (endpoint != null)
659 endpoint.schedule();
660 }
661 else
662 {
663
664 _selectSet[_nextSet].addChange(channel);
665 _selectSet[_nextSet].wakeup();
666 }
667 }
668 else if (key.isConnectable())
669 {
670
671 SocketChannel channel = (SocketChannel)key.channel();
672 boolean connected=false;
673 try
674 {
675 connected=channel.finishConnect();
676 }
677 catch(Exception e)
678 {
679 connectionFailed(channel,e,att);
680 }
681 finally
682 {
683 if (connected)
684 {
685 key.interestOps(SelectionKey.OP_READ);
686 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
687 key.attach(endpoint);
688 endpoint.schedule();
689 }
690 else
691 {
692 key.cancel();
693 }
694 }
695 }
696 else
697 {
698
699 SocketChannel channel = (SocketChannel)key.channel();
700 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
701 key.attach(endpoint);
702 if (key.isReadable())
703 endpoint.schedule();
704 }
705 key = null;
706 }
707 catch (CancelledKeyException e)
708 {
709 Log.ignore(e);
710 }
711 catch (Exception e)
712 {
713 if (isRunning())
714 Log.warn(e);
715 else
716 Log.ignore(e);
717
718 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
719 key.cancel();
720 }
721 }
722
723
724 selector.selectedKeys().clear();
725
726 _timeout.setNow(now);
727 Task task = _timeout.expired();
728 while (task!=null)
729 {
730 if (task instanceof Runnable)
731 dispatch((Runnable)task);
732 else
733 task.expired();
734
735 task = _timeout.expired();
736 }
737
738
739 if (now-_idleTick>1000)
740 {
741 _idleTick=now;
742
743 final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
744 ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
745 :now;
746
747 dispatch(new Runnable()
748 {
749 public void run()
750 {
751 for (SelectChannelEndPoint endp:_endPoints.keySet())
752 {
753 endp.checkIdleTimestamp(idle_now);
754 }
755 }
756 });
757 }
758 }
759 catch (CancelledKeyException e)
760 {
761 Log.ignore(e);
762 }
763 finally
764 {
765 _selecting=null;
766 }
767 }
768
769
770 public SelectorManager getManager()
771 {
772 return SelectorManager.this;
773 }
774
775
776 public long getNow()
777 {
778 return _timeout.getNow();
779 }
780
781
782
783
784
785
786
787
788 public void scheduleTimeout(Timeout.Task task, long timeoutMs)
789 {
790 _timeout.schedule(task, timeoutMs);
791 }
792
793
794 public void cancelTimeout(Timeout.Task task)
795 {
796 task.cancel();
797 }
798
799
800 public void wakeup()
801 {
802 Selector selector = _selector;
803 if (selector!=null)
804 selector.wakeup();
805 }
806
807
808 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
809 {
810 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
811 endPointOpened(endp);
812 _endPoints.put(endp,this);
813 return endp;
814 }
815
816
817 public void destroyEndPoint(SelectChannelEndPoint endp)
818 {
819 _endPoints.remove(endp);
820 endPointClosed(endp);
821 }
822
823
824 Selector getSelector()
825 {
826 return _selector;
827 }
828
829
830 void stop() throws Exception
831 {
832 boolean selecting=true;
833 while(selecting)
834 {
835 wakeup();
836 selecting=_selecting!=null;
837 }
838
839 for (SelectionKey key:_selector.keys())
840 {
841 if (key==null)
842 continue;
843 Object att=key.attachment();
844 if (att instanceof EndPoint)
845 {
846 EndPoint endpoint = (EndPoint)att;
847 try
848 {
849 endpoint.close();
850 }
851 catch(IOException e)
852 {
853 Log.ignore(e);
854 }
855 }
856 }
857
858 synchronized (this)
859 {
860 selecting=_selecting!=null;
861 while(selecting)
862 {
863 wakeup();
864 selecting=_selecting!=null;
865 }
866
867 _timeout.cancelAll();
868 try
869 {
870 if (_selector != null)
871 _selector.close();
872 }
873 catch (IOException e)
874 {
875 Log.ignore(e);
876 }
877 _selector=null;
878 }
879 }
880
881 public void dump()
882 {
883 synchronized (System.err)
884 {
885 Selector selector=_selector;
886 Log.info("SelectSet "+_setID+" "+selector.keys().size());
887 for (SelectionKey key: selector.keys())
888 {
889 if (key.isValid())
890 Log.info(key.channel()+" "+key.interestOps()+" "+key.readyOps()+" "+key.attachment());
891 else
892 Log.info(key.channel()+" - - "+key.attachment());
893 }
894 }
895 }
896 }
897
898
899 private static class ChangeSelectableChannel
900 {
901 final SelectableChannel _channel;
902 final Object _attachment;
903
904 public ChangeSelectableChannel(SelectableChannel channel, Object attachment)
905 {
906 super();
907 _channel = channel;
908 _attachment = attachment;
909 }
910 }
911
912
913 private interface ChangeTask
914 {
915 public void run();
916 }
917 }