View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.CancelledKeyException;
18  import java.nio.channels.Channel;
19  import java.nio.channels.ClosedSelectorException;
20  import java.nio.channels.SelectableChannel;
21  import java.nio.channels.SelectionKey;
22  import java.nio.channels.Selector;
23  import java.nio.channels.ServerSocketChannel;
24  import java.nio.channels.SocketChannel;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.concurrent.ConcurrentHashMap;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.ConcurrentMap;
30  import java.util.concurrent.CountDownLatch;
31  import java.util.concurrent.TimeUnit;
32  
33  
34  import org.eclipse.jetty.io.ConnectedEndPoint;
35  import org.eclipse.jetty.io.Connection;
36  import org.eclipse.jetty.io.EndPoint;
37  import org.eclipse.jetty.util.TypeUtil;
38  import org.eclipse.jetty.util.component.AbstractLifeCycle;
39  import org.eclipse.jetty.util.component.AggregateLifeCycle;
40  import org.eclipse.jetty.util.component.Dumpable;
41  import org.eclipse.jetty.util.log.Log;
42  import org.eclipse.jetty.util.log.Logger;
43  import org.eclipse.jetty.util.thread.Timeout;
44  import org.eclipse.jetty.util.thread.Timeout.Task;
45  
46  
47  /* ------------------------------------------------------------ */
48  /**
49   * The Selector Manager manages and number of SelectSets to allow
50   * NIO scheduling to scale to large numbers of connections.
51   * <p>
52   * This class works around a number of know JVM bugs. For details
53   * see http://wiki.eclipse.org/Jetty/Feature/JVM_NIO_Bug
54   */
55  public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
56  {
57      public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
58      
59      // TODO Tune these by approx system speed.
60      private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.eclipse.jetty.io.nio.JVMBUG_THRESHHOLD",0).intValue();
61      private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
62      private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",25000).intValue();
63      private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
64      private static final int __BUSY_KEY=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_KEY",-1).intValue();
65      private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
66      
67      private int _maxIdleTime;
68      private int _lowResourcesMaxIdleTime;
69      private long _lowResourcesConnections;
70      private SelectSet[] _selectSet;
71      private int _selectSets=1;
72      private volatile int _set;
73      private boolean _deferringInterestedOps0=true;
74      private int _selectorPriorityDelta=0;
75      
76      /* ------------------------------------------------------------ */
77      /**
78       * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
79       * @see #setLowResourcesMaxIdleTime(long)
80       */
81      public void setMaxIdleTime(long maxIdleTime)
82      {
83          _maxIdleTime=(int)maxIdleTime;
84      }
85      
86      /* ------------------------------------------------------------ */
87      /**
88       * @param selectSets number of select sets to create
89       */
90      public void setSelectSets(int selectSets)
91      {
92          long lrc = _lowResourcesConnections * _selectSets; 
93          _selectSets=selectSets;
94          _lowResourcesConnections=lrc/_selectSets;
95      }
96      
97      /* ------------------------------------------------------------ */
98      /**
99       * @return the max idle time
100      */
101     public long getMaxIdleTime()
102     {
103         return _maxIdleTime;
104     }
105     
106     /* ------------------------------------------------------------ */
107     /**
108      * @return the number of select sets in use
109      */
110     public int getSelectSets()
111     {
112         return _selectSets;
113     }
114 
115     /* ------------------------------------------------------------ */
116     /**
117      * @param i 
118      * @return The select set
119      */
120     public SelectSet getSelectSet(int i)
121     {
122         return _selectSet[i];
123     }
124     
125     /* ------------------------------------------------------------ */
126     /** Register a channel
127      * @param channel
128      * @param att Attached Object
129      */
130     public void register(SocketChannel channel, Object att)
131     {
132         // The ++ increment here is not atomic, but it does not matter.
133         // so long as the value changes sometimes, then connections will
134         // be distributed over the available sets.
135         
136         int s=_set++; 
137         s=s%_selectSets;
138         SelectSet[] sets=_selectSet;
139         if (sets!=null)
140         {
141             SelectSet set=sets[s];
142             set.addChange(channel,att);
143             set.wakeup();
144         }
145     }
146 
147     
148     /* ------------------------------------------------------------ */
149     /** Register a channel
150      * @param channel
151      */
152     public void register(SocketChannel channel)
153     {
154         // The ++ increment here is not atomic, but it does not matter.
155         // so long as the value changes sometimes, then connections will
156         // be distributed over the available sets.
157         
158         int s=_set++; 
159         s=s%_selectSets;
160         SelectSet[] sets=_selectSet;
161         if (sets!=null)
162         {
163             SelectSet set=sets[s];
164             set.addChange(channel);
165             set.wakeup();
166         }
167     }
168     
169     /* ------------------------------------------------------------ */
170     /** Register a {@link ServerSocketChannel}
171      * @param acceptChannel
172      */
173     public void register(ServerSocketChannel acceptChannel)
174     {
175         int s=_set++; 
176         s=s%_selectSets;
177         SelectSet set=_selectSet[s];
178         set.addChange(acceptChannel);
179         set.wakeup();
180     }
181 
182     /* ------------------------------------------------------------ */
183     /**
184      * @return delta The value to add to the selector thread priority.
185      */
186     public int getSelectorPriorityDelta()
187     {
188         return _selectorPriorityDelta;
189     }
190 
191     /* ------------------------------------------------------------ */
192     /** Set the selector thread priorty delta.
193      * @param delta The value to add to the selector thread priority.
194      */
195     public void setSelectorPriorityDelta(int delta)
196     {
197         _selectorPriorityDelta=delta;
198     }
199     
200     
201     /* ------------------------------------------------------------ */
202     /**
203      * @return the lowResourcesConnections
204      */
205     public long getLowResourcesConnections()
206     {
207         return _lowResourcesConnections*_selectSets;
208     }
209 
210     /* ------------------------------------------------------------ */
211     /**
212      * Set the number of connections, which if exceeded places this manager in low resources state.
213      * This is not an exact measure as the connection count is averaged over the select sets.
214      * @param lowResourcesConnections the number of connections
215      * @see #setLowResourcesMaxIdleTime(long)
216      */
217     public void setLowResourcesConnections(long lowResourcesConnections)
218     {
219         _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
220     }
221 
222     /* ------------------------------------------------------------ */
223     /**
224      * @return the lowResourcesMaxIdleTime
225      */
226     public long getLowResourcesMaxIdleTime()
227     {
228         return _lowResourcesMaxIdleTime;
229     }
230 
231     /* ------------------------------------------------------------ */
232     /**
233      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()}
234      * @see #setMaxIdleTime(long)
235      */
236     public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
237     {
238         _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
239     }
240     
241 
242     /* ------------------------------------------------------------------------------- */
243     public abstract boolean dispatch(Runnable task);
244 
245     /* ------------------------------------------------------------ */
246     /* (non-Javadoc)
247      * @see org.eclipse.component.AbstractLifeCycle#doStart()
248      */
249     @Override
250     protected void doStart() throws Exception
251     {
252         _selectSet = new SelectSet[_selectSets];
253         for (int i=0;i<_selectSet.length;i++)
254             _selectSet[i]= new SelectSet(i);
255 
256         super.doStart();
257         
258         // start a thread to Select
259         for (int i=0;i<getSelectSets();i++)
260         {
261             final int id=i;
262             dispatch(new Runnable()
263             {
264                 public void run()
265                 {
266                     String name=Thread.currentThread().getName();
267                     int priority=Thread.currentThread().getPriority();
268                     try
269                     {
270                         SelectSet[] sets=_selectSet;
271                         if (sets==null)
272                             return;
273                         SelectSet set=sets[id];
274                         
275                         Thread.currentThread().setName(name+" Selector"+id);
276                         if (getSelectorPriorityDelta()!=0)
277                             Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta());
278                         LOG.debug("Starting {} on {}",Thread.currentThread(),this);
279                         while (isRunning())
280                         {
281                             try
282                             {
283                                 set.doSelect();
284                             }
285                             catch(ThreadDeath e)
286                             {
287                                 throw e;
288                             }
289                             catch(IOException e)
290                             {
291                                 LOG.ignore(e);
292                             }
293                             catch(Exception e)
294                             {
295                                 LOG.warn(e);
296                             }
297                         }
298                     }
299                     finally
300                     {
301                         LOG.debug("Stopped {} on {}",Thread.currentThread(),this);
302                         Thread.currentThread().setName(name);
303                         if (getSelectorPriorityDelta()!=0)
304                             Thread.currentThread().setPriority(priority);
305                     }
306                 }
307 
308             });
309         }
310     }
311 
312 
313     /* ------------------------------------------------------------------------------- */
314     @Override
315     protected void doStop() throws Exception
316     {
317         SelectSet[] sets= _selectSet;
318         _selectSet=null;
319         if (sets!=null)
320         {
321             for (SelectSet set : sets)
322             {
323                 if (set!=null)
324                     set.stop();
325             }
326         }
327         super.doStop();
328     }
329 
330     /* ------------------------------------------------------------ */
331     /**
332      * @param endpoint
333      */
334     protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
335 
336     /* ------------------------------------------------------------ */
337     /**
338      * @param endpoint
339      */
340     protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
341 
342     /* ------------------------------------------------------------ */
343     protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
344 
345     /* ------------------------------------------------------------------------------- */
346     protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
347 
348     /* ------------------------------------------------------------ */
349     /**
350      * Create a new end point
351      * @param channel
352      * @param selectSet
353      * @param sKey the selection key
354      * @return the new endpoint {@link SelectChannelEndPoint}
355      * @throws IOException
356      */
357     protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
358 
359     /* ------------------------------------------------------------------------------- */
360     protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
361     {
362         LOG.warn(ex+","+channel+","+attachment);
363         LOG.debug(ex);
364     }
365     
366     /* ------------------------------------------------------------ */
367     public String dump()
368     {
369         return AggregateLifeCycle.dump(this);
370     }
371 
372     /* ------------------------------------------------------------ */
373     public void dump(Appendable out, String indent) throws IOException
374     {
375         out.append(String.valueOf(this)).append("\n");
376         AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet));
377     }
378     
379     
380     /* ------------------------------------------------------------------------------- */
381     /* ------------------------------------------------------------------------------- */
382     /* ------------------------------------------------------------------------------- */
383     public class SelectSet implements Dumpable
384     {
385         private final int _setID;
386         private final Timeout _timeout;
387         
388         private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
389         
390         private Selector _selector;
391         
392         private volatile Thread _selecting;
393         private int _jvmBug;
394         private int _selects;
395         private long _monitorStart;
396         private long _monitorNext;
397         private boolean _pausing;
398         private SelectionKey _busyKey;
399         private int _busyKeyCount;
400         private long _log;
401         private int _paused;
402         private int _jvmFix0;
403         private int _jvmFix1;
404         private int _jvmFix2;
405         private volatile long _idleTick;
406         private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
407         
408         /* ------------------------------------------------------------ */
409         SelectSet(int acceptorID) throws Exception
410         {
411             _setID=acceptorID;
412 
413             _idleTick = System.currentTimeMillis();
414             _timeout = new Timeout(this);
415             _timeout.setDuration(0L);
416 
417             // create a selector;
418             _selector = Selector.open();
419             _monitorStart=System.currentTimeMillis();
420             _monitorNext=_monitorStart+__MONITOR_PERIOD;
421             _log=_monitorStart+60000;
422         }
423         
424         /* ------------------------------------------------------------ */
425         public void addChange(Object change)
426         {
427             _changes.add(change);
428         }
429 
430         /* ------------------------------------------------------------ */
431         public void addChange(SelectableChannel channel, Object att)
432         {   
433             if (att==null)
434                 addChange(channel);
435             else if (att instanceof EndPoint)
436                 addChange(att);
437             else
438                 addChange(new ChannelAndAttachment(channel,att));
439         }
440         
441         /* ------------------------------------------------------------ */
442         /**
443          * Select and dispatch tasks found from changes and the selector.
444          * 
445          * @throws IOException
446          */
447         public void doSelect() throws IOException
448         {
449             try
450             {
451                 _selecting=Thread.currentThread();
452                 final Selector selector=_selector;
453 
454                 // Make any key changes required
455                 Object change;
456                 int changes=_changes.size();
457                 while (changes-->0 && (change=_changes.poll())!=null)
458                 {
459                     Channel ch=null;
460                     SelectionKey key=null;
461                     
462                     try
463                     {
464                         if (change instanceof EndPoint)
465                         {
466                             // Update the operations for a key.
467                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
468                             ch=endpoint.getChannel();
469                             endpoint.doUpdateKey();
470                         }
471                         else if (change instanceof ChannelAndAttachment)
472                         {
473                             // finish accepting/connecting this connection
474                             final ChannelAndAttachment asc = (ChannelAndAttachment)change;
475                             final SelectableChannel channel=asc._channel;
476                             ch=channel;
477                             final Object att = asc._attachment;
478                             
479                             if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
480                             {
481                                 key = channel.register(selector,SelectionKey.OP_READ,att);
482                                 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
483                                 key.attach(endpoint);
484                                 endpoint.schedule();
485                             }
486                             else if (channel.isOpen())
487                             {
488                                 key = channel.register(selector,SelectionKey.OP_CONNECT,att);
489                             }
490                         }
491                         else if (change instanceof SocketChannel)
492                         {
493                             // Newly registered channel
494                             final SocketChannel channel=(SocketChannel)change;
495                             ch=channel;
496                             key = channel.register(selector,SelectionKey.OP_READ,null);
497                             SelectChannelEndPoint endpoint = createEndPoint(channel,key);
498                             key.attach(endpoint);
499                             endpoint.schedule();
500                         }
501                         else if (change instanceof ChangeTask)
502                         {
503                             ((Runnable)change).run();
504                         }
505                         else if (change instanceof Runnable)
506                         {
507                             dispatch((Runnable)change);
508                         }
509                         else
510                             throw new IllegalArgumentException(change.toString());
511                     }
512                     catch (CancelledKeyException e)
513                     {
514                         LOG.ignore(e);
515                     }
516                     catch (Throwable e)
517                     {
518                         if (e instanceof ThreadDeath)
519                             throw (ThreadDeath)e;
520                         
521                         if (isRunning())
522                             LOG.warn(e);
523                         else
524                             LOG.debug(e);
525 
526                         try
527                         {
528                             ch.close();
529                         }
530                         catch(IOException e2)
531                         {
532                             LOG.debug(e2);
533                         }
534                     }
535                 }
536 
537 
538                 // Do and instant select to see if any connections can be handled.
539                 int selected=selector.selectNow();
540                 _selects++;
541 
542                 long now=System.currentTimeMillis();
543                 
544                 // if no immediate things to do
545                 if (selected==0 && selector.selectedKeys().isEmpty())
546                 {
547                     // If we are in pausing mode
548                     if (_pausing)
549                     {
550                         try
551                         {
552                             Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of  busy loop
553                         }
554                         catch(InterruptedException e)
555                         {
556                             LOG.ignore(e);
557                         }
558                         now=System.currentTimeMillis();
559                     }
560 
561                     // workout how long to wait in select
562                     _timeout.setNow(now);
563                     long to_next_timeout=_timeout.getTimeToNext();
564 
565                     long wait = _changes.size()==0?__IDLE_TICK:0L;  
566                     if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
567                         wait = to_next_timeout;
568 
569                     // If we should wait with a select
570                     if (wait>0)
571                     {
572                         long before=now;
573                         selected=selector.select(wait);
574                         _selects++;
575                         now = System.currentTimeMillis();
576                         _timeout.setNow(now);
577                         
578                         if (__JVMBUG_THRESHHOLD>0)
579                             checkJvmBugs(before, now, wait, selected);
580                     }
581                 }
582                 
583                 // have we been destroyed while sleeping
584                 if (_selector==null || !selector.isOpen())
585                     return;
586 
587                 // Look for things to do
588                 for (SelectionKey key: selector.selectedKeys())
589                 {   
590                     SocketChannel channel=null;
591                     
592                     try
593                     {
594                         if (!key.isValid())
595                         {
596                             key.cancel();
597                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
598                             if (endpoint != null)
599                                 endpoint.doUpdateKey();
600                             continue;
601                         }
602 
603                         Object att = key.attachment();
604                         if (att instanceof SelectChannelEndPoint)
605                         {
606                             if (key.isReadable()||key.isWritable())
607                                 ((SelectChannelEndPoint)att).schedule();
608                         }
609                         else if (key.isConnectable())
610                         {
611                             // Complete a connection of a registered channel
612                             channel = (SocketChannel)key.channel();
613                             boolean connected=false;
614                             try
615                             {
616                                 connected=channel.finishConnect();
617                             }
618                             catch(Exception e)
619                             {
620                                 connectionFailed(channel,e,att);
621                             }
622                             finally
623                             {
624                                 if (connected)
625                                 {
626                                     key.interestOps(SelectionKey.OP_READ);
627                                     SelectChannelEndPoint endpoint = createEndPoint(channel,key);
628                                     key.attach(endpoint);
629                                     endpoint.schedule();
630                                 }
631                                 else
632                                 {
633                                     key.cancel();
634                                 }
635                             }
636                         }
637                         else
638                         {
639                             // Wrap readable registered channel in an endpoint
640                             channel = (SocketChannel)key.channel();
641                             SelectChannelEndPoint endpoint = createEndPoint(channel,key);
642                             key.attach(endpoint);
643                             if (key.isReadable())
644                                 endpoint.schedule();  
645                         }
646                         key = null;
647                     }
648                     catch (CancelledKeyException e)
649                     {
650                         LOG.ignore(e);
651                     }
652                     catch (Exception e)
653                     {
654                         if (isRunning())
655                             LOG.warn(e);
656                         else
657                             LOG.ignore(e);
658 
659                         try
660                         {
661                             if (channel!=null)
662                                 channel.close();
663                         }
664                         catch(IOException e2)
665                         {
666                             LOG.debug(e2);
667                         }
668                         
669                         if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
670                             key.cancel();
671                     }
672                 }
673                 
674                 // Everything always handled
675                 selector.selectedKeys().clear();
676                 
677                 now=System.currentTimeMillis();
678                 _timeout.setNow(now);
679                 Task task = _timeout.expired();
680                 while (task!=null)
681                 {
682                     if (task instanceof Runnable)
683                         dispatch((Runnable)task);
684                     task = _timeout.expired();
685                 }
686 
687                 // Idle tick
688                 if (now-_idleTick>__IDLE_TICK)
689                 {
690                     _idleTick=now;
691                     
692                     final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
693                         ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
694                         :now;
695                         
696                     dispatch(new Runnable()
697                     {
698                         public void run()
699                         {
700                             for (SelectChannelEndPoint endp:_endPoints.keySet())
701                             {
702                                 endp.checkIdleTimestamp(idle_now);
703                             }
704                         }
705                     });
706                 }
707             }
708             catch (ClosedSelectorException e)
709             {
710                 if (isRunning())
711                     LOG.warn(e);
712                 else
713                     LOG.ignore(e);
714             }
715             catch (CancelledKeyException e)
716             {
717                 LOG.ignore(e);
718             }
719             finally
720             {
721                 _selecting=null;
722             }
723         }
724         
725         /* ------------------------------------------------------------ */
726         private void checkJvmBugs(long before, long now, long wait, int selected)
727             throws IOException
728         {
729             Selector selector = _selector;
730             if (selector==null)
731                 return;
732                 
733             // Look for JVM bugs over a monitor period.
734             // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
735             // http://bugs.sun.com/view_bug.do?bug_id=6693490
736             if (now>_monitorNext)
737             {
738                 _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
739                 _pausing=_selects>__MAX_SELECTS;
740                 if (_pausing)
741                     _paused++;
742 
743                 _selects=0;
744                 _jvmBug=0;
745                 _monitorStart=now;
746                 _monitorNext=now+__MONITOR_PERIOD;
747             }
748 
749             if (now>_log)
750             {
751                 if (_paused>0)  
752                     LOG.debug(this+" Busy selector - injecting delay "+_paused+" times");
753 
754                 if (_jvmFix2>0)
755                     LOG.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
756 
757                 if (_jvmFix1>0)
758                     LOG.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times");
759 
760                 else if(LOG.isDebugEnabled() && _jvmFix0>0)
761                     LOG.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");
762                 _paused=0;
763                 _jvmFix2=0;
764                 _jvmFix1=0;
765                 _jvmFix0=0;
766                 _log=now+60000;
767             }
768 
769             // If we see signature of possible JVM bug, increment count.
770             if (selected==0 && wait>10 && (now-before)<(wait/2))
771             {
772                 // Increment bug count and try a work around
773                 _jvmBug++;
774                 if (_jvmBug>(__JVMBUG_THRESHHOLD))
775                 {
776                     try
777                     {
778                         if (_jvmBug==__JVMBUG_THRESHHOLD+1)
779                             _jvmFix2++;
780 
781                         Thread.sleep(__BUSY_PAUSE); // pause to avoid busy loop
782                     }
783                     catch(InterruptedException e)
784                     {
785                         LOG.ignore(e);
786                     }
787                 }
788                 else if (_jvmBug==__JVMBUG_THRESHHOLD)
789                 {
790                     renewSelector();
791                 }
792                 else if (_jvmBug%32==31) // heuristic attempt to cancel key 31,63,95,... loops
793                 {
794                     // Cancel keys with 0 interested ops
795                     int cancelled=0;
796                     for (SelectionKey k: selector.keys())
797                     {
798                         if (k.isValid()&&k.interestOps()==0)
799                         {
800                             k.cancel();
801                             cancelled++;
802                         }
803                     }
804                     if (cancelled>0)
805                         _jvmFix0++;
806 
807                     return;
808                 }
809             }
810             else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
811             {
812                 // Look for busy key
813                 SelectionKey busy = selector.selectedKeys().iterator().next();
814                 if (busy==_busyKey)
815                 {
816                     if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
817                     {
818                         final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
819                         LOG.warn("Busy Key "+busy.channel()+" "+endpoint);
820                         busy.cancel();
821                         if (endpoint!=null)
822                         {
823                             dispatch(new Runnable()
824                             {
825                                 public void run()
826                                 {
827                                     try
828                                     {
829                                         endpoint.close();
830                                     }
831                                     catch (IOException e)
832                                     {
833                                         LOG.ignore(e);
834                                     }
835                                 }
836                             });
837                         }
838                     }
839                 }
840                 else
841                     _busyKeyCount=0;
842                 _busyKey=busy;
843             }
844         }
845         
846         /* ------------------------------------------------------------ */
847         private void renewSelector() 
848         {
849             try
850             {
851                 synchronized (this)
852                 {
853                     Selector selector=_selector;
854                     if (selector==null)
855                         return;
856                     final Selector new_selector = Selector.open();
857                     for (SelectionKey k: selector.keys())
858                     {
859                         if (!k.isValid() || k.interestOps()==0)
860                             continue;
861 
862                         final SelectableChannel channel = k.channel();
863                         final Object attachment = k.attachment();
864 
865                         if (attachment==null)
866                             addChange(channel);
867                         else
868                             addChange(channel,attachment);
869                     }
870                     _selector.close();
871                     _selector=new_selector;
872                 }
873             }
874             catch(IOException e)
875             {
876                 throw new RuntimeException("recreating selector",e);
877             }
878         }
879         
880         /* ------------------------------------------------------------ */
881         public SelectorManager getManager()
882         {
883             return SelectorManager.this;
884         }
885 
886         /* ------------------------------------------------------------ */
887         public long getNow()
888         {
889             return _timeout.getNow();
890         }
891 
892         /* ------------------------------------------------------------ */
893         /**
894          * @param task The task to timeout. If it implements Runnable, then 
895          * expired will be called from a dispatched thread.
896          * 
897          * @param timeoutMs
898          */
899         public void scheduleTimeout(Timeout.Task task, long timeoutMs)
900         {
901             if (!(task instanceof Runnable))
902                 throw new IllegalArgumentException("!Runnable");
903             _timeout.schedule(task, timeoutMs);
904         }
905         
906         /* ------------------------------------------------------------ */
907         public void cancelTimeout(Timeout.Task task)
908         {
909             task.cancel();
910         }
911 
912         /* ------------------------------------------------------------ */
913         public void wakeup()
914         {
915             try
916             {
917                 Selector selector = _selector;
918                 if (selector!=null)
919                     selector.wakeup();
920             }
921             catch(Exception e)
922             {
923                 addChange(new ChangeTask()
924                 {
925                     public void run()
926                     {
927                         renewSelector();
928                     }
929                 });
930                 
931                 renewSelector();
932             }
933         }
934         
935         /* ------------------------------------------------------------ */
936         private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
937         {
938             SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
939             endPointOpened(endp); 
940             _endPoints.put(endp,this);
941             return endp;
942         }
943         
944         /* ------------------------------------------------------------ */
945         public void destroyEndPoint(SelectChannelEndPoint endp)
946         {
947             _endPoints.remove(endp);
948             endPointClosed(endp);
949         }
950 
951         /* ------------------------------------------------------------ */
952         Selector getSelector()
953         {
954             return _selector;
955         }
956         
957         /* ------------------------------------------------------------ */
958         void stop() throws Exception
959         {
960             // Spin for a while waiting for selector to complete 
961             // to avoid unneccessary closed channel exceptions
962             try
963             {
964                 for (int i=0;i<100 && _selecting!=null;i++)
965                 {
966                     wakeup();
967                     Thread.sleep(10);
968                 }
969             }
970             catch(Exception e)
971             {
972                 LOG.ignore(e);
973             }
974 
975             // close endpoints and selector
976             synchronized (this)
977             {
978                 for (SelectionKey key:_selector.keys())
979                 {
980                     if (key==null)
981                         continue;
982                     Object att=key.attachment();
983                     if (att instanceof EndPoint)
984                     {
985                         EndPoint endpoint = (EndPoint)att;
986                         try
987                         {
988                             endpoint.close();
989                         }
990                         catch(IOException e)
991                         {
992                             LOG.ignore(e);
993                         }
994                     }
995                 }
996             
997             
998                 _timeout.cancelAll();
999                 try
1000                 {
1001                     if (_selector != null)
1002                         _selector.close();
1003                 }
1004                 catch (IOException e)
1005                 {
1006                     LOG.ignore(e);
1007                 } 
1008                 _selector=null;
1009             }
1010         }
1011 
1012         /* ------------------------------------------------------------ */
1013         public String dump()
1014         {
1015             return AggregateLifeCycle.dump(this);
1016         }
1017 
1018         /* ------------------------------------------------------------ */
1019         public void dump(Appendable out, String indent) throws IOException
1020         {
1021             out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
1022             
1023             Thread selecting = _selecting;
1024             
1025             Object where = "not selecting";
1026             StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
1027             if (trace!=null)
1028             {
1029                 for (StackTraceElement t:trace)
1030                     if (t.getClassName().startsWith("org.eclipse.jetty."))
1031                     {
1032                         where=t;
1033                         break;
1034                     }
1035             }
1036 
1037             Selector selector=_selector;
1038             final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
1039             dump.add(where);
1040             
1041             final CountDownLatch latch = new CountDownLatch(1);
1042             
1043             addChange(new Runnable(){
1044                 public void run()
1045                 {
1046                     dumpKeyState(dump);
1047                     latch.countDown();
1048                 }
1049             });
1050             
1051             try
1052             {
1053                 latch.await(5,TimeUnit.SECONDS);
1054             }
1055             catch(InterruptedException e)
1056             {
1057                 LOG.ignore(e);
1058             }
1059             AggregateLifeCycle.dump(out,indent,dump);
1060         }
1061 
1062         /* ------------------------------------------------------------ */
1063         public void dumpKeyState(List<Object> dumpto)
1064         {
1065             Selector selector=_selector;
1066             dumpto.add(selector+" keys="+selector.keys().size());
1067             for (SelectionKey key: selector.keys())
1068             {
1069                 if (key.isValid())
1070                     dumpto.add(key.attachment()+" "+key.interestOps()+" "+key.readyOps());
1071                 else
1072                     dumpto.add(key.attachment()+" - - ");
1073             }
1074         }
1075     }
1076 
1077     /* ------------------------------------------------------------ */
1078     private static class ChannelAndAttachment
1079     {
1080         final SelectableChannel _channel;
1081         final Object _attachment;
1082         
1083         public ChannelAndAttachment(SelectableChannel channel, Object attachment)
1084         {
1085             super();
1086             _channel = channel;
1087             _attachment = attachment;
1088         }
1089     }
1090 
1091     /* ------------------------------------------------------------ */
1092     public boolean isDeferringInterestedOps0()
1093     {
1094         return _deferringInterestedOps0;
1095     }
1096 
1097     /* ------------------------------------------------------------ */
1098     public void setDeferringInterestedOps0(boolean deferringInterestedOps0)
1099     {
1100         _deferringInterestedOps0 = deferringInterestedOps0;
1101     }
1102     
1103 
1104     /* ------------------------------------------------------------ */
1105     /* ------------------------------------------------------------ */
1106     /* ------------------------------------------------------------ */
1107     private interface ChangeTask extends Runnable
1108     {}
1109     
1110 }