View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.CancelledKeyException;
18  import java.nio.channels.Channel;
19  import java.nio.channels.ClosedSelectorException;
20  import java.nio.channels.SelectableChannel;
21  import java.nio.channels.SelectionKey;
22  import java.nio.channels.Selector;
23  import java.nio.channels.ServerSocketChannel;
24  import java.nio.channels.SocketChannel;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.concurrent.ConcurrentHashMap;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.ConcurrentMap;
30  import java.util.concurrent.CountDownLatch;
31  import java.util.concurrent.TimeUnit;
32  
33  
34  import org.eclipse.jetty.io.ConnectedEndPoint;
35  import org.eclipse.jetty.io.Connection;
36  import org.eclipse.jetty.io.EndPoint;
37  import org.eclipse.jetty.util.TypeUtil;
38  import org.eclipse.jetty.util.component.AbstractLifeCycle;
39  import org.eclipse.jetty.util.component.AggregateLifeCycle;
40  import org.eclipse.jetty.util.component.Dumpable;
41  import org.eclipse.jetty.util.log.Log;
42  import org.eclipse.jetty.util.log.Logger;
43  import org.eclipse.jetty.util.thread.Timeout;
44  import org.eclipse.jetty.util.thread.Timeout.Task;
45  
46  
47  /* ------------------------------------------------------------ */
48  /**
49   * The Selector Manager manages a number of SelectSets to allow
50   * NIO scheduling to scale to large numbers of connections.
51   * <p>
52   * This class works around a number of known JVM bugs. For details
53   * see http://wiki.eclipse.org/Jetty/Feature/JVM_NIO_Bug
54   */
55  public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
56  {
57      public static final Logger __log=Log.getLogger("org.eclipse.jetty.io.nio");
58      
59      // TODO Tune these by approx system speed.
60      private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.eclipse.jetty.io.nio.JVMBUG_THRESHHOLD",0).intValue();
61      private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
62      private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",25000).intValue();
63      private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
64      private static final int __BUSY_KEY=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_KEY",-1).intValue();
65      private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
66      
67      private int _maxIdleTime;
68      private int _lowResourcesMaxIdleTime;
69      private long _lowResourcesConnections;
70      private SelectSet[] _selectSet;
71      private int _selectSets=1;
72      private volatile int _set;
73      private boolean _deferringInterestedOps0=true;
74      
75      /* ------------------------------------------------------------ */
76      /**
77       * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
78       * @see #setLowResourcesMaxIdleTime(long)
79       */
80      public void setMaxIdleTime(long maxIdleTime)
81      {
82          _maxIdleTime=(int)maxIdleTime;
83      }
84      
85      /* ------------------------------------------------------------ */
86      /**
87       * @param selectSets number of select sets to create
88       */
89      public void setSelectSets(int selectSets)
90      {
91          long lrc = _lowResourcesConnections * _selectSets; 
92          _selectSets=selectSets;
93          _lowResourcesConnections=lrc/_selectSets;
94      }
95      
96      /* ------------------------------------------------------------ */
97      /**
98       * @return the max idle time
99       */
100     public long getMaxIdleTime()
101     {
102         return _maxIdleTime;
103     }
104     
105     /* ------------------------------------------------------------ */
106     /**
107      * @return the number of select sets in use
108      */
109     public int getSelectSets()
110     {
111         return _selectSets;
112     }
113 
114     /* ------------------------------------------------------------ */
115     /**
116      * @param i 
117      * @return The select set
118      */
119     public SelectSet getSelectSet(int i)
120     {
121         return _selectSet[i];
122     }
123     
124     /* ------------------------------------------------------------ */
125     /** Register a channel
126      * @param channel
127      * @param att Attached Object
128      */
129     public void register(SocketChannel channel, Object att)
130     {
131         // The ++ increment here is not atomic, but it does not matter.
132         // so long as the value changes sometimes, then connections will
133         // be distributed over the available sets.
134         
135         int s=_set++; 
136         s=s%_selectSets;
137         SelectSet[] sets=_selectSet;
138         if (sets!=null)
139         {
140             SelectSet set=sets[s];
141             set.addChange(channel,att);
142             set.wakeup();
143         }
144     }
145 
146     
147     /* ------------------------------------------------------------ */
148     /** Register a channel
149      * @param channel
150      */
151     public void register(SocketChannel channel)
152     {
153         // The ++ increment here is not atomic, but it does not matter.
154         // so long as the value changes sometimes, then connections will
155         // be distributed over the available sets.
156         
157         int s=_set++; 
158         s=s%_selectSets;
159         SelectSet[] sets=_selectSet;
160         if (sets!=null)
161         {
162             SelectSet set=sets[s];
163             set.addChange(channel);
164             set.wakeup();
165         }
166     }
167     
168     /* ------------------------------------------------------------ */
169     /** Register a {@link ServerSocketChannel}
170      * @param acceptChannel
171      */
172     public void register(ServerSocketChannel acceptChannel)
173     {
174         int s=_set++; 
175         s=s%_selectSets;
176         SelectSet set=_selectSet[s];
177         set.addChange(acceptChannel);
178         set.wakeup();
179     }
180 
181     /* ------------------------------------------------------------ */
182     /**
183      * @return the lowResourcesConnections
184      */
185     public long getLowResourcesConnections()
186     {
187         return _lowResourcesConnections*_selectSets;
188     }
189 
190     /* ------------------------------------------------------------ */
191     /**
192      * Set the number of connections, which if exceeded places this manager in low resources state.
193      * This is not an exact measure as the connection count is averaged over the select sets.
194      * @param lowResourcesConnections the number of connections
195      * @see #setLowResourcesMaxIdleTime(long)
196      */
197     public void setLowResourcesConnections(long lowResourcesConnections)
198     {
199         _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
200     }
201 
202     /* ------------------------------------------------------------ */
203     /**
204      * @return the lowResourcesMaxIdleTime
205      */
206     public long getLowResourcesMaxIdleTime()
207     {
208         return _lowResourcesMaxIdleTime;
209     }
210 
211     /* ------------------------------------------------------------ */
212     /**
213      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()}
214      * @see #setMaxIdleTime(long)
215      */
216     public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
217     {
218         _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
219     }
220     
221     /* ------------------------------------------------------------ */
222     /**
223      * @param acceptorID
224      * @throws IOException
225      */
226     public void doSelect(int acceptorID) throws IOException
227     {
228         SelectSet[] sets= _selectSet;
229         if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
230             sets[acceptorID].doSelect();
231     }
232 
233     /* ------------------------------------------------------------------------------- */
        /**
         * Hand a task to the concrete manager for execution.
         * <p>
         * Within this class it is called from the select sets for queued {@link Runnable}
         * changes, expired timeout tasks, the periodic idle check and the closing of a
         * busy key's end point.
         *
         * @param task the task to execute
         * @return not examined by the callers visible in this class; presumably true if
         * the task was accepted -- TODO confirm against the implementations
         */
234     public abstract boolean dispatch(Runnable task);
235 
236     /* ------------------------------------------------------------ */
237     /* (non-Javadoc)
238      * @see org.eclipse.component.AbstractLifeCycle#doStart()
239      */
240     @Override
241     protected void doStart() throws Exception
242     {
243         _selectSet = new SelectSet[_selectSets];
244         for (int i=0;i<_selectSet.length;i++)
245             _selectSet[i]= new SelectSet(i);
246 
247         super.doStart();
248     }
249 
250 
251     /* ------------------------------------------------------------------------------- */
252     @Override
253     protected void doStop() throws Exception
254     {
255         SelectSet[] sets= _selectSet;
256         _selectSet=null;
257         if (sets!=null)
258         {
259             for (SelectSet set : sets)
260             {
261                 if (set!=null)
262                     set.stop();
263             }
264         }
265         super.doStop();
266     }
267 
268     /* ------------------------------------------------------------ */
269     /**
         * Callback for a closed end point, implemented by the concrete manager.
         * NOTE(review): the call site is not in this part of the file; presumably
         * invoked when an end point of one of the select sets is closed -- confirm.
         *
         * @param endpoint the end point concerned
         */
272     protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
273 
274     /* ------------------------------------------------------------ */
275     /**
         * Callback for an opened end point, implemented by the concrete manager.
         * NOTE(review): the call site is not in this part of the file; presumably
         * invoked when a select set has created a new end point -- confirm.
         *
         * @param endpoint the end point concerned
         */
278     protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
279 
280     /* ------------------------------------------------------------ */
        /**
         * Callback implemented by the concrete manager.
         * NOTE(review): the call site is not in this part of the file; judging by the
         * signature it reports that the connection of an end point has been replaced --
         * confirm.
         *
         * @param endpoint the end point concerned
         * @param oldConnection presumably the connection that was replaced -- TODO confirm
         */
281     protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
282 
283     /* ------------------------------------------------------------------------------- */
        /**
         * Factory method implemented by the concrete manager to supply the
         * {@link Connection} for a channel and its end point.
         * NOTE(review): the call site is not in this part of the file -- confirm when
         * it is invoked.
         *
         * @param channel the socket channel
         * @param endpoint the end point wrapping the channel
         * @return the connection
         */
284     protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
285 
286     /* ------------------------------------------------------------ */
287     /**
         * Create a new end point.
         * Factory method implemented by the concrete manager.
         * NOTE(review): the call site is not in this part of the file; presumably used
         * by the select set when it creates the end point for a channel -- confirm.
         *
         * @param channel the socket channel to wrap
         * @param selectSet the select set the channel belongs to
         * @param sKey the selection key
         * @return the new endpoint {@link SelectChannelEndPoint}
         * @throws IOException if the end point cannot be created
         */
295     protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
296 
297     /* ------------------------------------------------------------------------------- */
298     protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
299     {
300         __log.warn(ex+","+channel+","+attachment);
301         __log.debug(ex);
302     }
303     
304     /* ------------------------------------------------------------ */
305     public String dump()
306     {
307         return AggregateLifeCycle.dump(this);
308     }
309 
310     /* ------------------------------------------------------------ */
311     public void dump(Appendable out, String indent) throws IOException
312     {
313         out.append(String.valueOf(this)).append("\n");
314         AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet));
315     }
316     
317     
318     /* ------------------------------------------------------------------------------- */
319     /* ------------------------------------------------------------------------------- */
320     /* ------------------------------------------------------------------------------- */
321     public class SelectSet implements Dumpable
322     {
            // Index of this select set, as given to the constructor.
323         private final int _setID;
            // Timeout queue; its expired tasks are dispatched at the end of each doSelect pass.
324         private final Timeout _timeout;
325         
            // Pending changes (end points, channels, channel+attachment pairs, tasks),
            // queued by addChange and drained at the start of each doSelect pass.
326         private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
327         
            // The selector of this set; replaced by renewSelector.
328         private Selector _selector;
329         
            // The thread currently inside doSelect, or null.
330         private volatile Thread _selecting;
            // Suspect selects (returned early with nothing selected) seen in the current monitor period.
331         private int _jvmBug;
            // Selects counted since the start of the current monitor period.
332         private int _selects;
            // Start of the current monitor period, and the time at which it ends.
333         private long _monitorStart;
334         private long _monitorNext;
            // True while doSelect should sleep before a blocking select to slow a busy loop.
335         private boolean _pausing;
            // The last key seen selected alone by the busy key check, and how often in a row.
336         private SelectionKey _busyKey;
337         private int _busyKeyCount;
            // Time at which checkJvmBugs next logs and resets the counters below.
338         private long _log;
            // Number of monitor periods that ended with pausing switched on.
339         private int _paused;
            // Workaround counters reported by checkJvmBugs: _jvmFix0 counts rounds that
            // cancelled zero-interest keys, _jvmFix2 counts rounds that injected a delay.
            // NOTE(review): _jvmFix1 is reported as "recreating selector" but no increment
            // is visible in this part of the file -- confirm.
340         private int _jvmFix0;
341         private int _jvmFix1;
342         private int _jvmFix2;
            // Time of the last idle check dispatched by doSelect.
343         private volatile long _idleTick;
            // End points visited by the idle check (only the keys are read there);
            // they are added and removed outside this part of the file.
344         private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
345         
346         /* ------------------------------------------------------------ */
347         SelectSet(int acceptorID) throws Exception
348         {
349             _setID=acceptorID;
350 
351             _idleTick = System.currentTimeMillis();
352             _timeout = new Timeout(this);
353             _timeout.setDuration(0L);
354 
355             // create a selector;
356             _selector = Selector.open();
357             _monitorStart=System.currentTimeMillis();
358             _monitorNext=_monitorStart+__MONITOR_PERIOD;
359             _log=_monitorStart+60000;
360         }
361         
362         /* ------------------------------------------------------------ */
363         public void addChange(Object change)
364         {
365             _changes.add(change);
366         }
367 
368         /* ------------------------------------------------------------ */
369         public void addChange(SelectableChannel channel, Object att)
370         {   
371             if (att==null)
372                 addChange(channel);
373             else if (att instanceof EndPoint)
374                 addChange(att);
375             else
376                 addChange(new ChannelAndAttachment(channel,att));
377         }
378         
379         /* ------------------------------------------------------------ */
380         /**
381          * Select and dispatch tasks found from changes and the selector.
382          * 
383          * @throws IOException
384          */
385         public void doSelect() throws IOException
386         {
387             try
388             {
389                 _selecting=Thread.currentThread();
390                 final Selector selector=_selector;
391 
392                 // Make any key changes required
393                 Object change;
394                 int changes=_changes.size();
395                 while (changes-->0 && (change=_changes.poll())!=null)
396                 {
397                     Channel ch=null;
398                     SelectionKey key=null;
399                     
400                     try
401                     {
402                         if (change instanceof EndPoint)
403                         {
404                             // Update the operations for a key.
405                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
406                             ch=endpoint.getChannel();
407                             endpoint.doUpdateKey();
408                         }
409                         else if (change instanceof ChannelAndAttachment)
410                         {
411                             // finish accepting/connecting this connection
412                             final ChannelAndAttachment asc = (ChannelAndAttachment)change;
413                             final SelectableChannel channel=asc._channel;
414                             ch=channel;
415                             final Object att = asc._attachment;
416                             
417                             if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
418                             {
419                                 key = channel.register(selector,SelectionKey.OP_READ,att);
420                                 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
421                                 key.attach(endpoint);
422                                 endpoint.schedule();
423                             }
424                             else if (channel.isOpen())
425                             {
426                                 key = channel.register(selector,SelectionKey.OP_CONNECT,att);
427                             }
428                         }
429                         else if (change instanceof SocketChannel)
430                         {
431                             // Newly registered channel
432                             final SocketChannel channel=(SocketChannel)change;
433                             ch=channel;
434                             key = channel.register(selector,SelectionKey.OP_READ,null);
435                             SelectChannelEndPoint endpoint = createEndPoint(channel,key);
436                             key.attach(endpoint);
437                             endpoint.schedule();
438                         }
439                         else if (change instanceof ChangeTask)
440                         {
441                             ((Runnable)change).run();
442                         }
443                         else if (change instanceof Runnable)
444                         {
445                             dispatch((Runnable)change);
446                         }
447                         else
448                             throw new IllegalArgumentException(change.toString());
449                     }
450                     catch (CancelledKeyException e)
451                     {
452                         __log.ignore(e);
453                     }
454                     catch (Throwable e)
455                     {
456                         if (e instanceof ThreadDeath)
457                             throw (ThreadDeath)e;
458                         
459                         if (isRunning())
460                             __log.warn(e);
461                         else
462                             __log.debug(e);
463 
464                         try
465                         {
466                             ch.close();
467                         }
468                         catch(IOException e2)
469                         {
470                             __log.debug(e2);
471                         }
472                     }
473                 }
474 
475 
476                 // Do and instant select to see if any connections can be handled.
477                 int selected=selector.selectNow();
478                 _selects++;
479 
480                 long now=System.currentTimeMillis();
481                 
482                 // if no immediate things to do
483                 if (selected==0 && selector.selectedKeys().isEmpty())
484                 {
485                     // If we are in pausing mode
486                     if (_pausing)
487                     {
488                         try
489                         {
490                             Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of  busy loop
491                         }
492                         catch(InterruptedException e)
493                         {
494                             __log.ignore(e);
495                         }
496                         now=System.currentTimeMillis();
497                     }
498 
499                     // workout how long to wait in select
500                     _timeout.setNow(now);
501                     long to_next_timeout=_timeout.getTimeToNext();
502 
503                     long wait = _changes.size()==0?__IDLE_TICK:0L;  
504                     if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
505                         wait = to_next_timeout;
506 
507                     // If we should wait with a select
508                     if (wait>0)
509                     {
510                         long before=now;
511                         selected=selector.select(wait);
512                         _selects++;
513                         now = System.currentTimeMillis();
514                         _timeout.setNow(now);
515                         
516                         if (__JVMBUG_THRESHHOLD>0)
517                             checkJvmBugs(before, now, wait, selected);
518                     }
519                 }
520                 
521                 // have we been destroyed while sleeping
522                 if (_selector==null || !selector.isOpen())
523                     return;
524 
525                 // Look for things to do
526                 for (SelectionKey key: selector.selectedKeys())
527                 {   
528                     SocketChannel channel=null;
529                     
530                     try
531                     {
532                         if (!key.isValid())
533                         {
534                             key.cancel();
535                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
536                             if (endpoint != null)
537                                 endpoint.doUpdateKey();
538                             continue;
539                         }
540 
541                         Object att = key.attachment();
542                         if (att instanceof SelectChannelEndPoint)
543                         {
544                             if (key.isReadable()||key.isWritable())
545                                 ((SelectChannelEndPoint)att).schedule();
546                         }
547                         else if (key.isConnectable())
548                         {
549                             // Complete a connection of a registered channel
550                             channel = (SocketChannel)key.channel();
551                             boolean connected=false;
552                             try
553                             {
554                                 connected=channel.finishConnect();
555                             }
556                             catch(Exception e)
557                             {
558                                 connectionFailed(channel,e,att);
559                             }
560                             finally
561                             {
562                                 if (connected)
563                                 {
564                                     key.interestOps(SelectionKey.OP_READ);
565                                     SelectChannelEndPoint endpoint = createEndPoint(channel,key);
566                                     key.attach(endpoint);
567                                     endpoint.schedule();
568                                 }
569                                 else
570                                 {
571                                     key.cancel();
572                                 }
573                             }
574                         }
575                         else
576                         {
577                             // Wrap readable registered channel in an endpoint
578                             channel = (SocketChannel)key.channel();
579                             SelectChannelEndPoint endpoint = createEndPoint(channel,key);
580                             key.attach(endpoint);
581                             if (key.isReadable())
582                                 endpoint.schedule();  
583                         }
584                         key = null;
585                     }
586                     catch (CancelledKeyException e)
587                     {
588                         __log.ignore(e);
589                     }
590                     catch (Exception e)
591                     {
592                         if (isRunning())
593                             __log.warn(e);
594                         else
595                             __log.ignore(e);
596 
597                         try
598                         {
599                             if (channel!=null)
600                                 channel.close();
601                         }
602                         catch(IOException e2)
603                         {
604                             __log.debug(e2);
605                         }
606                         
607                         if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
608                             key.cancel();
609                     }
610                 }
611                 
612                 // Everything always handled
613                 selector.selectedKeys().clear();
614                 
615                 now=System.currentTimeMillis();
616                 _timeout.setNow(now);
617                 Task task = _timeout.expired();
618                 while (task!=null)
619                 {
620                     if (task instanceof Runnable)
621                         dispatch((Runnable)task);
622                     task = _timeout.expired();
623                 }
624 
625                 // Idle tick
626                 if (now-_idleTick>__IDLE_TICK)
627                 {
628                     _idleTick=now;
629                     
630                     final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
631                         ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
632                         :now;
633                         
634                     dispatch(new Runnable()
635                     {
636                         public void run()
637                         {
638                             for (SelectChannelEndPoint endp:_endPoints.keySet())
639                             {
640                                 endp.checkIdleTimestamp(idle_now);
641                             }
642                         }
643                     });
644                 }
645             }
646             catch (ClosedSelectorException e)
647             {
648                 if (isRunning())
649                     __log.warn(e);
650                 else
651                     __log.ignore(e);
652             }
653             catch (CancelledKeyException e)
654             {
655                 __log.ignore(e);
656             }
657             finally
658             {
659                 _selecting=null;
660             }
661         }
662         
663         /* ------------------------------------------------------------ */
664         private void checkJvmBugs(long before, long now, long wait, int selected)
665             throws IOException
666         {
667             Selector selector = _selector;
668             if (selector==null)
669                 return;
670                 
671             // Look for JVM bugs over a monitor period.
672             // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
673             // http://bugs.sun.com/view_bug.do?bug_id=6693490
674             if (now>_monitorNext)
675             {
676                 _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
677                 _pausing=_selects>__MAX_SELECTS;
678                 if (_pausing)
679                     _paused++;
680 
681                 _selects=0;
682                 _jvmBug=0;
683                 _monitorStart=now;
684                 _monitorNext=now+__MONITOR_PERIOD;
685             }
686 
687             if (now>_log)
688             {
689                 if (_paused>0)  
690                     __log.debug(this+" Busy selector - injecting delay "+_paused+" times");
691 
692                 if (_jvmFix2>0)
693                     __log.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
694 
695                 if (_jvmFix1>0)
696                     __log.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times");
697 
698                 else if(__log.isDebugEnabled() && _jvmFix0>0)
699                     __log.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");
700                 _paused=0;
701                 _jvmFix2=0;
702                 _jvmFix1=0;
703                 _jvmFix0=0;
704                 _log=now+60000;
705             }
706 
707             // If we see signature of possible JVM bug, increment count.
708             if (selected==0 && wait>10 && (now-before)<(wait/2))
709             {
710                 // Increment bug count and try a work around
711                 _jvmBug++;
712                 if (_jvmBug>(__JVMBUG_THRESHHOLD))
713                 {
714                     try
715                     {
716                         if (_jvmBug==__JVMBUG_THRESHHOLD+1)
717                             _jvmFix2++;
718 
719                         Thread.sleep(__BUSY_PAUSE); // pause to avoid busy loop
720                     }
721                     catch(InterruptedException e)
722                     {
723                         __log.ignore(e);
724                     }
725                 }
726                 else if (_jvmBug==__JVMBUG_THRESHHOLD)
727                 {
728                     renewSelector();
729                 }
730                 else if (_jvmBug%32==31) // heuristic attempt to cancel key 31,63,95,... loops
731                 {
732                     // Cancel keys with 0 interested ops
733                     int cancelled=0;
734                     for (SelectionKey k: selector.keys())
735                     {
736                         if (k.isValid()&&k.interestOps()==0)
737                         {
738                             k.cancel();
739                             cancelled++;
740                         }
741                     }
742                     if (cancelled>0)
743                         _jvmFix0++;
744 
745                     return;
746                 }
747             }
748             else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
749             {
750                 // Look for busy key
751                 SelectionKey busy = selector.selectedKeys().iterator().next();
752                 if (busy==_busyKey)
753                 {
754                     if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
755                     {
756                         final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
757                         __log.warn("Busy Key "+busy.channel()+" "+endpoint);
758                         busy.cancel();
759                         if (endpoint!=null)
760                         {
761                             dispatch(new Runnable()
762                             {
763                                 public void run()
764                                 {
765                                     try
766                                     {
767                                         endpoint.close();
768                                     }
769                                     catch (IOException e)
770                                     {
771                                         __log.ignore(e);
772                                     }
773                                 }
774                             });
775                         }
776                     }
777                 }
778                 else
779                     _busyKeyCount=0;
780                 _busyKey=busy;
781             }
782         }
783         
784         /* ------------------------------------------------------------ */
785         private void renewSelector() 
786         {
787             try
788             {
789                 synchronized (this)
790                 {
791                     Selector selector=_selector;
792                     if (selector==null)
793                         return;
794                     final Selector new_selector = Selector.open();
795                     for (SelectionKey k: selector.keys())
796                     {
797                         if (!k.isValid() || k.interestOps()==0)
798                             continue;
799 
800                         final SelectableChannel channel = k.channel();
801                         final Object attachment = k.attachment();
802 
803                         if (attachment==null)
804                             addChange(channel);
805                         else
806                             addChange(channel,attachment);
807                     }
808                     _selector.close();
809                     _selector=new_selector;
810                 }
811             }
812             catch(IOException e)
813             {
814                 throw new RuntimeException("recreating selector",e);
815             }
816         }
817         
818         /* ------------------------------------------------------------ */
819         public SelectorManager getManager()
820         {
821             return SelectorManager.this;
822         }
823 
824         /* ------------------------------------------------------------ */
825         public long getNow()
826         {
827             return _timeout.getNow();
828         }
829 
830         /* ------------------------------------------------------------ */
831         /**
832          * @param task The task to timeout. If it implements Runnable, then 
833          * expired will be called from a dispatched thread.
834          * 
835          * @param timeoutMs
836          */
837         public void scheduleTimeout(Timeout.Task task, long timeoutMs)
838         {
839             if (!(task instanceof Runnable))
840                 throw new IllegalArgumentException("!Runnable");
841             _timeout.schedule(task, timeoutMs);
842         }
843         
844         /* ------------------------------------------------------------ */
845         public void cancelTimeout(Timeout.Task task)
846         {
847             task.cancel();
848         }
849 
850         /* ------------------------------------------------------------ */
851         public void wakeup()
852         {
853             try
854             {
855                 Selector selector = _selector;
856                 if (selector!=null)
857                     selector.wakeup();
858             }
859             catch(Exception e)
860             {
861                 addChange(new ChangeTask()
862                 {
863                     public void run()
864                     {
865                         renewSelector();
866                     }
867                 });
868                 
869                 renewSelector();
870             }
871         }
872         
873         /* ------------------------------------------------------------ */
874         private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
875         {
876             SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
877             endPointOpened(endp); 
878             _endPoints.put(endp,this);
879             return endp;
880         }
881         
882         /* ------------------------------------------------------------ */
883         public void destroyEndPoint(SelectChannelEndPoint endp)
884         {
885             _endPoints.remove(endp);
886             endPointClosed(endp);
887         }
888 
889         /* ------------------------------------------------------------ */
890         Selector getSelector()
891         {
892             return _selector;
893         }
894         
895         /* ------------------------------------------------------------ */
896         void stop() throws Exception
897         {
898             // Spin for a while waiting for selector to complete 
899             // to avoid unneccessary closed channel exceptions
900             try
901             {
902                 for (int i=0;i<100 && _selecting!=null;i++)
903                 {
904                     wakeup();
905                     Thread.sleep(10);
906                 }
907             }
908             catch(Exception e)
909             {
910                 __log.ignore(e);
911             }
912 
913             // close endpoints and selector
914             synchronized (this)
915             {
916                 for (SelectionKey key:_selector.keys())
917                 {
918                     if (key==null)
919                         continue;
920                     Object att=key.attachment();
921                     if (att instanceof EndPoint)
922                     {
923                         EndPoint endpoint = (EndPoint)att;
924                         try
925                         {
926                             endpoint.close();
927                         }
928                         catch(IOException e)
929                         {
930                             __log.ignore(e);
931                         }
932                     }
933                 }
934             
935             
936                 _timeout.cancelAll();
937                 try
938                 {
939                     if (_selector != null)
940                         _selector.close();
941                 }
942                 catch (IOException e)
943                 {
944                     __log.ignore(e);
945                 } 
946                 _selector=null;
947             }
948         }
949 
950         /* ------------------------------------------------------------ */
951         public String dump()
952         {
953             return AggregateLifeCycle.dump(this);
954         }
955 
956         /* ------------------------------------------------------------ */
957         public void dump(Appendable out, String indent) throws IOException
958         {
959             out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
960             
961             Thread selecting = _selecting;
962             
963             Object where = "not selecting";
964             StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
965             if (trace!=null)
966             {
967                 for (StackTraceElement t:trace)
968                     if (t.getClassName().startsWith("org.eclipse.jetty."))
969                     {
970                         where=t;
971                         break;
972                     }
973             }
974 
975             Selector selector=_selector;
976             final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
977             dump.add(where);
978             
979             final CountDownLatch latch = new CountDownLatch(1);
980             
981             addChange(new Runnable(){
982                 public void run()
983                 {
984                     dumpKeyState(dump);
985                     latch.countDown();
986                 }
987             });
988             
989             try
990             {
991                 latch.await(5,TimeUnit.SECONDS);
992             }
993             catch(InterruptedException e)
994             {
995                 __log.ignore(e);
996             }
997             AggregateLifeCycle.dump(out,indent,dump);
998         }
999 
1000         /* ------------------------------------------------------------ */
1001         public void dumpKeyState(List<Object> dumpto)
1002         {
1003             Selector selector=_selector;
1004             dumpto.add(selector+" keys="+selector.keys().size());
1005             for (SelectionKey key: selector.keys())
1006             {
1007                 if (key.isValid())
1008                     dumpto.add(key.attachment()+" "+key.interestOps()+" "+key.readyOps());
1009                 else
1010                     dumpto.add(key.attachment()+" - - ");
1011             }
1012         }
1013     }
1014 
1015     /* ------------------------------------------------------------ */
1016     private static class ChannelAndAttachment
1017     {
1018         final SelectableChannel _channel;
1019         final Object _attachment;
1020         
1021         public ChannelAndAttachment(SelectableChannel channel, Object attachment)
1022         {
1023             super();
1024             _channel = channel;
1025             _attachment = attachment;
1026         }
1027     }
1028 
1029     /* ------------------------------------------------------------ */
1030     public boolean isDeferringInterestedOps0()
1031     {
1032         return _deferringInterestedOps0;
1033     }
1034 
1035     /* ------------------------------------------------------------ */
1036     public void setDeferringInterestedOps0(boolean deferringInterestedOps0)
1037     {
1038         _deferringInterestedOps0 = deferringInterestedOps0;
1039     }
1040     
1041 
1042     /* ------------------------------------------------------------ */
1043     /* ------------------------------------------------------------ */
1044     /* ------------------------------------------------------------ */
1045     private interface ChangeTask extends Runnable
1046     {}
1047     
1048 }