1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.CancelledKeyException;
18 import java.nio.channels.SelectableChannel;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.Selector;
21 import java.nio.channels.ServerSocketChannel;
22 import java.nio.channels.SocketChannel;
23 import java.util.ArrayList;
24 import java.util.Date;
25 import java.util.Iterator;
26 import java.util.List;
27
28 import org.eclipse.jetty.io.Connection;
29 import org.eclipse.jetty.io.EndPoint;
30 import org.eclipse.jetty.util.component.AbstractLifeCycle;
31 import org.eclipse.jetty.util.log.Log;
32 import org.eclipse.jetty.util.thread.Timeout;
33
34
35
36
37
38
39
40
41
42
43 public abstract class SelectorManager extends AbstractLifeCycle
44 {
// Base threshold for the select() busy-return workaround, read from the system property
// "org.eclipse.jetty.io.nio.JVMBUG_THRESHHOLD" (default 128). A value <= 0 disables the
// detection in SelectSet.doSelect(). The same number is compared both against the count of
// suspicious selects and against the select wait time.
private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.eclipse.jetty.io.nio.JVMBUG_THRESHHOLD",128);
// Count of suspicious selects at which the selector is closed and recreated.
private static final int __JVMBUG_THRESHHOLD2=__JVMBUG_THRESHHOLD*2;
// Midpoint between the two thresholds above; a second pass cancelling keys with interestOps==0 happens here.
private static final int __JVMBUG_THRESHHOLD1=(__JVMBUG_THRESHHOLD2+__JVMBUG_THRESHHOLD)/2;
// Idle timeout duration applied to each select set when not low on resources (presumably milliseconds - confirm against Timeout).
private long _maxIdleTime;
// Low-resources connection threshold, stored PER select set (see setLowResourcesConnections/getLowResourcesConnections).
private long _lowResourcesConnections;
// Idle timeout duration applied while a select set has more keys than _lowResourcesConnections.
private long _lowResourcesMaxIdleTime;
// The select sets; created in doStart() and set back to null in doStop().
private transient SelectSet[] _selectSet;
// Number of select sets created by doStart().
private int _selectSets=1;
// Counter used by register(...) to spread registrations over the select sets.
private volatile int _set;
// True once the "cancelling interestOps==0" message has been logged at info level (later occurrences log at debug).
private boolean _jvmBug0;
// True once the "recreating selector" message has been logged at info level (later occurrences log at debug).
private boolean _jvmBug1;
56
57
58
59
60
61
62
63 public void setMaxIdleTime(long maxIdleTime)
64 {
65 _maxIdleTime=maxIdleTime;
66 }
67
68
69
70
71
72 public void setSelectSets(int selectSets)
73 {
74 long lrc = _lowResourcesConnections * _selectSets;
75 _selectSets=selectSets;
76 _lowResourcesConnections=lrc/_selectSets;
77 }
78
79
80
81
82
83 public long getMaxIdleTime()
84 {
85 return _maxIdleTime;
86 }
87
88
89
90
91
92 public int getSelectSets()
93 {
94 return _selectSets;
95 }
96
97
98
99
100
101
102
103 public void register(SocketChannel channel, Object att)
104 {
105 int s=_set++;
106 s=s%_selectSets;
107 SelectSet[] sets=_selectSet;
108 if (sets!=null)
109 {
110 SelectSet set=sets[s];
111 set.addChange(channel,att);
112 set.wakeup();
113 }
114 }
115
116
117
118
119
120
121
122 public void register(ServerSocketChannel acceptChannel)
123 {
124 int s=_set++;
125 s=s%_selectSets;
126 SelectSet set=_selectSet[s];
127 set.addChange(acceptChannel);
128 set.wakeup();
129 }
130
131
132
133
134
135 public long getLowResourcesConnections()
136 {
137 return _lowResourcesConnections*_selectSets;
138 }
139
140
141
142
143
144
145
146
147 public void setLowResourcesConnections(long lowResourcesConnections)
148 {
149 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
150 }
151
152
153
154
155
156 public long getLowResourcesMaxIdleTime()
157 {
158 return _lowResourcesMaxIdleTime;
159 }
160
161
162
163
164
165
166 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
167 {
168 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
169 }
170
171
172
173
174
175
176 public void doSelect(int acceptorID) throws IOException
177 {
178 SelectSet[] sets= _selectSet;
179 if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
180 sets[acceptorID].doSelect();
181 }
182
183
184
185
186
187
188
/**
 * Accepts a connection for a selected key that is acceptable.
 * <p>
 * Called from {@code SelectSet.doSelect()}. A {@code null} result makes the select loop skip
 * the key; a non-null channel is switched to non-blocking mode and handed to a select set.
 *
 * @param key the selected key whose channel is ready to accept
 * @return the accepted channel, or null if nothing was accepted
 * @throws IOException if the accept fails
 */
protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException;
190
191
/**
 * Hands a task to the implementation for execution.
 * <p>
 * {@code SelectSet.doSelect()} calls this for every queued change that is a {@link Runnable};
 * the returned value is not inspected there.
 *
 * @param task the task to run
 * @return implementation defined (presumably whether the task was accepted - confirm with implementations)
 */
public abstract boolean dispatch(Runnable task);
193
194
195
196
197
198 protected void doStart() throws Exception
199 {
200 _selectSet = new SelectSet[_selectSets];
201 for (int i=0;i<_selectSet.length;i++)
202 _selectSet[i]= new SelectSet(i);
203
204 super.doStart();
205 }
206
207
208
209 protected void doStop() throws Exception
210 {
211 SelectSet[] sets= _selectSet;
212 _selectSet=null;
213 if (sets!=null)
214 {
215 for (SelectSet set : sets)
216 {
217 if (set!=null)
218 set.stop();
219 }
220 }
221 super.doStop();
222 }
223
224
225
226
227
/**
 * Callback for a closed endpoint.
 * <p>
 * NOTE(review): not invoked anywhere in this file; presumably called by
 * {@code SelectChannelEndPoint} when it closes - confirm against that class.
 *
 * @param endpoint the endpoint concerned
 */
protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
229
230
231
232
233
/**
 * Callback for an opened endpoint.
 * <p>
 * NOTE(review): not invoked anywhere in this file; presumably called by implementations of
 * {@code newEndPoint} - confirm against subclasses.
 *
 * @param endpoint the endpoint concerned
 */
protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
235
236
/**
 * Factory for the {@link Connection} associated with a channel and its endpoint.
 * <p>
 * NOTE(review): not invoked anywhere in this file; presumably used while an endpoint is
 * being created - confirm against {@code SelectChannelEndPoint} and subclasses.
 *
 * @param channel  the socket channel
 * @param endpoint the endpoint wrapping the channel
 * @return the connection for the endpoint
 */
protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
238
239
240
241
242
243
244
245
246
/**
 * Factory for the endpoint of a channel that has been registered with a select set.
 * <p>
 * {@code SelectSet.doSelect()} calls this after registering a channel (or after a connect
 * completes), attaches the result to the key and schedules it.
 *
 * @param channel   the channel the endpoint wraps
 * @param selectSet the select set whose selector the channel is registered with
 * @param sKey      the selection key of that registration
 * @return the new endpoint
 * @throws IOException if the endpoint cannot be created
 */
protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
248
249
250 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
251 {
252 Log.warn(ex+","+channel+","+attachment);
253 Log.debug(ex);
254 }
255
256
257 public void dump()
258 {
259 for (final SelectSet set :_selectSet)
260 {
261 Thread selecting = set._selecting;
262 Log.info("SelectSet "+set._setID+" : "+selecting);
263 if (selecting!=null)
264 {
265 StackTraceElement[] trace =selecting.getStackTrace();
266 if (trace!=null)
267 {
268 for (StackTraceElement e : trace)
269 {
270 Log.info("\tat "+e.toString());
271 }
272 }
273 }
274
275 set.addChange(new ChangeTask(){
276 public void run()
277 {
278 set.dump();
279 }
280 });
281 }
282 }
283
284
285
286
287
288 public class SelectSet
289 {
// Identifier given to the constructor; the manager passes the index of this set in its array.
private final int _setID;
// Idle timeouts; its duration is switched in doSelect() between the normal and the low-resources idle time.
private final Timeout _idleTimeout;
// Tasks scheduled through scheduleTimeout(); created with a duration of 0.
private final Timeout _timeout;
// Two change lists used alternately; the array object is also the lock guarding the lists and _change.
private final List<Object>[] _changes;

// Index (0 or 1) of the list in _changes that currently receives new changes.
private int _change;
// Round-robin index of the select set that receives the next accepted channel.
private int _nextSet;
// The selector of this set; replaced when recreated by the JVM-bug workaround, null after stop().
private Selector _selector;
// Number of consecutive selects that returned suspiciously early with nothing selected.
private int _jvmBug;
// Thread currently executing doSelect(), or null when none is.
private volatile Thread _selecting;
// System.currentTimeMillis() value recorded when the selector was last recreated (0 if never).
private long _lastJVMBug;
301
302
303 SelectSet(int acceptorID) throws Exception
304 {
305 _setID=acceptorID;
306
307 _idleTimeout = new Timeout(this);
308 _idleTimeout.setDuration(getMaxIdleTime());
309 _timeout = new Timeout(this);
310 _timeout.setDuration(0L);
311 _changes = new List[] {new ArrayList(),new ArrayList()};
312
313
314 _selector = Selector.open();
315 _change=0;
316 }
317
318
319 public void addChange(Object point)
320 {
321 synchronized (_changes)
322 {
323 _changes[_change].add(point);
324 }
325 }
326
327
328 public void addChange(SelectableChannel channel, Object att)
329 {
330 if (att==null)
331 addChange(channel);
332 else if (att instanceof EndPoint)
333 addChange(att);
334 else
335 addChange(new ChangeSelectableChannel(channel,att));
336 }
337
338
339 public void cancelIdle(Timeout.Task task)
340 {
341 task.cancel();
342 }
343
344
345
346
347
348
349
350 public void doSelect() throws IOException
351 {
352 try
353 {
354 _selecting=Thread.currentThread();
355 List<?> changes;
356 final Selector selector;
357 synchronized (_changes)
358 {
359 changes=_changes[_change];
360 _change=_change==0?1:0;
361 selector=_selector;
362 }
363
364
365
366 final int size=changes.size();
367 for (int i = 0; i < size; i++)
368 {
369 try
370 {
371 Object o = changes.get(i);
372
373 if (o instanceof EndPoint)
374 {
375
376 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o;
377 endpoint.doUpdateKey();
378 }
379 else if (o instanceof Runnable)
380 {
381 dispatch((Runnable)o);
382 }
383 else if (o instanceof ChangeSelectableChannel)
384 {
385
386 final ChangeSelectableChannel asc = (ChangeSelectableChannel)o;
387 final SelectableChannel channel=asc._channel;
388 final Object att = asc._attachment;
389
390 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
391 {
392 SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
393 SelectChannelEndPoint endpoint = newEndPoint((SocketChannel)channel,this,key);
394 key.attach(endpoint);
395 endpoint.schedule();
396 }
397 else if (channel.isOpen())
398 {
399 channel.register(selector,SelectionKey.OP_CONNECT,att);
400 }
401 }
402 else if (o instanceof SocketChannel)
403 {
404 final SocketChannel channel=(SocketChannel)o;
405
406 if (channel.isConnected())
407 {
408 SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);
409 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
410 key.attach(endpoint);
411 endpoint.schedule();
412 }
413 else if (channel.isOpen())
414 {
415 channel.register(selector,SelectionKey.OP_CONNECT,null);
416 }
417 }
418 else if (o instanceof ServerSocketChannel)
419 {
420 ServerSocketChannel channel = (ServerSocketChannel)o;
421 channel.register(getSelector(),SelectionKey.OP_ACCEPT);
422 }
423 else if (o instanceof ChangeTask)
424 {
425 ((ChangeTask)o).run();
426 }
427 else
428 throw new IllegalArgumentException(o.toString());
429 }
430 catch (CancelledKeyException e)
431 {
432 if (isRunning())
433 Log.warn(e);
434 else
435 Log.debug(e);
436 }
437 }
438 changes.clear();
439
440 long idle_next;
441 long retry_next;
442 long now=System.currentTimeMillis();
443 synchronized (this)
444 {
445 _idleTimeout.setNow(now);
446 _timeout.setNow(now);
447
448 if (_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)
449 _idleTimeout.setDuration(_lowResourcesMaxIdleTime);
450 else
451 _idleTimeout.setDuration(_maxIdleTime);
452 idle_next=_idleTimeout.getTimeToNext();
453 retry_next=_timeout.getTimeToNext();
454 }
455
456
457 long wait = 1000L;
458 if (idle_next >= 0 && wait > idle_next)
459 wait = idle_next;
460 if (wait > 0 && retry_next >= 0 && wait > retry_next)
461 wait = retry_next;
462
463
464 if (wait > 0)
465 {
466 long before=now;
467 int selected=selector.select(wait);
468 now = System.currentTimeMillis();
469 _idleTimeout.setNow(now);
470 _timeout.setNow(now);
471
472
473
474 if (__JVMBUG_THRESHHOLD>0 && selected==0 && wait>__JVMBUG_THRESHHOLD && (now-before)<(wait/2) )
475 {
476 _jvmBug++;
477 if (_jvmBug>=(__JVMBUG_THRESHHOLD2))
478 {
479 synchronized (this)
480 {
481 _lastJVMBug=now;
482 if (_jvmBug1)
483 Log.debug("seeing JVM BUG(s) - recreating selector");
484 else
485 {
486 _jvmBug1=true;
487 Log.info("seeing JVM BUG(s) - recreating selector");
488 }
489
490 final Selector new_selector = Selector.open();
491 for (SelectionKey k: selector.keys())
492 {
493 if (!k.isValid() || k.interestOps()==0)
494 continue;
495
496 final SelectableChannel channel = k.channel();
497 final Object attachment = k.attachment();
498
499 if (attachment==null)
500 addChange(channel);
501 else
502 addChange(channel,attachment);
503 }
504 _selector.close();
505 _selector=new_selector;
506 _jvmBug=0;
507 return;
508 }
509 }
510 else if (_jvmBug==__JVMBUG_THRESHHOLD || _jvmBug==__JVMBUG_THRESHHOLD1)
511 {
512
513 if (_jvmBug0)
514 Log.debug("seeing JVM BUG(s) - cancelling interestOps==0");
515 else
516 {
517 _jvmBug0=true;
518 Log.info("seeing JVM BUG(s) - cancelling interestOps==0");
519 }
520 for (SelectionKey k: selector.keys())
521 {
522 if (k.isValid()&&k.interestOps()==0)
523 {
524 k.cancel();
525 }
526 }
527 return;
528 }
529 }
530 else
531 _jvmBug=0;
532 }
533 else
534 {
535 selector.selectNow();
536 _jvmBug=0;
537 }
538
539
540 if (_selector==null || !selector.isOpen())
541 return;
542
543
544 for (SelectionKey key: selector.selectedKeys())
545 {
546 try
547 {
548 if (!key.isValid())
549 {
550 key.cancel();
551 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
552 if (endpoint != null)
553 endpoint.doUpdateKey();
554 continue;
555 }
556
557 Object att = key.attachment();
558 if (att instanceof SelectChannelEndPoint)
559 {
560 ((SelectChannelEndPoint)att).schedule();
561 }
562 else if (key.isAcceptable())
563 {
564 SocketChannel channel = acceptChannel(key);
565 if (channel==null)
566 continue;
567
568 channel.configureBlocking(false);
569
570
571 _nextSet=++_nextSet%_selectSet.length;
572
573
574 if (_nextSet==_setID)
575 {
576
577 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
578 SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey);
579 cKey.attach(endpoint);
580 if (endpoint != null)
581 endpoint.schedule();
582 }
583 else
584 {
585
586 _selectSet[_nextSet].addChange(channel);
587 _selectSet[_nextSet].wakeup();
588 }
589 }
590 else if (key.isConnectable())
591 {
592
593 SocketChannel channel = (SocketChannel)key.channel();
594 boolean connected=false;
595 try
596 {
597 connected=channel.finishConnect();
598 }
599 catch(Exception e)
600 {
601 connectionFailed(channel,e,att);
602 }
603 finally
604 {
605 if (connected)
606 {
607 key.interestOps(SelectionKey.OP_READ);
608 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
609 key.attach(endpoint);
610 endpoint.schedule();
611 }
612 else
613 {
614 key.cancel();
615 }
616 }
617 }
618 else
619 {
620
621 SocketChannel channel = (SocketChannel)key.channel();
622 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
623 key.attach(endpoint);
624 if (key.isReadable())
625 endpoint.schedule();
626 }
627 key = null;
628 }
629 catch (CancelledKeyException e)
630 {
631 Log.ignore(e);
632 }
633 catch (Exception e)
634 {
635 if (isRunning())
636 Log.warn(e);
637 else
638 Log.ignore(e);
639
640 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
641 key.cancel();
642 }
643 }
644
645
646 selector.selectedKeys().clear();
647
648
649 _idleTimeout.tick(now);
650 _timeout.tick(now);
651 }
652 catch (CancelledKeyException e)
653 {
654 Log.ignore(e);
655 }
656 finally
657 {
658 _selecting=null;
659 }
660 }
661
662
663 public SelectorManager getManager()
664 {
665 return SelectorManager.this;
666 }
667
668
669 public long getNow()
670 {
671 return _idleTimeout.getNow();
672 }
673
674
675 public void scheduleIdle(Timeout.Task task)
676 {
677 if (_idleTimeout.getDuration() <= 0)
678 return;
679 _idleTimeout.schedule(task);
680 }
681
682
683 public void scheduleTimeout(Timeout.Task task, long timeoutMs)
684 {
685 _timeout.schedule(task, timeoutMs);
686 }
687
688
689 public void cancelTimeout(Timeout.Task task)
690 {
691 task.cancel();
692 }
693
694
695 public void wakeup()
696 {
697 Selector selector = _selector;
698 if (selector!=null)
699 selector.wakeup();
700 }
701
702
703 Selector getSelector()
704 {
705 return _selector;
706 }
707
708
709 void stop() throws Exception
710 {
711 boolean selecting=true;
712 while(selecting)
713 {
714 wakeup();
715 selecting=_selecting!=null;
716 }
717
718 for (SelectionKey key:_selector.keys())
719 {
720 if (key==null)
721 continue;
722 Object att=key.attachment();
723 if (att instanceof EndPoint)
724 {
725 EndPoint endpoint = (EndPoint)att;
726 try
727 {
728 endpoint.close();
729 }
730 catch(IOException e)
731 {
732 Log.ignore(e);
733 }
734 }
735 }
736
737 synchronized (this)
738 {
739 selecting=_selecting!=null;
740 while(selecting)
741 {
742 wakeup();
743 selecting=_selecting!=null;
744 }
745
746 _idleTimeout.cancelAll();
747 _timeout.cancelAll();
748 try
749 {
750 if (_selector != null)
751 _selector.close();
752 }
753 catch (IOException e)
754 {
755 Log.ignore(e);
756 }
757 _selector=null;
758 }
759 }
760
761 public void dump()
762 {
763 synchronized (System.err)
764 {
765 Selector selector=_selector;
766 Log.info("SelectSet "+_setID+" "+selector.keys().size()+" lastJVMBug="+new Date(_lastJVMBug));
767 for (SelectionKey key: selector.keys())
768 {
769 if (key.isValid())
770 Log.info(key.channel()+" "+key.interestOps()+" "+key.readyOps()+" "+key.attachment());
771 else
772 Log.info(key.channel()+" - - "+key.attachment());
773 }
774 }
775 }
776 }
777
778
779 private static class ChangeSelectableChannel
780 {
781 final SelectableChannel _channel;
782 final Object _attachment;
783
784 public ChangeSelectableChannel(SelectableChannel channel, Object attachment)
785 {
786 super();
787 _channel = channel;
788 _attachment = attachment;
789 }
790 }
791
792
793 private interface ChangeTask
794 {
795 public void run();
796 }
797 }