View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.CancelledKeyException;
18  import java.nio.channels.Channel;
19  import java.nio.channels.ClosedSelectorException;
20  import java.nio.channels.SelectableChannel;
21  import java.nio.channels.SelectionKey;
22  import java.nio.channels.Selector;
23  import java.nio.channels.ServerSocketChannel;
24  import java.nio.channels.SocketChannel;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.concurrent.ConcurrentHashMap;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.ConcurrentMap;
30  import java.util.concurrent.CountDownLatch;
31  import java.util.concurrent.TimeUnit;
32  
33  import javax.management.RuntimeErrorException;
34  
35  import org.eclipse.jetty.io.ConnectedEndPoint;
36  import org.eclipse.jetty.io.Connection;
37  import org.eclipse.jetty.io.EndPoint;
38  import org.eclipse.jetty.util.TypeUtil;
39  import org.eclipse.jetty.util.component.AbstractLifeCycle;
40  import org.eclipse.jetty.util.component.AggregateLifeCycle;
41  import org.eclipse.jetty.util.component.Dumpable;
42  import org.eclipse.jetty.util.log.Log;
43  import org.eclipse.jetty.util.thread.Timeout;
44  import org.eclipse.jetty.util.thread.Timeout.Task;
45  
46  
47  /* ------------------------------------------------------------ */
48  /**
49   * The Selector Manager manages and number of SelectSets to allow
50   * NIO scheduling to scale to large numbers of connections.
51   * <p>
52   * This class works around a number of know JVM bugs. For details
53   * see http://wiki.eclipse.org/Jetty/Feature/JVM_NIO_Bug
54   */
55  public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
56  {
57      // TODO Tune these by approx system speed.
58      private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.eclipse.jetty.io.nio.JVMBUG_THRESHHOLD",0).intValue();
59      private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
60      private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",25000).intValue();
61      private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
62      private static final int __BUSY_KEY=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_KEY",-1).intValue();
63      private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
64      
65      private int _maxIdleTime;
66      private int _lowResourcesMaxIdleTime;
67      private long _lowResourcesConnections;
68      private SelectSet[] _selectSet;
69      private int _selectSets=1;
70      private volatile int _set;
71      private boolean _deferringInterestedOps0=true;
72      
73      /* ------------------------------------------------------------ */
74      /**
75       * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
76       * @see #setLowResourcesMaxIdleTime(long)
77       */
78      public void setMaxIdleTime(long maxIdleTime)
79      {
80          _maxIdleTime=(int)maxIdleTime;
81      }
82      
83      /* ------------------------------------------------------------ */
84      /**
85       * @param selectSets number of select sets to create
86       */
87      public void setSelectSets(int selectSets)
88      {
89          long lrc = _lowResourcesConnections * _selectSets; 
90          _selectSets=selectSets;
91          _lowResourcesConnections=lrc/_selectSets;
92      }
93      
94      /* ------------------------------------------------------------ */
95      /**
96       * @return the max idle time
97       */
98      public long getMaxIdleTime()
99      {
100         return _maxIdleTime;
101     }
102     
103     /* ------------------------------------------------------------ */
104     /**
105      * @return the number of select sets in use
106      */
107     public int getSelectSets()
108     {
109         return _selectSets;
110     }
111 
112     /* ------------------------------------------------------------ */
113     /**
114      * @param i 
115      * @return The select set
116      */
117     public SelectSet getSelectSet(int i)
118     {
119         return _selectSet[i];
120     }
121     
122     /* ------------------------------------------------------------ */
123     /** Register a channel
124      * @param channel
125      * @param att Attached Object
126      */
127     public void register(SocketChannel channel, Object att)
128     {
129         // The ++ increment here is not atomic, but it does not matter.
130         // so long as the value changes sometimes, then connections will
131         // be distributed over the available sets.
132         
133         int s=_set++; 
134         s=s%_selectSets;
135         SelectSet[] sets=_selectSet;
136         if (sets!=null)
137         {
138             SelectSet set=sets[s];
139             set.addChange(channel,att);
140             set.wakeup();
141         }
142     }
143 
144     
145     /* ------------------------------------------------------------ */
146     /** Register a channel
147      * @param channel
148      */
149     public void register(SocketChannel channel)
150     {
151         // The ++ increment here is not atomic, but it does not matter.
152         // so long as the value changes sometimes, then connections will
153         // be distributed over the available sets.
154         
155         int s=_set++; 
156         s=s%_selectSets;
157         SelectSet[] sets=_selectSet;
158         if (sets!=null)
159         {
160             SelectSet set=sets[s];
161             set.addChange(channel);
162             set.wakeup();
163         }
164     }
165     
166     /* ------------------------------------------------------------ */
167     /** Register a {@link ServerSocketChannel}
168      * @param acceptChannel
169      */
170     public void register(ServerSocketChannel acceptChannel)
171     {
172         int s=_set++; 
173         s=s%_selectSets;
174         SelectSet set=_selectSet[s];
175         set.addChange(acceptChannel);
176         set.wakeup();
177     }
178 
179     /* ------------------------------------------------------------ */
180     /**
181      * @return the lowResourcesConnections
182      */
183     public long getLowResourcesConnections()
184     {
185         return _lowResourcesConnections*_selectSets;
186     }
187 
188     /* ------------------------------------------------------------ */
189     /**
190      * Set the number of connections, which if exceeded places this manager in low resources state.
191      * This is not an exact measure as the connection count is averaged over the select sets.
192      * @param lowResourcesConnections the number of connections
193      * @see #setLowResourcesMaxIdleTime(long)
194      */
195     public void setLowResourcesConnections(long lowResourcesConnections)
196     {
197         _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
198     }
199 
200     /* ------------------------------------------------------------ */
201     /**
202      * @return the lowResourcesMaxIdleTime
203      */
204     public long getLowResourcesMaxIdleTime()
205     {
206         return _lowResourcesMaxIdleTime;
207     }
208 
209     /* ------------------------------------------------------------ */
210     /**
211      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()}
212      * @see #setMaxIdleTime(long)
213      */
214     public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
215     {
216         _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
217     }
218     
219     /* ------------------------------------------------------------ */
220     /**
221      * @param acceptorID
222      * @throws IOException
223      */
224     public void doSelect(int acceptorID) throws IOException
225     {
226         SelectSet[] sets= _selectSet;
227         if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
228             sets[acceptorID].doSelect();
229     }
230 
231     /* ------------------------------------------------------------------------------- */
232     public abstract boolean dispatch(Runnable task);
233 
234     /* ------------------------------------------------------------ */
235     /* (non-Javadoc)
236      * @see org.eclipse.component.AbstractLifeCycle#doStart()
237      */
238     @Override
239     protected void doStart() throws Exception
240     {
241         _selectSet = new SelectSet[_selectSets];
242         for (int i=0;i<_selectSet.length;i++)
243             _selectSet[i]= new SelectSet(i);
244 
245         super.doStart();
246     }
247 
248 
249     /* ------------------------------------------------------------------------------- */
250     @Override
251     protected void doStop() throws Exception
252     {
253         SelectSet[] sets= _selectSet;
254         _selectSet=null;
255         if (sets!=null)
256         {
257             for (SelectSet set : sets)
258             {
259                 if (set!=null)
260                     set.stop();
261             }
262         }
263         super.doStop();
264     }
265 
266     /* ------------------------------------------------------------ */
267     /**
268      * @param endpoint
269      */
270     protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
271 
272     /* ------------------------------------------------------------ */
273     /**
274      * @param endpoint
275      */
276     protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
277 
278     /* ------------------------------------------------------------ */
279     protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
280 
281     /* ------------------------------------------------------------------------------- */
282     protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
283 
284     /* ------------------------------------------------------------ */
285     /**
286      * Create a new end point
287      * @param channel
288      * @param selectSet
289      * @param sKey the selection key
290      * @return the new endpoint {@link SelectChannelEndPoint}
291      * @throws IOException
292      */
293     protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
294 
295     /* ------------------------------------------------------------------------------- */
296     protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
297     {
298         Log.warn(ex+","+channel+","+attachment);
299         Log.debug(ex);
300     }
301     
302     /* ------------------------------------------------------------ */
303     public String dump()
304     {
305         return AggregateLifeCycle.dump(this);
306     }
307 
308     /* ------------------------------------------------------------ */
309     public void dump(Appendable out, String indent) throws IOException
310     {
311         out.append(String.valueOf(this)).append("\n");
312         AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet));
313     }
314     
315     
316     /* ------------------------------------------------------------------------------- */
317     /* ------------------------------------------------------------------------------- */
318     /* ------------------------------------------------------------------------------- */
319     public class SelectSet implements Dumpable
320     {
321         private final int _setID;
322         private final Timeout _timeout;
323         
324         private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
325         
326         private Selector _selector;
327         
328         private volatile Thread _selecting;
329         private int _jvmBug;
330         private int _selects;
331         private long _monitorStart;
332         private long _monitorNext;
333         private boolean _pausing;
334         private SelectionKey _busyKey;
335         private int _busyKeyCount;
336         private long _log;
337         private int _paused;
338         private int _jvmFix0;
339         private int _jvmFix1;
340         private int _jvmFix2;
341         private volatile long _idleTick;
342         private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
343         
344         /* ------------------------------------------------------------ */
345         SelectSet(int acceptorID) throws Exception
346         {
347             _setID=acceptorID;
348 
349             _idleTick = System.currentTimeMillis();
350             _timeout = new Timeout(this);
351             _timeout.setDuration(0L);
352 
353             // create a selector;
354             _selector = Selector.open();
355             _monitorStart=System.currentTimeMillis();
356             _monitorNext=_monitorStart+__MONITOR_PERIOD;
357             _log=_monitorStart+60000;
358         }
359         
360         /* ------------------------------------------------------------ */
361         public void addChange(Object change)
362         {
363             _changes.add(change);
364         }
365 
366         /* ------------------------------------------------------------ */
367         public void addChange(SelectableChannel channel, Object att)
368         {   
369             if (att==null)
370                 addChange(channel);
371             else if (att instanceof EndPoint)
372                 addChange(att);
373             else
374                 addChange(new ChannelAndAttachment(channel,att));
375         }
376         
377         /* ------------------------------------------------------------ */
378         /**
379          * Select and dispatch tasks found from changes and the selector.
380          * 
381          * @throws IOException
382          */
383         public void doSelect() throws IOException
384         {
385             try
386             {
387                 _selecting=Thread.currentThread();
388                 final Selector selector=_selector;
389 
390                 // Make any key changes required
391                 Object change;
392                 int changes=_changes.size();
393                 while (changes-->0 && (change=_changes.poll())!=null)
394                 {
395                     Channel ch=null;
396                     SelectionKey key=null;
397                     
398                     try
399                     {
400                         if (change instanceof EndPoint)
401                         {
402                             // Update the operations for a key.
403                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
404                             ch=endpoint.getChannel();
405                             endpoint.doUpdateKey();
406                         }
407                         else if (change instanceof ChannelAndAttachment)
408                         {
409                             // finish accepting/connecting this connection
410                             final ChannelAndAttachment asc = (ChannelAndAttachment)change;
411                             final SelectableChannel channel=asc._channel;
412                             ch=channel;
413                             final Object att = asc._attachment;
414                             
415                             if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
416                             {
417                                 key = channel.register(selector,SelectionKey.OP_READ,att);
418                                 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
419                                 key.attach(endpoint);
420                                 endpoint.schedule();
421                             }
422                             else if (channel.isOpen())
423                             {
424                                 key = channel.register(selector,SelectionKey.OP_CONNECT,att);
425                             }
426                         }
427                         else if (change instanceof SocketChannel)
428                         {
429                             // Newly registered channel
430                             final SocketChannel channel=(SocketChannel)change;
431                             ch=channel;
432                             key = channel.register(selector,SelectionKey.OP_READ,null);
433                             SelectChannelEndPoint endpoint = createEndPoint(channel,key);
434                             key.attach(endpoint);
435                             endpoint.schedule();
436                         }
437                         else if (change instanceof ChangeTask)
438                         {
439                             ((Runnable)change).run();
440                         }
441                         else if (change instanceof Runnable)
442                         {
443                             dispatch((Runnable)change);
444                         }
445                         else
446                             throw new IllegalArgumentException(change.toString());
447                     }
448                     catch (CancelledKeyException e)
449                     {
450                         Log.ignore(e);
451                     }
452                     catch (Throwable e)
453                     {
454                         if (e instanceof ThreadDeath)
455                             throw (ThreadDeath)e;
456                         
457                         if (isRunning())
458                             Log.warn(e);
459                         else
460                             Log.debug(e);
461 
462                         try
463                         {
464                             ch.close();
465                         }
466                         catch(IOException e2)
467                         {
468                             Log.debug(e2);
469                         }
470                     }
471                 }
472 
473 
474                 // Do and instant select to see if any connections can be handled.
475                 int selected=selector.selectNow();
476                 _selects++;
477 
478                 long now=System.currentTimeMillis();
479                 
480                 // if no immediate things to do
481                 if (selected==0 && selector.selectedKeys().isEmpty())
482                 {
483                     // If we are in pausing mode
484                     if (_pausing)
485                     {
486                         try
487                         {
488                             Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of  busy loop
489                         }
490                         catch(InterruptedException e)
491                         {
492                             Log.ignore(e);
493                         }
494                         now=System.currentTimeMillis();
495                     }
496 
497                     // workout how long to wait in select
498                     _timeout.setNow(now);
499                     long to_next_timeout=_timeout.getTimeToNext();
500 
501                     long wait = _changes.size()==0?__IDLE_TICK:0L;  
502                     if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
503                         wait = to_next_timeout;
504 
505                     // If we should wait with a select
506                     if (wait>0)
507                     {
508                         long before=now;
509                         selected=selector.select(wait);
510                         _selects++;
511                         now = System.currentTimeMillis();
512                         _timeout.setNow(now);
513                         
514                         if (__JVMBUG_THRESHHOLD>0)
515                             checkJvmBugs(before, now, wait, selected);
516                     }
517                 }
518                 
519                 // have we been destroyed while sleeping
520                 if (_selector==null || !selector.isOpen())
521                     return;
522 
523                 // Look for things to do
524                 for (SelectionKey key: selector.selectedKeys())
525                 {   
526                     SocketChannel channel=null;
527                     
528                     try
529                     {
530                         if (!key.isValid())
531                         {
532                             key.cancel();
533                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
534                             if (endpoint != null)
535                                 endpoint.doUpdateKey();
536                             continue;
537                         }
538 
539                         Object att = key.attachment();
540                         if (att instanceof SelectChannelEndPoint)
541                         {
542                             if (key.isReadable()||key.isWritable())
543                                 ((SelectChannelEndPoint)att).schedule();
544                         }
545                         else if (key.isConnectable())
546                         {
547                             // Complete a connection of a registered channel
548                             channel = (SocketChannel)key.channel();
549                             boolean connected=false;
550                             try
551                             {
552                                 connected=channel.finishConnect();
553                             }
554                             catch(Exception e)
555                             {
556                                 connectionFailed(channel,e,att);
557                             }
558                             finally
559                             {
560                                 if (connected)
561                                 {
562                                     key.interestOps(SelectionKey.OP_READ);
563                                     SelectChannelEndPoint endpoint = createEndPoint(channel,key);
564                                     key.attach(endpoint);
565                                     endpoint.schedule();
566                                 }
567                                 else
568                                 {
569                                     key.cancel();
570                                 }
571                             }
572                         }
573                         else
574                         {
575                             // Wrap readable registered channel in an endpoint
576                             channel = (SocketChannel)key.channel();
577                             SelectChannelEndPoint endpoint = createEndPoint(channel,key);
578                             key.attach(endpoint);
579                             if (key.isReadable())
580                                 endpoint.schedule();  
581                         }
582                         key = null;
583                     }
584                     catch (CancelledKeyException e)
585                     {
586                         Log.ignore(e);
587                     }
588                     catch (Exception e)
589                     {
590                         if (isRunning())
591                             Log.warn(e);
592                         else
593                             Log.ignore(e);
594 
595                         try
596                         {
597                             if (channel!=null)
598                                 channel.close();
599                         }
600                         catch(IOException e2)
601                         {
602                             Log.debug(e2);
603                         }
604                         
605                         if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
606                             key.cancel();
607                     }
608                 }
609                 
610                 // Everything always handled
611                 selector.selectedKeys().clear();
612                 
613                 now=System.currentTimeMillis();
614                 _timeout.setNow(now);
615                 Task task = _timeout.expired();
616                 while (task!=null)
617                 {
618                     if (task instanceof Runnable)
619                         dispatch((Runnable)task);
620                     task = _timeout.expired();
621                 }
622 
623                 // Idle tick
624                 if (now-_idleTick>__IDLE_TICK)
625                 {
626                     _idleTick=now;
627                     
628                     final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
629                         ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
630                         :now;
631                         
632                     dispatch(new Runnable()
633                     {
634                         public void run()
635                         {
636                             for (SelectChannelEndPoint endp:_endPoints.keySet())
637                             {
638                                 endp.checkIdleTimestamp(idle_now);
639                             }
640                         }
641                     });
642                 }
643             }
644             catch (ClosedSelectorException e)
645             {
646                 if (isRunning())
647                     Log.warn(e);
648                 else
649                     Log.ignore(e);
650             }
651             catch (CancelledKeyException e)
652             {
653                 Log.ignore(e);
654             }
655             finally
656             {
657                 _selecting=null;
658             }
659         }
660         
661         /* ------------------------------------------------------------ */
662         private void checkJvmBugs(long before, long now, long wait, int selected)
663             throws IOException
664         {
665             Selector selector = _selector;
666             if (selector==null)
667                 return;
668                 
669             // Look for JVM bugs over a monitor period.
670             // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
671             // http://bugs.sun.com/view_bug.do?bug_id=6693490
672             if (now>_monitorNext)
673             {
674                 _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
675                 _pausing=_selects>__MAX_SELECTS;
676                 if (_pausing)
677                     _paused++;
678 
679                 _selects=0;
680                 _jvmBug=0;
681                 _monitorStart=now;
682                 _monitorNext=now+__MONITOR_PERIOD;
683             }
684 
685             if (now>_log)
686             {
687                 if (_paused>0)  
688                     Log.debug(this+" Busy selector - injecting delay "+_paused+" times");
689 
690                 if (_jvmFix2>0)
691                     Log.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
692 
693                 if (_jvmFix1>0)
694                     Log.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times");
695 
696                 else if(Log.isDebugEnabled() && _jvmFix0>0)
697                     Log.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");
698                 _paused=0;
699                 _jvmFix2=0;
700                 _jvmFix1=0;
701                 _jvmFix0=0;
702                 _log=now+60000;
703             }
704 
705             // If we see signature of possible JVM bug, increment count.
706             if (selected==0 && wait>10 && (now-before)<(wait/2))
707             {
708                 // Increment bug count and try a work around
709                 _jvmBug++;
710                 if (_jvmBug>(__JVMBUG_THRESHHOLD))
711                 {
712                     try
713                     {
714                         if (_jvmBug==__JVMBUG_THRESHHOLD+1)
715                             _jvmFix2++;
716 
717                         Thread.sleep(__BUSY_PAUSE); // pause to avoid busy loop
718                     }
719                     catch(InterruptedException e)
720                     {
721                         Log.ignore(e);
722                     }
723                 }
724                 else if (_jvmBug==__JVMBUG_THRESHHOLD)
725                 {
726                     renewSelector();
727                 }
728                 else if (_jvmBug%32==31) // heuristic attempt to cancel key 31,63,95,... loops
729                 {
730                     // Cancel keys with 0 interested ops
731                     int cancelled=0;
732                     for (SelectionKey k: selector.keys())
733                     {
734                         if (k.isValid()&&k.interestOps()==0)
735                         {
736                             k.cancel();
737                             cancelled++;
738                         }
739                     }
740                     if (cancelled>0)
741                         _jvmFix0++;
742 
743                     return;
744                 }
745             }
746             else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
747             {
748                 // Look for busy key
749                 SelectionKey busy = selector.selectedKeys().iterator().next();
750                 if (busy==_busyKey)
751                 {
752                     if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
753                     {
754                         final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
755                         Log.warn("Busy Key "+busy.channel()+" "+endpoint);
756                         busy.cancel();
757                         if (endpoint!=null)
758                         {
759                             dispatch(new Runnable()
760                             {
761                                 public void run()
762                                 {
763                                     try
764                                     {
765                                         endpoint.close();
766                                     }
767                                     catch (IOException e)
768                                     {
769                                         Log.ignore(e);
770                                     }
771                                 }
772                             });
773                         }
774                     }
775                 }
776                 else
777                     _busyKeyCount=0;
778                 _busyKey=busy;
779             }
780         }
781         
782         /* ------------------------------------------------------------ */
783         private void renewSelector() 
784         {
785             try
786             {
787                 synchronized (this)
788                 {
789                     Selector selector=_selector;
790                     if (selector==null)
791                         return;
792                     final Selector new_selector = Selector.open();
793                     for (SelectionKey k: selector.keys())
794                     {
795                         if (!k.isValid() || k.interestOps()==0)
796                             continue;
797 
798                         final SelectableChannel channel = k.channel();
799                         final Object attachment = k.attachment();
800 
801                         if (attachment==null)
802                             addChange(channel);
803                         else
804                             addChange(channel,attachment);
805                     }
806                     _selector.close();
807                     _selector=new_selector;
808                 }
809             }
810             catch(IOException e)
811             {
812                 throw new RuntimeException("recreating selector",e);
813             }
814         }
815         
816         /* ------------------------------------------------------------ */
817         public SelectorManager getManager()
818         {
819             return SelectorManager.this;
820         }
821 
822         /* ------------------------------------------------------------ */
823         public long getNow()
824         {
825             return _timeout.getNow();
826         }
827 
828         /* ------------------------------------------------------------ */
829         /**
830          * @param task The task to timeout. If it implements Runnable, then 
831          * expired will be called from a dispatched thread.
832          * 
833          * @param timeoutMs
834          */
835         public void scheduleTimeout(Timeout.Task task, long timeoutMs)
836         {
837             if (!(task instanceof Runnable))
838                 throw new IllegalArgumentException("!Runnable");
839             _timeout.schedule(task, timeoutMs);
840         }
841         
842         /* ------------------------------------------------------------ */
843         public void cancelTimeout(Timeout.Task task)
844         {
845             task.cancel();
846         }
847 
848         /* ------------------------------------------------------------ */
849         public void wakeup()
850         {
851             try
852             {
853                 Selector selector = _selector;
854                 if (selector!=null)
855                     selector.wakeup();
856             }
857             catch(Exception e)
858             {
859                 addChange(new ChangeTask()
860                 {
861                     public void run()
862                     {
863                         renewSelector();
864                     }
865                 });
866                 
867                 renewSelector();
868             }
869         }
870         
871         /* ------------------------------------------------------------ */
872         private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
873         {
874             SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
875             endPointOpened(endp); 
876             _endPoints.put(endp,this);
877             return endp;
878         }
879         
880         /* ------------------------------------------------------------ */
881         public void destroyEndPoint(SelectChannelEndPoint endp)
882         {
883             _endPoints.remove(endp);
884             endPointClosed(endp);
885         }
886 
887         /* ------------------------------------------------------------ */
888         Selector getSelector()
889         {
890             return _selector;
891         }
892         
893         /* ------------------------------------------------------------ */
894         void stop() throws Exception
895         {
896             // Spin for a while waiting for selector to complete 
897             // to avoid unneccessary closed channel exceptions
898             try
899             {
900                 for (int i=0;i<100 && _selecting!=null;i++)
901                 {
902                     wakeup();
903                     Thread.sleep(10);
904                 }
905             }
906             catch(Exception e)
907             {
908                 Log.ignore(e);
909             }
910 
911             // close endpoints and selector
912             synchronized (this)
913             {
914                 for (SelectionKey key:_selector.keys())
915                 {
916                     if (key==null)
917                         continue;
918                     Object att=key.attachment();
919                     if (att instanceof EndPoint)
920                     {
921                         EndPoint endpoint = (EndPoint)att;
922                         try
923                         {
924                             endpoint.close();
925                         }
926                         catch(IOException e)
927                         {
928                             Log.ignore(e);
929                         }
930                     }
931                 }
932             
933             
934                 _timeout.cancelAll();
935                 try
936                 {
937                     if (_selector != null)
938                         _selector.close();
939                 }
940                 catch (IOException e)
941                 {
942                     Log.ignore(e);
943                 } 
944                 _selector=null;
945             }
946         }
947 
948         /* ------------------------------------------------------------ */
949         public String dump()
950         {
951             return AggregateLifeCycle.dump(this);
952         }
953 
954         /* ------------------------------------------------------------ */
955         public void dump(Appendable out, String indent) throws IOException
956         {
957             out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
958             
959             Thread selecting = _selecting;
960             
961             Object where = "not selecting";
962             StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
963             if (trace!=null)
964             {
965                 for (StackTraceElement t:trace)
966                     if (t.getClassName().startsWith("org.eclipse.jetty."))
967                     {
968                         where=t;
969                         break;
970                     }
971             }
972 
973             Selector selector=_selector;
974             final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
975             dump.add(where);
976             
977             final CountDownLatch latch = new CountDownLatch(1);
978             
979             addChange(new Runnable(){
980                 public void run()
981                 {
982                     dumpKeyState(dump);
983                     latch.countDown();
984                 }
985             });
986             
987             try
988             {
989                 latch.await(5,TimeUnit.SECONDS);
990             }
991             catch(InterruptedException e)
992             {
993                 Log.ignore(e);
994             }
995             AggregateLifeCycle.dump(out,indent,dump);
996         }
997 
998         /* ------------------------------------------------------------ */
999         public void dumpKeyState(List<Object> dumpto)
1000         {
1001             Selector selector=_selector;
1002             dumpto.add(selector+" keys="+selector.keys().size());
1003             for (SelectionKey key: selector.keys())
1004             {
1005                 if (key.isValid())
1006                     dumpto.add(key.attachment()+" "+key.interestOps()+" "+key.readyOps());
1007                 else
1008                     dumpto.add(key.attachment()+" - - ");
1009             }
1010         }
1011     }
1012 
1013     /* ------------------------------------------------------------ */
1014     private static class ChannelAndAttachment
1015     {
1016         final SelectableChannel _channel;
1017         final Object _attachment;
1018         
1019         public ChannelAndAttachment(SelectableChannel channel, Object attachment)
1020         {
1021             super();
1022             _channel = channel;
1023             _attachment = attachment;
1024         }
1025     }
1026 
1027     /* ------------------------------------------------------------ */
1028     public boolean isDeferringInterestedOps0()
1029     {
1030         return _deferringInterestedOps0;
1031     }
1032 
1033     /* ------------------------------------------------------------ */
1034     public void setDeferringInterestedOps0(boolean deferringInterestedOps0)
1035     {
1036         _deferringInterestedOps0 = deferringInterestedOps0;
1037     }
1038     
1039 
1040     /* ------------------------------------------------------------ */
1041     /* ------------------------------------------------------------ */
1042     /* ------------------------------------------------------------ */
1043     private interface ChangeTask extends Runnable
1044     {}
1045     
1046 }