View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.CancelledKeyException;
18  import java.nio.channels.Channel;
19  import java.nio.channels.ClosedSelectorException;
20  import java.nio.channels.SelectableChannel;
21  import java.nio.channels.SelectionKey;
22  import java.nio.channels.Selector;
23  import java.nio.channels.ServerSocketChannel;
24  import java.nio.channels.SocketChannel;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.concurrent.ConcurrentHashMap;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.ConcurrentMap;
30  import java.util.concurrent.CountDownLatch;
31  import java.util.concurrent.TimeUnit;
32  
33  import org.eclipse.jetty.io.ConnectedEndPoint;
34  import org.eclipse.jetty.io.Connection;
35  import org.eclipse.jetty.io.EndPoint;
36  import org.eclipse.jetty.util.TypeUtil;
37  import org.eclipse.jetty.util.component.AbstractLifeCycle;
38  import org.eclipse.jetty.util.component.AggregateLifeCycle;
39  import org.eclipse.jetty.util.component.Dumpable;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.log.Logger;
42  import org.eclipse.jetty.util.thread.Timeout;
43  import org.eclipse.jetty.util.thread.Timeout.Task;
44  
45  
46  /* ------------------------------------------------------------ */
47  /**
48   * The Selector Manager manages a number of SelectSets to allow
49   * NIO scheduling to scale to large numbers of connections.
50   * <p>
51   */
52  public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
53  {
54      public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
55  
56      private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
57      private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",25000).intValue();
58      private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
59      private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
60  
61      private int _maxIdleTime;
62      private int _lowResourcesMaxIdleTime;
63      private long _lowResourcesConnections;
64      private SelectSet[] _selectSet;
65      private int _selectSets=1;
66      private volatile int _set;
67      private boolean _deferringInterestedOps0=true;
68      private int _selectorPriorityDelta=0;
69  
70      /* ------------------------------------------------------------ */
71      /**
72       * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
73       * @see #setLowResourcesMaxIdleTime(long)
74       */
75      public void setMaxIdleTime(long maxIdleTime)
76      {
77          _maxIdleTime=(int)maxIdleTime;
78      }
79  
80      /* ------------------------------------------------------------ */
81      /**
82       * @param selectSets number of select sets to create
83       */
84      public void setSelectSets(int selectSets)
85      {
86          long lrc = _lowResourcesConnections * _selectSets;
87          _selectSets=selectSets;
88          _lowResourcesConnections=lrc/_selectSets;
89      }
90  
91      /* ------------------------------------------------------------ */
92      /**
93       * @return the max idle time
94       */
95      public long getMaxIdleTime()
96      {
97          return _maxIdleTime;
98      }
99  
100     /* ------------------------------------------------------------ */
101     /**
102      * @return the number of select sets in use
103      */
104     public int getSelectSets()
105     {
106         return _selectSets;
107     }
108 
109     /* ------------------------------------------------------------ */
110     /**
111      * @param i
112      * @return The select set
113      */
114     public SelectSet getSelectSet(int i)
115     {
116         return _selectSet[i];
117     }
118 
119     /* ------------------------------------------------------------ */
120     /** Register a channel
121      * @param channel
122      * @param att Attached Object
123      */
124     public void register(SocketChannel channel, Object att)
125     {
126         // The ++ increment here is not atomic, but it does not matter.
127         // so long as the value changes sometimes, then connections will
128         // be distributed over the available sets.
129 
130         int s=_set++;
131         s=s%_selectSets;
132         SelectSet[] sets=_selectSet;
133         if (sets!=null)
134         {
135             SelectSet set=sets[s];
136             set.addChange(channel,att);
137             set.wakeup();
138         }
139     }
140 
141 
142     /* ------------------------------------------------------------ */
143     /** Register a channel
144      * @param channel
145      */
146     public void register(SocketChannel channel)
147     {
148         // The ++ increment here is not atomic, but it does not matter.
149         // so long as the value changes sometimes, then connections will
150         // be distributed over the available sets.
151 
152         int s=_set++;
153         s=s%_selectSets;
154         SelectSet[] sets=_selectSet;
155         if (sets!=null)
156         {
157             SelectSet set=sets[s];
158             set.addChange(channel);
159             set.wakeup();
160         }
161     }
162 
163     /* ------------------------------------------------------------ */
164     /** Register a {@link ServerSocketChannel}
165      * @param acceptChannel
166      */
167     public void register(ServerSocketChannel acceptChannel)
168     {
169         int s=_set++;
170         s=s%_selectSets;
171         SelectSet set=_selectSet[s];
172         set.addChange(acceptChannel);
173         set.wakeup();
174     }
175 
176     /* ------------------------------------------------------------ */
177     /**
178      * @return delta The value to add to the selector thread priority.
179      */
180     public int getSelectorPriorityDelta()
181     {
182         return _selectorPriorityDelta;
183     }
184 
185     /* ------------------------------------------------------------ */
186     /** Set the selector thread priorty delta.
187      * @param delta The value to add to the selector thread priority.
188      */
189     public void setSelectorPriorityDelta(int delta)
190     {
191         _selectorPriorityDelta=delta;
192     }
193 
194 
195     /* ------------------------------------------------------------ */
196     /**
197      * @return the lowResourcesConnections
198      */
199     public long getLowResourcesConnections()
200     {
201         return _lowResourcesConnections*_selectSets;
202     }
203 
204     /* ------------------------------------------------------------ */
205     /**
206      * Set the number of connections, which if exceeded places this manager in low resources state.
207      * This is not an exact measure as the connection count is averaged over the select sets.
208      * @param lowResourcesConnections the number of connections
209      * @see #setLowResourcesMaxIdleTime(long)
210      */
211     public void setLowResourcesConnections(long lowResourcesConnections)
212     {
213         _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
214     }
215 
216     /* ------------------------------------------------------------ */
217     /**
218      * @return the lowResourcesMaxIdleTime
219      */
220     public long getLowResourcesMaxIdleTime()
221     {
222         return _lowResourcesMaxIdleTime;
223     }
224 
225     /* ------------------------------------------------------------ */
226     /**
227      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()}
228      * @see #setMaxIdleTime(long)
229      */
230     public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
231     {
232         _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
233     }
234 
235 
236     /* ------------------------------------------------------------------------------- */
        /**
         * Hands a task over for execution; the concrete subclass decides how it is run.
         * Within this class it is used to start the selecting loops and to run work found
         * by a select set (runnable changes, expired timeouts, idle checks).
         *
         * @param task the task to execute
         * @return NOTE(review): presumably {@code true} when the task was accepted for execution;
         *         the contract is defined by the implementations, which are not in this file - confirm
         */
237     public abstract boolean dispatch(Runnable task);
238 
239     /* ------------------------------------------------------------ */
240     /* (non-Javadoc)
241      * @see org.eclipse.component.AbstractLifeCycle#doStart()
242      */
243     @Override
244     protected void doStart() throws Exception
245     {
246         _selectSet = new SelectSet[_selectSets];
247         for (int i=0;i<_selectSet.length;i++)
248             _selectSet[i]= new SelectSet(i);
249 
250         super.doStart();
251 
252         // start a thread to Select
253         for (int i=0;i<getSelectSets();i++)
254         {
255             final int id=i;
256             dispatch(new Runnable()
257             {
258                 public void run()
259                 {
260                     String name=Thread.currentThread().getName();
261                     int priority=Thread.currentThread().getPriority();
262                     try
263                     {
264                         SelectSet[] sets=_selectSet;
265                         if (sets==null)
266                             return;
267                         SelectSet set=sets[id];
268 
269                         Thread.currentThread().setName(name+" Selector"+id);
270                         if (getSelectorPriorityDelta()!=0)
271                             Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta());
272                         LOG.debug("Starting {} on {}",Thread.currentThread(),this);
273                         while (isRunning())
274                         {
275                             try
276                             {
277                                 set.doSelect();
278                             }
279                             catch(ThreadDeath e)
280                             {
281                                 throw e;
282                             }
283                             catch(IOException e)
284                             {
285                                 LOG.ignore(e);
286                             }
287                             catch(Exception e)
288                             {
289                                 LOG.warn(e);
290                             }
291                         }
292                     }
293                     finally
294                     {
295                         LOG.debug("Stopped {} on {}",Thread.currentThread(),this);
296                         Thread.currentThread().setName(name);
297                         if (getSelectorPriorityDelta()!=0)
298                             Thread.currentThread().setPriority(priority);
299                     }
300                 }
301 
302             });
303         }
304     }
305 
306 
307     /* ------------------------------------------------------------------------------- */
308     @Override
309     protected void doStop() throws Exception
310     {
311         SelectSet[] sets= _selectSet;
312         _selectSet=null;
313         if (sets!=null)
314         {
315             for (SelectSet set : sets)
316             {
317                 if (set!=null)
318                     set.stop();
319             }
320         }
321         super.doStop();
322     }
323 
324     /* ------------------------------------------------------------ */
325     /**
         * Subclass hook that receives a {@link SelectChannelEndPoint}.
         * NOTE(review): by its name this is presumably invoked once the endpoint has been closed;
         * the call site is not in this part of the file - confirm.
         *
326      * @param endpoint the endpoint concerned
327      */
328     protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
329 
330     /* ------------------------------------------------------------ */
331     /**
         * Subclass hook that receives a {@link SelectChannelEndPoint}.
         * NOTE(review): by its name this is presumably invoked once the endpoint has been opened;
         * the call site is not in this part of the file - confirm.
         *
332      * @param endpoint the endpoint concerned
333      */
334     protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
335 
336     /* ------------------------------------------------------------ */
        /**
         * Subclass hook that receives an endpoint and a connection.
         * NOTE(review): by its name this is presumably invoked when the connection attached to
         * the endpoint has been replaced; the call site is not in this part of the file - confirm.
         *
         * @param endpoint the endpoint concerned
         * @param oldConnection presumably the connection that was replaced - confirm
         */
337     protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
338 
339     /* ------------------------------------------------------------------------------- */
        /**
         * Factory hook implemented by subclasses to supply a {@link Connection} for a channel and its endpoint.
         * NOTE(review): the call site is not in this part of the file; when it is called is to be confirmed.
         *
         * @param channel the socket channel
         * @param endpoint the endpoint wrapping the channel
         * @return the connection to use
         */
340     protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
341 
342     /* ------------------------------------------------------------ */
343     /**
344      * Create a new end point. Implemented by subclasses.
         * NOTE(review): presumably reached through the select set's createEndPoint helper, which is
         * not in this part of the file - confirm.
         *
345      * @param channel the socket channel to wrap
346      * @param selectSet the select set involved
347      * @param sKey the selection key
348      * @return the new endpoint {@link SelectChannelEndPoint}
349      * @throws IOException declared for implementations to report I/O failures
350      */
351     protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
352 
353     /* ------------------------------------------------------------------------------- */
354     protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
355     {
356         LOG.warn(ex+","+channel+","+attachment);
357         LOG.debug(ex);
358     }
359 
360     /* ------------------------------------------------------------ */
361     public String dump()
362     {
363         return AggregateLifeCycle.dump(this);
364     }
365 
366     /* ------------------------------------------------------------ */
367     public void dump(Appendable out, String indent) throws IOException
368     {
369         out.append(String.valueOf(this)).append("\n");
370         AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet));
371     }
372 
373 
374     /* ------------------------------------------------------------------------------- */
375     /* ------------------------------------------------------------------------------- */
376     /* ------------------------------------------------------------------------------- */
377     public class SelectSet implements Dumpable
378     {
379         private final int _setID; // id passed to the constructor
            // Timeout queue of this set; doSelect() dispatches the tasks it reports as expired.
380         private final Timeout _timeout;
381 
            // Pending changes queued by addChange(); drained by doSelect().
382         private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
383 
            // Opened in the constructor and replaced by renewSelector(); doSelect() and wakeup() tolerate null.
384         private volatile Selector _selector;
385 
            // The thread currently inside doSelect(), or null when no pass is in progress.
386         private volatile Thread _selecting;
            // Count of blocking selects that returned within 1ms during the current monitoring window.
387         private int _busySelects;
            // Time (ms) after which doSelect() resets the busy-select counters.
388         private long _monitorNext;
            // True while doSelect() should sleep __BUSY_PAUSE before waiting in select.
389         private boolean _pausing;
            // Set when pausing first starts so the warning and dump are only issued once; not reset in the code visible here.
390         private boolean _paused;
            // Time (ms) of the last idle check dispatched by doSelect().
391         private volatile long _idleTick;
            // Endpoints whose idle timestamps are checked on each idle tick; populated outside the part of the file visible here.
392         private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
393 
394         /* ------------------------------------------------------------ */
395         SelectSet(int acceptorID) throws Exception
396         {
397             _setID=acceptorID;
398 
399             _idleTick = System.currentTimeMillis();
400             _timeout = new Timeout(this);
401             _timeout.setDuration(0L);
402 
403             // create a selector;
404             _selector = Selector.open();
405             _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD;
406         }
407 
408         /* ------------------------------------------------------------ */
409         public void addChange(Object change)
410         {
411             _changes.add(change);
412         }
413 
414         /* ------------------------------------------------------------ */
415         public void addChange(SelectableChannel channel, Object att)
416         {
417             if (att==null)
418                 addChange(channel);
419             else if (att instanceof EndPoint)
420                 addChange(att);
421             else
422                 addChange(new ChannelAndAttachment(channel,att));
423         }
424 
425         /* ------------------------------------------------------------ */
426         /**
427          * Select and dispatch tasks found from changes and the selector.
             * <p>
             * One call is one pass of the select loop, in this order: apply the queued changes,
             * do a non-blocking select (followed by a bounded blocking select when nothing is
             * ready and no changes are pending), handle the selected keys, dispatch expired
             * timeout tasks, dispatch an idle check when the idle tick has elapsed, and reset
             * the busy-select counters when the monitoring window has elapsed.
             * Closed-selector and cancelled-key exceptions are logged here rather than propagated.
428          *
429          * @throws IOException if thrown by a selector operation
430          */
431         public void doSelect() throws IOException
432         {
433             try
434             {
                    // Record the selecting thread for the duration of this pass (cleared in finally).
435                 _selecting=Thread.currentThread();
436                 final Selector selector=_selector;
437                 // Stopped concurrently ?
438                 if (selector == null)
439                     return;
440 
441                 // Make any key changes required
                    // Only as many changes as were queued when the pass began are processed;
                    // changes added meanwhile are left for the next pass.
442                 Object change;
443                 int changes=_changes.size();
444                 while (changes-->0 && (change=_changes.poll())!=null)
445                 {
                        // ch is remembered so the channel can be closed if applying the change fails.
446                     Channel ch=null;
447                     SelectionKey key=null;
448 
449                     try
450                     {
451                         if (change instanceof EndPoint)
452                         {
453                             // Update the operations for a key.
454                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
455                             ch=endpoint.getChannel();
456                             endpoint.doUpdateKey();
457                         }
458                         else if (change instanceof ChannelAndAttachment)
459                         {
460                             // finish accepting/connecting this connection
461                             final ChannelAndAttachment asc = (ChannelAndAttachment)change;
462                             final SelectableChannel channel=asc._channel;
463                             ch=channel;
464                             final Object att = asc._attachment;
465 
                                // Already connected: register for reads and create the endpoint now.
466                             if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
467                             {
468                                 key = channel.register(selector,SelectionKey.OP_READ,att);
469                                 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
470                                 key.attach(endpoint);
471                                 endpoint.schedule();
472                             }
                                // Open but not connected: wait for OP_CONNECT; the key loop below finishes the connect.
473                             else if (channel.isOpen())
474                             {
475                                 key = channel.register(selector,SelectionKey.OP_CONNECT,att);
476                             }
477                         }
478                         else if (change instanceof SocketChannel)
479                         {
480                             // Newly registered channel
481                             final SocketChannel channel=(SocketChannel)change;
482                             ch=channel;
483                             key = channel.register(selector,SelectionKey.OP_READ,null);
484                             SelectChannelEndPoint endpoint = createEndPoint(channel,key);
485                             key.attach(endpoint);
486                             endpoint.schedule();
487                         }
                            // A ChangeTask is run inline on this selecting thread ...
488                         else if (change instanceof ChangeTask)
489                         {
490                             ((Runnable)change).run();
491                         }
                            // ... whereas any other Runnable is handed to dispatch().
492                         else if (change instanceof Runnable)
493                         {
494                             dispatch((Runnable)change);
495                         }
496                         else
497                             throw new IllegalArgumentException(change.toString());
498                     }
499                     catch (CancelledKeyException e)
500                     {
501                         LOG.ignore(e);
502                     }
503                     catch (Throwable e)
504                     {
505                         if (e instanceof ThreadDeath)
506                             throw (ThreadDeath)e;
507 
508                         if (isRunning())
509                             LOG.warn(e);
510                         else
511                             LOG.debug(e);
512 
                            // Close the channel the failed change was about, if one was identified.
513                         try
514                         {
515                             if (ch!=null)
516                                 ch.close();
517                         }
518                         catch(IOException e2)
519                         {
520                             LOG.debug(e2);
521                         }
522                     }
523                 }
524 
525 
526                 // Do an instant select to see if any connections can be handled.
527                 int selected=selector.selectNow();
528 
529                 long now=System.currentTimeMillis();
530 
531                 // if no immediate things to do
532                 if (selected==0 && selector.selectedKeys().isEmpty())
533                 {
534                     // If we are in pausing mode
535                     if (_pausing)
536                     {
537                         try
538                         {
539                             Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop
540                         }
541                         catch(InterruptedException e)
542                         {
543                             LOG.ignore(e);
544                         }
545                         now=System.currentTimeMillis();
546                     }
547 
548                     // work out how long to wait in select
549                     _timeout.setNow(now);
550                     long to_next_timeout=_timeout.getTimeToNext();
551 
                        // Do not block at all if changes are already queued; otherwise wait at most
                        // one idle tick, shortened to the next timeout when that is sooner.
552                     long wait = _changes.size()==0?__IDLE_TICK:0L;
553                     if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
554                         wait = to_next_timeout;
555 
556                     // If we should wait with a select
557                     if (wait>0)
558                     {
559                         long before=now;
560                         selected=selector.select(wait);
561                         now = System.currentTimeMillis();
562                         _timeout.setNow(now);
563 
564                         // If we are monitoring for busy selector
565                         // and this select did not wait more than 1ms
566                         if (__MONITOR_PERIOD>0 && now-before <=1)
567                         {
568                             // count this as a busy select and if there have been too many this monitor cycle
569                             if (++_busySelects>__MAX_SELECTS)
570                             {
571                                 // Start injecting pauses
572                                 _pausing=true;
573                                 
574                                 // if this is the first pause
575                                 if (!_paused)
576                                 {
577                                     // Log and dump some status
578                                     _paused=true;
579                                     LOG.warn("Selector {} is too busy, pausing!",this);
580                                     final SelectSet set = this;
                                        // The dump is written from a dispatched task, not from the selecting thread.
581                                     SelectorManager.this.dispatch(
582                                     new Runnable(){
583                                         public void run()
584                                         {
585                                             System.err.println(set+":\n"+set.dump());
586                                         }
587                                     });
588                                 }
589                             }
590                         }
591                     }
592                 }
593 
594                 // have we been destroyed while sleeping
595                 if (_selector==null || !selector.isOpen())
596                     return;
597 
598                 // Look for things to do
599                 for (SelectionKey key: selector.selectedKeys())
600                 {
                        // Only set on the branches that look the channel up; closed by the catch below on failure.
601                     SocketChannel channel=null;
602 
603                     try
604                     {
                            // An invalid key is cancelled and its endpoint, if any, is asked to update its key.
605                         if (!key.isValid())
606                         {
607                             key.cancel();
608                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
609                             if (endpoint != null)
610                                 endpoint.doUpdateKey();
611                             continue;
612                         }
613 
614                         Object att = key.attachment();
                            // Key already has an endpoint: schedule it when it is readable or writable.
615                         if (att instanceof SelectChannelEndPoint)
616                         {
617                             if (key.isReadable()||key.isWritable())
618                                 ((SelectChannelEndPoint)att).schedule();
619                         }
620                         else if (key.isConnectable())
621                         {
622                             // Complete a connection of a registered channel
623                             channel = (SocketChannel)key.channel();
624                             boolean connected=false;
625                             try
626                             {
627                                 connected=channel.finishConnect();
628                             }
629                             catch(Exception e)
630                             {
631                                 connectionFailed(channel,e,att);
632                             }
633                             finally
634                             {
                                    // Runs for both outcomes: a completed connect gets an endpoint and read
                                    // interest; otherwise (failed or not yet complete) the key is cancelled.
635                                 if (connected)
636                                 {
637                                     key.interestOps(SelectionKey.OP_READ);
638                                     SelectChannelEndPoint endpoint = createEndPoint(channel,key);
639                                     key.attach(endpoint);
640                                     endpoint.schedule();
641                                 }
642                                 else
643                                 {
644                                     key.cancel();
645                                 }
646                             }
647                         }
648                         else
649                         {
650                             // Wrap readable registered channel in an endpoint
651                             channel = (SocketChannel)key.channel();
652                             SelectChannelEndPoint endpoint = createEndPoint(channel,key);
653                             key.attach(endpoint);
654                             if (key.isReadable())
655                                 endpoint.schedule();
656                         }
657                         key = null;
658                     }
659                     catch (CancelledKeyException e)
660                     {
661                         LOG.ignore(e);
662                     }
663                     catch (Exception e)
664                     {
665                         if (isRunning())
666                             LOG.warn(e);
667                         else
668                             LOG.ignore(e);
669 
670                         try
671                         {
672                             if (channel!=null)
673                                 channel.close();
674                         }
675                         catch(IOException e2)
676                         {
677                             LOG.debug(e2);
678                         }
679 
                            // Cancel the failing key unless it belongs to a server socket channel.
680                         if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
681                             key.cancel();
682                     }
683                 }
684 
685                 // Everything always handled
686                 selector.selectedKeys().clear();
687 
                    // Dispatch every timeout task that has expired as of now.
688                 now=System.currentTimeMillis();
689                 _timeout.setNow(now);
690                 Task task = _timeout.expired();
691                 while (task!=null)
692                 {
693                     if (task instanceof Runnable)
694                         dispatch((Runnable)task);
695                     task = _timeout.expired();
696                 }
697 
698                 // Idle tick
699                 if (now-_idleTick>__IDLE_TICK)
700                 {
701                     _idleTick=now;
702 
                        // When this selector holds more keys than the per-set low-resources threshold, the
                        // time passed to the idle check is moved forward by (max idle - low resources max idle).
                        // NOTE(review): presumably so checkIdleTimestamp effectively applies the shorter
                        // timeout - confirm in SelectChannelEndPoint.
703                     final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
704                         ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
705                         :now;
706 
                        // The endpoints are checked from a dispatched task, not from the selecting thread.
707                     dispatch(new Runnable()
708                     {
709                         public void run()
710                         {
711                             for (SelectChannelEndPoint endp:_endPoints.keySet())
712                             {
713                                 endp.checkIdleTimestamp(idle_now);
714                             }
715                         }
716                     });
717                     
718                 }
719                 
720                 // Reset busy select monitor counts
721                 if (__MONITOR_PERIOD>0 && now>_monitorNext)
722                 {
723                     _busySelects=0;
724                     _pausing=false;
725                     _monitorNext=now+__MONITOR_PERIOD;
726                     
727                 }
728             }
729             catch (ClosedSelectorException e)
730             {
                    // Only worth a warning while running; otherwise the selector was closed as part of stopping.
731                 if (isRunning())
732                     LOG.warn(e);
733                 else
734                     LOG.ignore(e);
735             }
736             catch (CancelledKeyException e)
737             {
738                 LOG.ignore(e);
739             }
740             finally
741             {
742                 _selecting=null;
743             }
744         }
745 
746 
747         /* ------------------------------------------------------------ */
748         private void renewSelector()
749         {
750             try
751             {
752                 synchronized (this)
753                 {
754                     Selector selector=_selector;
755                     if (selector==null)
756                         return;
757                     final Selector new_selector = Selector.open();
758                     for (SelectionKey k: selector.keys())
759                     {
760                         if (!k.isValid() || k.interestOps()==0)
761                             continue;
762 
763                         final SelectableChannel channel = k.channel();
764                         final Object attachment = k.attachment();
765 
766                         if (attachment==null)
767                             addChange(channel);
768                         else
769                             addChange(channel,attachment);
770                     }
771                     _selector.close();
772                     _selector=new_selector;
773                 }
774             }
775             catch(IOException e)
776             {
777                 throw new RuntimeException("recreating selector",e);
778             }
779         }
780 
781         /* ------------------------------------------------------------ */
782         public SelectorManager getManager()
783         {
784             return SelectorManager.this;
785         }
786 
787         /* ------------------------------------------------------------ */
788         public long getNow()
789         {
790             return _timeout.getNow();
791         }
792 
793         /* ------------------------------------------------------------ */
794         /**
795          * @param task The task to timeout. If it implements Runnable, then
796          * expired will be called from a dispatched thread.
797          *
798          * @param timeoutMs
799          */
800         public void scheduleTimeout(Timeout.Task task, long timeoutMs)
801         {
802             if (!(task instanceof Runnable))
803                 throw new IllegalArgumentException("!Runnable");
804             _timeout.schedule(task, timeoutMs);
805         }
806 
807         /* ------------------------------------------------------------ */
808         public void cancelTimeout(Timeout.Task task)
809         {
810             task.cancel();
811         }
812 
813         /* ------------------------------------------------------------ */
814         public void wakeup()
815         {
816             try
817             {
818                 Selector selector = _selector;
819                 if (selector!=null)
820                     selector.wakeup();
821             }
822             catch(Exception e)
823             {
824                 addChange(new ChangeTask()
825                 {
826                     public void run()
827                     {
828                         renewSelector();
829                     }
830                 });
831 
832                 renewSelector();
833             }
834         }
835 
836         /* ------------------------------------------------------------ */
837         private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
838         {
839             SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
840             endPointOpened(endp);
841             _endPoints.put(endp,this);
842             return endp;
843         }
844 
845         /* ------------------------------------------------------------ */
846         public void destroyEndPoint(SelectChannelEndPoint endp)
847         {
848             LOG.debug("destroyEndPoint {}",endp);
849             _endPoints.remove(endp);
850             endPointClosed(endp);
851         }
852 
853         /* ------------------------------------------------------------ */
854         Selector getSelector()
855         {
856             return _selector;
857         }
858 
859         /* ------------------------------------------------------------ */
860         void stop() throws Exception
861         {
862             // Spin for a while waiting for selector to complete
863             // to avoid unneccessary closed channel exceptions
864             try
865             {
866                 for (int i=0;i<100 && _selecting!=null;i++)
867                 {
868                     wakeup();
869                     Thread.sleep(10);
870                 }
871             }
872             catch(Exception e)
873             {
874                 LOG.ignore(e);
875             }
876 
877             // close endpoints and selector
878             synchronized (this)
879             {
880                 Selector selector=_selector;
881                 for (SelectionKey key:selector.keys())
882                 {
883                     if (key==null)
884                         continue;
885                     Object att=key.attachment();
886                     if (att instanceof EndPoint)
887                     {
888                         EndPoint endpoint = (EndPoint)att;
889                         try
890                         {
891                             endpoint.close();
892                         }
893                         catch(IOException e)
894                         {
895                             LOG.ignore(e);
896                         }
897                     }
898                 }
899 
900 
901                 _timeout.cancelAll();
902                 try
903                 {
904                     selector=_selector;
905                     if (selector != null)
906                         selector.close();
907                 }
908                 catch (IOException e)
909                 {
910                     LOG.ignore(e);
911                 }
912                 _selector=null;
913             }
914         }
915 
916         /* ------------------------------------------------------------ */
917         public String dump()
918         {
919             return AggregateLifeCycle.dump(this);
920         }
921 
922         /* ------------------------------------------------------------ */
923         public void dump(Appendable out, String indent) throws IOException
924         {
925             out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
926 
927             Thread selecting = _selecting;
928 
929             Object where = "not selecting";
930             StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
931             if (trace!=null)
932             {
933                 for (StackTraceElement t:trace)
934                     if (t.getClassName().startsWith("org.eclipse.jetty."))
935                     {
936                         where=t;
937                         break;
938                     }
939             }
940 
941             Selector selector=_selector;
942             if (selector!=null)
943             {
944                 final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
945                 dump.add(where);
946 
947                 final CountDownLatch latch = new CountDownLatch(1);
948 
949                 addChange(new ChangeTask()
950                 {
951                     public void run()
952                     {
953                         dumpKeyState(dump);
954                         latch.countDown();
955                     }
956                 });
957 
958                 try
959                 {
960                     latch.await(5,TimeUnit.SECONDS);
961                 }
962                 catch(InterruptedException e)
963                 {
964                     LOG.ignore(e);
965                 }
966                 AggregateLifeCycle.dump(out,indent,dump);
967             }
968         }
969 
970         /* ------------------------------------------------------------ */
971         public void dumpKeyState(List<Object> dumpto)
972         {
973             Selector selector=_selector;
974             dumpto.add(selector+" keys="+selector.keys().size());
975             for (SelectionKey key: selector.keys())
976             {
977                 if (key.isValid())
978                     dumpto.add(key.attachment()+" "+key.interestOps()+" "+key.readyOps());
979                 else
980                     dumpto.add(key.attachment()+" - - ");
981             }
982         }
983     }
984 
985     /* ------------------------------------------------------------ */
986     private static class ChannelAndAttachment
987     {
988         final SelectableChannel _channel;
989         final Object _attachment;
990 
991         public ChannelAndAttachment(SelectableChannel channel, Object attachment)
992         {
993             super();
994             _channel = channel;
995             _attachment = attachment;
996         }
997     }
998 
999     /* ------------------------------------------------------------ */
1000     public boolean isDeferringInterestedOps0()
1001     {
1002         return _deferringInterestedOps0;
1003     }
1004 
1005     /* ------------------------------------------------------------ */
1006     public void setDeferringInterestedOps0(boolean deferringInterestedOps0)
1007     {
1008         _deferringInterestedOps0 = deferringInterestedOps0;
1009     }
1010 
1011 
1012     /* ------------------------------------------------------------ */
1013     /* ------------------------------------------------------------ */
1014     /* ------------------------------------------------------------ */
1015     private interface ChangeTask extends Runnable
1016     {}
1017 
1018 }