View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.CancelledKeyException;
18  import java.nio.channels.ClosedSelectorException;
19  import java.nio.channels.SelectableChannel;
20  import java.nio.channels.SelectionKey;
21  import java.nio.channels.Selector;
22  import java.nio.channels.ServerSocketChannel;
23  import java.nio.channels.SocketChannel;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.concurrent.ConcurrentHashMap;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import java.util.concurrent.ConcurrentMap;
29  import java.util.concurrent.CountDownLatch;
30  import java.util.concurrent.TimeUnit;
31  
32  import org.eclipse.jetty.io.ConnectedEndPoint;
33  import org.eclipse.jetty.io.Connection;
34  import org.eclipse.jetty.io.EndPoint;
35  import org.eclipse.jetty.util.TypeUtil;
36  import org.eclipse.jetty.util.component.AbstractLifeCycle;
37  import org.eclipse.jetty.util.component.AggregateLifeCycle;
38  import org.eclipse.jetty.util.component.Dumpable;
39  import org.eclipse.jetty.util.log.Log;
40  import org.eclipse.jetty.util.thread.Timeout;
41  import org.eclipse.jetty.util.thread.Timeout.Task;
42  
43  
44  /* ------------------------------------------------------------ */
45  /**
46   * The Selector Manager manages a number of SelectSets to allow
47   * NIO scheduling to scale to large numbers of connections.
48   * <p>
49   * This class works around a number of known JVM bugs. For details
50   * see http://wiki.eclipse.org/Jetty/Feature/JVM_NIO_Bug
51   */
52  public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
53  {
54      // TODO Tune these by approx system speed.
        // The tunables below are read once from system properties when the class is loaded;
        // the second argument of Integer.getInteger is the default used when the property is unset.

        // Number of suspicious early select returns after which the JVM bug work-arounds kick in.
        // checkJvmBugs() is only called when this is greater than 0, so the default disables it.
55      private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.eclipse.jetty.io.nio.JVMBUG_THRESHHOLD",0).intValue();
        // Length in ms of the window over which selects are counted by checkJvmBugs().
56      private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
        // Selects per monitor period above which a select set starts pausing between selects.
57      private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",25000).intValue();
        // Sleep in ms injected when pausing or when the JVM bug threshold has been passed.
58      private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
        // Number of repeated selections of the same single key before it is cancelled as busy.
        // Busy key detection only runs when this is greater than 0, so the default disables it.
59      private static final int __BUSY_KEY=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_KEY",-1).intValue();
        // Upper bound in ms on a blocking select and minimum interval between idle checks.
60      private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
61      
        // Idle time in ms, stored as int (see setMaxIdleTime).
62      private int _maxIdleTime;
        // Idle time in ms used when a select set has more keys than _lowResourcesConnections.
63      private int _lowResourcesMaxIdleTime;
        // Low resources limit PER SELECT SET; the public getter/setter convert to and from the total.
64      private long _lowResourcesConnections;
        // Created in doStart() and set back to null in doStop(); null while not started.
65      private SelectSet[] _selectSet;
        // Number of select sets to create in doStart().
66      private int _selectSets=1;
        // Round-robin counter used by the register methods to pick a select set.
67      private volatile int _set;
        // NOTE(review): not read anywhere in the visible part of this file - confirm whether it is still used.
68      private boolean _deferringInterestedOps0=true;
69      
70      /* ------------------------------------------------------------ */
71      /**
72       * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
73       * @see #setLowResourcesMaxIdleTime(long)
74       */
75      public void setMaxIdleTime(long maxIdleTime)
76      {
77          _maxIdleTime=(int)maxIdleTime;
78      }
79      
80      /* ------------------------------------------------------------ */
81      /**
82       * @param selectSets number of select sets to create
83       */
84      public void setSelectSets(int selectSets)
85      {
86          long lrc = _lowResourcesConnections * _selectSets; 
87          _selectSets=selectSets;
88          _lowResourcesConnections=lrc/_selectSets;
89      }
90      
91      /* ------------------------------------------------------------ */
92      /**
93       * @return the max idle time
94       */
95      public long getMaxIdleTime()
96      {
97          return _maxIdleTime;
98      }
99      
100     /* ------------------------------------------------------------ */
101     /**
102      * @return the number of select sets in use
103      */
104     public int getSelectSets()
105     {
106         return _selectSets;
107     }
108 
109     /* ------------------------------------------------------------ */
110     /**
111      * @param i 
112      * @return The select set
113      */
114     public SelectSet getSelectSet(int i)
115     {
116         return _selectSet[i];
117     }
118     
119     /* ------------------------------------------------------------ */
120     /** Register a channel
121      * @param channel
122      * @param att Attached Object
123      */
124     public void register(SocketChannel channel, Object att)
125     {
126         // The ++ increment here is not atomic, but it does not matter.
127         // so long as the value changes sometimes, then connections will
128         // be distributed over the available sets.
129         
130         int s=_set++; 
131         s=s%_selectSets;
132         SelectSet[] sets=_selectSet;
133         if (sets!=null)
134         {
135             SelectSet set=sets[s];
136             set.addChange(channel,att);
137             set.wakeup();
138         }
139     }
140 
141     
142     /* ------------------------------------------------------------ */
143     /** Register a channel
144      * @param channel
145      */
146     public void register(SocketChannel channel)
147     {
148         // The ++ increment here is not atomic, but it does not matter.
149         // so long as the value changes sometimes, then connections will
150         // be distributed over the available sets.
151         
152         int s=_set++; 
153         s=s%_selectSets;
154         SelectSet[] sets=_selectSet;
155         if (sets!=null)
156         {
157             SelectSet set=sets[s];
158             set.addChange(channel);
159             set.wakeup();
160         }
161     }
162     
163     /* ------------------------------------------------------------ */
164     /** Register a {@link ServerSocketChannel}
165      * @param acceptChannel
166      */
167     public void register(ServerSocketChannel acceptChannel)
168     {
169         int s=_set++; 
170         s=s%_selectSets;
171         SelectSet set=_selectSet[s];
172         set.addChange(acceptChannel);
173         set.wakeup();
174     }
175 
176     /* ------------------------------------------------------------ */
177     /**
178      * @return the lowResourcesConnections
179      */
180     public long getLowResourcesConnections()
181     {
182         return _lowResourcesConnections*_selectSets;
183     }
184 
185     /* ------------------------------------------------------------ */
186     /**
187      * Set the number of connections, which if exceeded places this manager in low resources state.
188      * This is not an exact measure as the connection count is averaged over the select sets.
189      * @param lowResourcesConnections the number of connections
190      * @see #setLowResourcesMaxIdleTime(long)
191      */
192     public void setLowResourcesConnections(long lowResourcesConnections)
193     {
194         _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
195     }
196 
197     /* ------------------------------------------------------------ */
198     /**
199      * @return the lowResourcesMaxIdleTime
200      */
201     public long getLowResourcesMaxIdleTime()
202     {
203         return _lowResourcesMaxIdleTime;
204     }
205 
206     /* ------------------------------------------------------------ */
207     /**
208      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()}
209      * @see #setMaxIdleTime(long)
210      */
211     public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
212     {
213         _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
214     }
215     
216     /* ------------------------------------------------------------ */
217     /**
218      * @param acceptorID
219      * @throws IOException
220      */
221     public void doSelect(int acceptorID) throws IOException
222     {
223         SelectSet[] sets= _selectSet;
224         if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
225             sets[acceptorID].doSelect();
226     }
227 
228     /* ------------------------------------------------------------------------------- */
        /**
         * Hands a task to the concrete manager for execution.
         * Within this file it receives the Runnable changes queued on a select set, expired
         * timeout tasks, the periodic idle check and the close of an end point whose key
         * was cancelled as busy.
         *
         * @param task the task to execute
         * @return NOTE(review): the result is not inspected by any caller visible in this file;
         *         presumably true when the task was accepted - confirm against the implementations
         */
229     public abstract boolean dispatch(Runnable task);
230 
231     /* ------------------------------------------------------------ */
232     /* (non-Javadoc)
233      * @see org.eclipse.component.AbstractLifeCycle#doStart()
234      */
235     @Override
236     protected void doStart() throws Exception
237     {
238         _selectSet = new SelectSet[_selectSets];
239         for (int i=0;i<_selectSet.length;i++)
240             _selectSet[i]= new SelectSet(i);
241 
242         super.doStart();
243     }
244 
245 
246     /* ------------------------------------------------------------------------------- */
247     @Override
248     protected void doStop() throws Exception
249     {
250         SelectSet[] sets= _selectSet;
251         _selectSet=null;
252         if (sets!=null)
253         {
254             for (SelectSet set : sets)
255             {
256                 if (set!=null)
257                     set.stop();
258             }
259         }
260         super.doStop();
261     }
262 
263     /* ------------------------------------------------------------ */
264     /**
         * Hook implemented by the concrete manager.
         * NOTE(review): the call site is not in the visible part of this file; presumably
         * invoked when an end point of this manager has been closed - confirm.
265      * @param endpoint the end point concerned
266      */
267     protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
268 
269     /* ------------------------------------------------------------ */
270     /**
         * Hook implemented by the concrete manager.
         * NOTE(review): the call site is not in the visible part of this file; presumably
         * invoked when a new end point has been created for a channel - confirm.
271      * @param endpoint the end point concerned
272      */
273     protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
274 
275     /* ------------------------------------------------------------ */
        /**
         * Hook implemented by the concrete manager.
         * NOTE(review): not called in the visible part of this file; presumably invoked
         * when the connection of an end point has been replaced by another one - confirm.
         * @param endpoint the end point concerned
         * @param oldConnection presumably the connection that was replaced - confirm
         */
276     protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
277 
278     /* ------------------------------------------------------------------------------- */
        /**
         * Factory hook implemented by the concrete manager.
         * NOTE(review): not called in the visible part of this file; presumably creates
         * the connection that handles the given channel and end point - confirm.
         * @param channel the socket channel
         * @param endpoint the end point wrapping the channel
         * @return the connection
         */
279     protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
280 
281     /* ------------------------------------------------------------ */
282     /**
283      * Create a new end point
         * <p>
         * Called by the select set when it needs an end point for a channel; the select
         * set passes itself and the selection key of the channel.
284      * @param channel the socket channel to wrap
285      * @param selectSet the select set the channel is registered with
286      * @param sKey the selection key
287      * @return the new endpoint {@link SelectChannelEndPoint}
288      * @throws IOException
289      */
290     protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
291 
292     /* ------------------------------------------------------------------------------- */
293     protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
294     {
295         Log.warn(ex+","+channel+","+attachment);
296         Log.debug(ex);
297     }
298     
299     /* ------------------------------------------------------------ */
300     public String dump()
301     {
302         return AggregateLifeCycle.dump(this);
303     }
304 
305     /* ------------------------------------------------------------ */
306     public void dump(Appendable out, String indent) throws IOException
307     {
308         out.append(String.valueOf(this)).append("\n");
309         AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet));
310     }
311     
312     
313     /* ------------------------------------------------------------------------------- */
314     /* ------------------------------------------------------------------------------- */
315     /* ------------------------------------------------------------------------------- */
316     public class SelectSet implements Dumpable
317     {
            // ID given to the constructor (the index of this set in the manager's array).
318         private final int _setID;
            // Timeout queue of this set; its expired tasks are dispatched at the end of doSelect().
319         private final Timeout _timeout;
320         
            // Pending changes (end points, channels, ChannelAndAttachment, Runnables) added by
            // any thread through addChange() and drained at the start of doSelect().
321         private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
322         
            // Opened in the constructor; replaced by checkJvmBugs() when the selector is recreated.
323         private Selector _selector;
324         
            // Thread currently inside doSelect(), null otherwise.
325         private volatile Thread _selecting;
            // Suspicious early select returns seen in the current monitor period.
326         private int _jvmBug;
            // Selects performed since the start of the current monitor period.
327         private int _selects;
            // Start of the current monitor period and the time at which it ends (ms).
328         private long _monitorStart;
329         private long _monitorNext;
            // True when the last monitor period saw more than __MAX_SELECTS selects.
330         private boolean _pausing;
            // Last single key seen by the busy key check and how often it repeated.
331         private SelectionKey _busyKey;
332         private int _busyKeyCount;
            // Time (ms) after which the work-around counters are next logged and reset.
333         private long _log;
            // Counters reported by that log: monitor periods that ended in pausing mode,
            // cancellations of zero-interest keys (_jvmFix0), selector recreations (_jvmFix1)
            // and delay injections (_jvmFix2).
            // NOTE(review): _jvmFix1 is logged and reset but never incremented in the visible code - confirm.
334         private int _paused;
335         private int _jvmFix0;
336         private int _jvmFix1;
337         private int _jvmFix2;
            // Time (ms) of the last idle check dispatched by doSelect().
338         private volatile long _idleTick;
            // End points visited by the idle check; only the key set is read in the visible code.
339         private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
340         
341         /* ------------------------------------------------------------ */
342         SelectSet(int acceptorID) throws Exception
343         {
344             _setID=acceptorID;
345 
346             _idleTick = System.currentTimeMillis();
347             _timeout = new Timeout(this);
348             _timeout.setDuration(0L);
349 
350             // create a selector;
351             _selector = Selector.open();
352             _monitorStart=System.currentTimeMillis();
353             _monitorNext=_monitorStart+__MONITOR_PERIOD;
354             _log=_monitorStart+60000;
355         }
356         
357         /* ------------------------------------------------------------ */
358         public void addChange(Object change)
359         {
360             _changes.add(change);
361         }
362 
363         /* ------------------------------------------------------------ */
364         public void addChange(SelectableChannel channel, Object att)
365         {   
366             if (att==null)
367                 addChange(channel);
368             else if (att instanceof EndPoint)
369                 addChange(att);
370             else
371                 addChange(new ChannelAndAttachment(channel,att));
372         }
373         
374         /* ------------------------------------------------------------ */
375         /**
376          * Select and dispatch tasks found from changes and the selector.
377          * 
378          * @throws IOException
379          */
380         public void doSelect() throws IOException
381         {
382             Log.debug("doSelect "+SelectorManager.this.isRunning());
383             try
384             {
385                 _selecting=Thread.currentThread();
386                 final Selector selector=_selector;
387 
388                 // Make any key changes required
389                 Object change;
390                 int changes=_changes.size();
391                 while (changes-->0 && (change=_changes.poll())!=null)
392                 {
393                     try
394                     {
395                         if (change instanceof EndPoint)
396                         {
397                             // Update the operations for a key.
398                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
399                             endpoint.doUpdateKey();
400                         }
401                         else if (change instanceof ChannelAndAttachment)
402                         {
403                             // finish accepting/connecting this connection
404                             final ChannelAndAttachment asc = (ChannelAndAttachment)change;
405                             final SelectableChannel channel=asc._channel;
406                             final Object att = asc._attachment;
407                             
408                             if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
409                             {
410                                 SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
411                                 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
412                                 key.attach(endpoint);
413                                 endpoint.schedule();
414                             }
415                             else if (channel.isOpen())
416                             {
417                                 channel.register(selector,SelectionKey.OP_CONNECT,att);
418                             }
419                         }
420                         else if (change instanceof SocketChannel)
421                         {
422                             // Newly registered channel
423                             final SocketChannel channel=(SocketChannel)change;
424                             SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);
425                             SelectChannelEndPoint endpoint = createEndPoint(channel,key);
426                             key.attach(endpoint);
427                             endpoint.schedule();
428                         }
429                         else if (change instanceof Runnable)
430                         {
431                             dispatch((Runnable)change);
432                         }
433                         else
434                             throw new IllegalArgumentException(change.toString());
435                     }
436                     catch (Exception e)
437                     {
438                         if (isRunning())
439                             Log.warn(e);
440                         else
441                             Log.debug(e);
442                     }
443                     catch (Error e)
444                     {
445                         if (isRunning())
446                             Log.warn(e);
447                         else
448                             Log.debug(e);
449                     }
450                 }
451 
452 
453                 // Do and instant select to see if any connections can be handled.
454                 int selected=selector.selectNow();
455                 _selects++;
456 
457                 long now=System.currentTimeMillis();
458                 
459                 // if no immediate things to do
460                 if (selected==0 && selector.selectedKeys().isEmpty())
461                 {
462                     // If we are in pausing mode
463                     if (_pausing)
464                     {
465                         try
466                         {
467                             Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of  busy loop
468                         }
469                         catch(InterruptedException e)
470                         {
471                             Log.ignore(e);
472                         }
473                         now=System.currentTimeMillis();
474                     }
475 
476                     // workout how long to wait in select
477                     _timeout.setNow(now);
478                     long to_next_timeout=_timeout.getTimeToNext();
479 
480                     long wait = _changes.size()==0?__IDLE_TICK:0L;  
481                     if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
482                         wait = to_next_timeout;
483 
484                     // If we should wait with a select
485                     if (wait>0)
486                     {
487                         long before=now;
488                         selected=selector.select(wait);
489                         _selects++;
490                         now = System.currentTimeMillis();
491                         _timeout.setNow(now);
492                         
493                         if (__JVMBUG_THRESHHOLD>0)
494                             checkJvmBugs(before, now, wait, selected);
495                     }
496                 }
497                 
498                 // have we been destroyed while sleeping
499                 if (_selector==null || !selector.isOpen())
500                     return;
501 
502                 // Look for things to do
503                 for (SelectionKey key: selector.selectedKeys())
504                 {   
505                     try
506                     {
507                         if (!key.isValid())
508                         {
509                             key.cancel();
510                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
511                             if (endpoint != null)
512                                 endpoint.doUpdateKey();
513                             continue;
514                         }
515 
516                         Object att = key.attachment();
517                         if (att instanceof SelectChannelEndPoint)
518                         {
519                             if (key.isReadable()||key.isWritable())
520                                 ((SelectChannelEndPoint)att).schedule();
521                         }
522                         else if (key.isConnectable())
523                         {
524                             // Complete a connection of a registered channel
525                             SocketChannel channel = (SocketChannel)key.channel();
526                             boolean connected=false;
527                             try
528                             {
529                                 connected=channel.finishConnect();
530                             }
531                             catch(Exception e)
532                             {
533                                 connectionFailed(channel,e,att);
534                             }
535                             finally
536                             {
537                                 if (connected)
538                                 {
539                                     key.interestOps(SelectionKey.OP_READ);
540                                     SelectChannelEndPoint endpoint = createEndPoint(channel,key);
541                                     key.attach(endpoint);
542                                     endpoint.schedule();
543                                 }
544                                 else
545                                 {
546                                     key.cancel();
547                                 }
548                             }
549                         }
550                         else
551                         {
552                             // Wrap readable registered channel in an endpoint
553                             SocketChannel channel = (SocketChannel)key.channel();
554                             SelectChannelEndPoint endpoint = createEndPoint(channel,key);
555                             key.attach(endpoint);
556                             if (key.isReadable())
557                                 endpoint.schedule();  
558                         }
559                         key = null;
560                     }
561                     catch (CancelledKeyException e)
562                     {
563                         Log.ignore(e);
564                     }
565                     catch (Exception e)
566                     {
567                         if (isRunning())
568                             Log.warn(e);
569                         else
570                             Log.ignore(e);
571 
572                         if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
573                             key.cancel();
574                     }
575                 }
576                 
577                 // Everything always handled
578                 selector.selectedKeys().clear();
579                 
580                 now=System.currentTimeMillis();
581                 _timeout.setNow(now);
582                 Task task = _timeout.expired();
583                 while (task!=null)
584                 {
585                     if (task instanceof Runnable)
586                         dispatch((Runnable)task);
587                     task = _timeout.expired();
588                 }
589 
590                 // Idle tick
591                 if (now-_idleTick>__IDLE_TICK)
592                 {
593                     _idleTick=now;
594                     
595                     final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
596                         ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
597                         :now;
598                         
599                     dispatch(new Runnable()
600                     {
601                         public void run()
602                         {
603                             for (SelectChannelEndPoint endp:_endPoints.keySet())
604                             {
605                                 endp.checkIdleTimestamp(idle_now);
606                             }
607                         }
608                     });
609                 }
610             }
611             catch (ClosedSelectorException e)
612             {
613                 if (isRunning())
614                     Log.warn(e);
615                 else
616                     Log.ignore(e);
617             }
618             catch (CancelledKeyException e)
619             {
620                 Log.ignore(e);
621             }
622             finally
623             {
624                 _selecting=null;
625             }
626         }
627         
628         /* ------------------------------------------------------------ */
629         private void checkJvmBugs(long before, long now, long wait, int selected)
630             throws IOException
631         {
632             Selector selector = _selector;
633             if (selector==null)
634                 return;
635                 
636             // Look for JVM bugs over a monitor period.
637             // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
638             // http://bugs.sun.com/view_bug.do?bug_id=6693490
639             if (now>_monitorNext)
640             {
641                 _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
642                 _pausing=_selects>__MAX_SELECTS;
643                 if (_pausing)
644                     _paused++;
645 
646                 _selects=0;
647                 _jvmBug=0;
648                 _monitorStart=now;
649                 _monitorNext=now+__MONITOR_PERIOD;
650             }
651 
652             if (now>_log)
653             {
654                 if (_paused>0)  
655                     Log.debug(this+" Busy selector - injecting delay "+_paused+" times");
656 
657                 if (_jvmFix2>0)
658                     Log.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
659 
660                 if (_jvmFix1>0)
661                     Log.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times");
662 
663                 else if(Log.isDebugEnabled() && _jvmFix0>0)
664                     Log.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");
665                 _paused=0;
666                 _jvmFix2=0;
667                 _jvmFix1=0;
668                 _jvmFix0=0;
669                 _log=now+60000;
670             }
671 
672             // If we see signature of possible JVM bug, increment count.
673             if (selected==0 && wait>10 && (now-before)<(wait/2))
674             {
675                 // Increment bug count and try a work around
676                 _jvmBug++;
677                 if (_jvmBug>(__JVMBUG_THRESHHOLD))
678                 {
679                     try
680                     {
681                         if (_jvmBug==__JVMBUG_THRESHHOLD+1)
682                             _jvmFix2++;
683 
684                         Thread.sleep(__BUSY_PAUSE); // pause to avoid busy loop
685                     }
686                     catch(InterruptedException e)
687                     {
688                         Log.ignore(e);
689                     }
690                 }
691                 else if (_jvmBug==__JVMBUG_THRESHHOLD)
692                 {
693                     synchronized (this)
694                     {
695                         // BLOODY SUN BUG !!!  Try refreshing the entire selector.
696                         final Selector new_selector = Selector.open();
697                         for (SelectionKey k: selector.keys())
698                         {
699                             if (!k.isValid() || k.interestOps()==0)
700                                 continue;
701 
702                             final SelectableChannel channel = k.channel();
703                             final Object attachment = k.attachment();
704 
705                             if (attachment==null)
706                                 addChange(channel);
707                             else
708                                 addChange(channel,attachment);
709                         }
710                         _selector.close();
711                         _selector=new_selector;
712                         return;
713                     }
714                 }
715                 else if (_jvmBug%32==31) // heuristic attempt to cancel key 31,63,95,... loops
716                 {
717                     // Cancel keys with 0 interested ops
718                     int cancelled=0;
719                     for (SelectionKey k: selector.keys())
720                     {
721                         if (k.isValid()&&k.interestOps()==0)
722                         {
723                             k.cancel();
724                             cancelled++;
725                         }
726                     }
727                     if (cancelled>0)
728                         _jvmFix0++;
729 
730                     return;
731                 }
732             }
733             else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
734             {
735                 // Look for busy key
736                 SelectionKey busy = selector.selectedKeys().iterator().next();
737                 if (busy==_busyKey)
738                 {
739                     if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
740                     {
741                         final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
742                         Log.warn("Busy Key "+busy.channel()+" "+endpoint);
743                         busy.cancel();
744                         if (endpoint!=null)
745                         {
746                             dispatch(new Runnable()
747                             {
748                                 public void run()
749                                 {
750                                     try
751                                     {
752                                         endpoint.close();
753                                     }
754                                     catch (IOException e)
755                                     {
756                                         Log.ignore(e);
757                                     }
758                                 }
759                             });
760                         }
761                     }
762                 }
763                 else
764                     _busyKeyCount=0;
765                 _busyKey=busy;
766             }
767         }
768         
769         /* ------------------------------------------------------------ */
770         public SelectorManager getManager()
771         {
772             return SelectorManager.this;
773         }
774 
775         /* ------------------------------------------------------------ */
776         public long getNow()
777         {
778             return _timeout.getNow();
779         }
780 
781         /* ------------------------------------------------------------ */
782         /**
783          * @param task The task to timeout. If it implements Runnable, then 
784          * expired will be called from a dispatched thread.
785          * 
786          * @param timeoutMs
787          */
788         public void scheduleTimeout(Timeout.Task task, long timeoutMs)
789         {
790             if (!(task instanceof Runnable))
791                 throw new IllegalArgumentException("!Runnable");
792             _timeout.schedule(task, timeoutMs);
793         }
794         
795         /* ------------------------------------------------------------ */
796         public void cancelTimeout(Timeout.Task task)
797         {
798             task.cancel();
799         }
800 
801         /* ------------------------------------------------------------ */
802         public void wakeup()
803         {
804             Selector selector = _selector;
805             if (selector!=null)
806                 selector.wakeup();
807         }
808         
809         /* ------------------------------------------------------------ */
810         private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
811         {
812             SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
813             endPointOpened(endp); 
814             _endPoints.put(endp,this);
815             return endp;
816         }
817         
818         /* ------------------------------------------------------------ */
819         public void destroyEndPoint(SelectChannelEndPoint endp)
820         {
821             _endPoints.remove(endp);
822             endPointClosed(endp);
823         }
824 
825         /* ------------------------------------------------------------ */
826         Selector getSelector()
827         {
828             return _selector;
829         }
830         
831         /* ------------------------------------------------------------ */
832         void stop() throws Exception
833         {
834             // Spin for a while waiting for selector to complete 
835             // to avoid unneccessary closed channel exceptions
836             try
837             {
838                 for (int i=0;i<100 && _selecting!=null;i++)
839                 {
840                     wakeup();
841                     Thread.sleep(10);
842                 }
843             }
844             catch(Exception e)
845             {
846                 Log.ignore(e);
847             }
848 
849             // close endpoints and selector
850             synchronized (this)
851             {
852                 for (SelectionKey key:_selector.keys())
853                 {
854                     if (key==null)
855                         continue;
856                     Object att=key.attachment();
857                     if (att instanceof EndPoint)
858                     {
859                         EndPoint endpoint = (EndPoint)att;
860                         try
861                         {
862                             endpoint.close();
863                         }
864                         catch(IOException e)
865                         {
866                             Log.ignore(e);
867                         }
868                     }
869                 }
870             
871             
872                 _timeout.cancelAll();
873                 try
874                 {
875                     if (_selector != null)
876                         _selector.close();
877                 }
878                 catch (IOException e)
879                 {
880                     Log.ignore(e);
881                 } 
882                 _selector=null;
883             }
884         }
885 
886         /* ------------------------------------------------------------ */
887         public String dump()
888         {
889             return AggregateLifeCycle.dump(this);
890         }
891 
892         /* ------------------------------------------------------------ */
893         public void dump(Appendable out, String indent) throws IOException
894         {
895             out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
896             
897             Thread selecting = _selecting;
898             
899             Object where = "not selecting";
900             StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
901             if (trace!=null)
902             {
903                 for (StackTraceElement t:trace)
904                     if (t.getClassName().startsWith("org.eclipse.jetty."))
905                     {
906                         where=t;
907                         break;
908                     }
909             }
910 
911             Selector selector=_selector;
912             final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
913             dump.add(where);
914             
915             final CountDownLatch latch = new CountDownLatch(1);
916             
917             addChange(new Runnable(){
918                 public void run()
919                 {
920                     dumpKeyState(dump);
921                     latch.countDown();
922                 }
923             });
924             
925             try
926             {
927                 latch.await(5,TimeUnit.SECONDS);
928             }
929             catch(InterruptedException e)
930             {
931                 Log.ignore(e);
932             }
933             AggregateLifeCycle.dump(out,indent,dump);
934         }
935 
936         /* ------------------------------------------------------------ */
937         public void dumpKeyState(List<Object> dumpto)
938         {
939             Selector selector=_selector;
940             dumpto.add(selector+" keys="+selector.keys().size());
941             for (SelectionKey key: selector.keys())
942             {
943                 if (key.isValid())
944                     dumpto.add(key.attachment()+" "+key.interestOps()+" "+key.readyOps());
945                 else
946                     dumpto.add(key.attachment()+" - - ");
947             }
948         }
949     }
950 
951     /* ------------------------------------------------------------ */
952     private static class ChannelAndAttachment
953     {
954         final SelectableChannel _channel;
955         final Object _attachment;
956         
957         public ChannelAndAttachment(SelectableChannel channel, Object attachment)
958         {
959             super();
960             _channel = channel;
961             _attachment = attachment;
962         }
963     }
964 
965     /* ------------------------------------------------------------ */
966     public boolean isDeferringInterestedOps0()
967     {
968         return _deferringInterestedOps0;
969     }
970 
971     /* ------------------------------------------------------------ */
972     public void setDeferringInterestedOps0(boolean deferringInterestedOps0)
973     {
974         _deferringInterestedOps0 = deferringInterestedOps0;
975     }
976     
977 }