View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io.nio;
20  
21  import java.io.IOException;
22  import java.nio.channels.CancelledKeyException;
23  import java.nio.channels.Channel;
24  import java.nio.channels.ClosedSelectorException;
25  import java.nio.channels.SelectableChannel;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.nio.channels.ServerSocketChannel;
29  import java.nio.channels.SocketChannel;
30  import java.util.ArrayList;
31  import java.util.List;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.ConcurrentMap;
36  import java.util.concurrent.CountDownLatch;
37  import java.util.concurrent.TimeUnit;
38  
39  import org.eclipse.jetty.io.AsyncEndPoint;
40  import org.eclipse.jetty.io.ConnectedEndPoint;
41  import org.eclipse.jetty.io.Connection;
42  import org.eclipse.jetty.io.EndPoint;
43  import org.eclipse.jetty.util.TypeUtil;
44  import org.eclipse.jetty.util.component.AbstractLifeCycle;
45  import org.eclipse.jetty.util.component.AggregateLifeCycle;
46  import org.eclipse.jetty.util.component.Dumpable;
47  import org.eclipse.jetty.util.log.Log;
48  import org.eclipse.jetty.util.log.Logger;
49  import org.eclipse.jetty.util.thread.Timeout;
50  import org.eclipse.jetty.util.thread.Timeout.Task;
51  
52  
53  /* ------------------------------------------------------------ */
54  /**
55   * The Selector Manager manages and number of SelectSets to allow
56   * NIO scheduling to scale to large numbers of connections.
57   * <p>
58   */
59  public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
60  {
61      public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
62  
63      private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
64      private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue();
65      private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
66      private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
67  
68      private int _maxIdleTime;
69      private int _lowResourcesMaxIdleTime;
70      private long _lowResourcesConnections;
71      private SelectSet[] _selectSet;
72      private int _selectSets=1;
73      private volatile int _set=0;
74      private boolean _deferringInterestedOps0=true;
75      private int _selectorPriorityDelta=0;
76  
77      /* ------------------------------------------------------------ */
78      /**
79       * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
80       * @see #setLowResourcesMaxIdleTime(long)
81       */
82      public void setMaxIdleTime(long maxIdleTime)
83      {
84          _maxIdleTime=(int)maxIdleTime;
85      }
86  
87      /* ------------------------------------------------------------ */
88      /**
89       * @param selectSets number of select sets to create
90       */
91      public void setSelectSets(int selectSets)
92      {
93          long lrc = _lowResourcesConnections * _selectSets;
94          _selectSets=selectSets;
95          _lowResourcesConnections=lrc/_selectSets;
96      }
97  
98      /* ------------------------------------------------------------ */
99      /**
100      * @return the max idle time
101      */
102     public long getMaxIdleTime()
103     {
104         return _maxIdleTime;
105     }
106 
107     /* ------------------------------------------------------------ */
108     /**
109      * @return the number of select sets in use
110      */
111     public int getSelectSets()
112     {
113         return _selectSets;
114     }
115 
116     /* ------------------------------------------------------------ */
117     /**
118      * @param i
119      * @return The select set
120      */
121     public SelectSet getSelectSet(int i)
122     {
123         return _selectSet[i];
124     }
125 
126     /* ------------------------------------------------------------ */
127     /** Register a channel
128      * @param channel
129      * @param att Attached Object
130      */
131     public void register(SocketChannel channel, Object att)
132     {
133         // The ++ increment here is not atomic, but it does not matter.
134         // so long as the value changes sometimes, then connections will
135         // be distributed over the available sets.
136 
137         int s=_set++;
138         if (s<0)
139             s=-s;
140         s=s%_selectSets;
141         SelectSet[] sets=_selectSet;
142         if (sets!=null)
143         {
144             SelectSet set=sets[s];
145             set.addChange(channel,att);
146             set.wakeup();
147         }
148     }
149 
150 
151     /* ------------------------------------------------------------ */
152     /** Register a channel
153      * @param channel
154      */
155     public void register(SocketChannel channel)
156     {
157         // The ++ increment here is not atomic, but it does not matter.
158         // so long as the value changes sometimes, then connections will
159         // be distributed over the available sets.
160 
161         int s=_set++;
162         if (s<0)
163             s=-s;
164         s=s%_selectSets;
165         SelectSet[] sets=_selectSet;
166         if (sets!=null)
167         {
168             SelectSet set=sets[s];
169             set.addChange(channel);
170             set.wakeup();
171         }
172     }
173 
174     /* ------------------------------------------------------------ */
175     /** Register a {@link ServerSocketChannel}
176      * @param acceptChannel
177      */
178     public void register(ServerSocketChannel acceptChannel)
179     {
180         int s=_set++;
181         if (s<0)
182             s=-s;
183         s=s%_selectSets;
184         SelectSet set=_selectSet[s];
185         set.addChange(acceptChannel);
186         set.wakeup();
187     }
188 
189     /* ------------------------------------------------------------ */
190     /**
191      * @return delta The value to add to the selector thread priority.
192      */
193     public int getSelectorPriorityDelta()
194     {
195         return _selectorPriorityDelta;
196     }
197 
198     /* ------------------------------------------------------------ */
199     /** Set the selector thread priorty delta.
200      * @param delta The value to add to the selector thread priority.
201      */
202     public void setSelectorPriorityDelta(int delta)
203     {
204         _selectorPriorityDelta=delta;
205     }
206 
207 
208     /* ------------------------------------------------------------ */
209     /**
210      * @return the lowResourcesConnections
211      */
212     public long getLowResourcesConnections()
213     {
214         return _lowResourcesConnections*_selectSets;
215     }
216 
217     /* ------------------------------------------------------------ */
218     /**
219      * Set the number of connections, which if exceeded places this manager in low resources state.
220      * This is not an exact measure as the connection count is averaged over the select sets.
221      * @param lowResourcesConnections the number of connections
222      * @see #setLowResourcesMaxIdleTime(long)
223      */
224     public void setLowResourcesConnections(long lowResourcesConnections)
225     {
226         _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
227     }
228 
229     /* ------------------------------------------------------------ */
230     /**
231      * @return the lowResourcesMaxIdleTime
232      */
233     public long getLowResourcesMaxIdleTime()
234     {
235         return _lowResourcesMaxIdleTime;
236     }
237 
238     /* ------------------------------------------------------------ */
239     /**
240      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()}
241      * @see #setMaxIdleTime(long)
242      */
243     public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
244     {
245         _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
246     }
247 
248 
249     /* ------------------------------------------------------------------------------- */
250     public abstract boolean dispatch(Runnable task);
251 
252     /* ------------------------------------------------------------ */
253     /* (non-Javadoc)
254      * @see org.eclipse.component.AbstractLifeCycle#doStart()
255      */
256     @Override
257     protected void doStart() throws Exception
258     {
259         _selectSet = new SelectSet[_selectSets];
260         for (int i=0;i<_selectSet.length;i++)
261             _selectSet[i]= new SelectSet(i);
262 
263         super.doStart();
264 
265         // start a thread to Select
266         for (int i=0;i<getSelectSets();i++)
267         {
268             final int id=i;
269             boolean selecting=dispatch(new Runnable()
270             {
271                 public void run()
272                 {
273                     String name=Thread.currentThread().getName();
274                     int priority=Thread.currentThread().getPriority();
275                     try
276                     {
277                         SelectSet[] sets=_selectSet;
278                         if (sets==null)
279                             return;
280                         SelectSet set=sets[id];
281 
282                         Thread.currentThread().setName(name+" Selector"+id);
283                         if (getSelectorPriorityDelta()!=0)
284                             Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta());
285                         LOG.debug("Starting {} on {}",Thread.currentThread(),this);
286                         while (isRunning())
287                         {
288                             try
289                             {
290                                 set.doSelect();
291                             }
292                             catch(IOException e)
293                             {
294                                 LOG.ignore(e);
295                             }
296                             catch(Exception e)
297                             {
298                                 LOG.warn(e);
299                             }
300                         }
301                     }
302                     finally
303                     {
304                         LOG.debug("Stopped {} on {}",Thread.currentThread(),this);
305                         Thread.currentThread().setName(name);
306                         if (getSelectorPriorityDelta()!=0)
307                             Thread.currentThread().setPriority(priority);
308                     }
309                 }
310 
311             });
312 
313             if (!selecting)
314                 throw new IllegalStateException("!Selecting");
315         }
316     }
317 
318 
319     /* ------------------------------------------------------------------------------- */
320     @Override
321     protected void doStop() throws Exception
322     {
323         SelectSet[] sets= _selectSet;
324         _selectSet=null;
325         if (sets!=null)
326         {
327             for (SelectSet set : sets)
328             {
329                 if (set!=null)
330                     set.stop();
331             }
332         }
333         super.doStop();
334     }
335 
336     /* ------------------------------------------------------------ */
337     /**
338      * @param endpoint
339      */
340     protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
341 
342     /* ------------------------------------------------------------ */
343     /**
344      * @param endpoint
345      */
346     protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
347 
348     /* ------------------------------------------------------------ */
349     protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
350 
351     /* ------------------------------------------------------------------------------- */
352     public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment);
353 
354     /* ------------------------------------------------------------ */
355     /**
356      * Create a new end point
357      * @param channel
358      * @param selectSet
359      * @param sKey the selection key
360      * @return the new endpoint {@link SelectChannelEndPoint}
361      * @throws IOException
362      */
363     protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
364 
365     /* ------------------------------------------------------------------------------- */
366     protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
367     {
368         LOG.warn(ex+","+channel+","+attachment);
369         LOG.debug(ex);
370     }
371 
372     /* ------------------------------------------------------------ */
373     public String dump()
374     {
375         return AggregateLifeCycle.dump(this);
376     }
377 
378     /* ------------------------------------------------------------ */
379     public void dump(Appendable out, String indent) throws IOException
380     {
381         AggregateLifeCycle.dumpObject(out,this);
382         AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet));
383     }
384 
385 
386     /* ------------------------------------------------------------------------------- */
387     /* ------------------------------------------------------------------------------- */
388     /* ------------------------------------------------------------------------------- */
389     public class SelectSet implements Dumpable
390     {
391         private final int _setID;
392         private final Timeout _timeout;
393 
394         private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
395 
396         private volatile Selector _selector;
397 
398         private volatile Thread _selecting;
399         private int _busySelects;
400         private long _monitorNext;
401         private boolean _pausing;
402         private boolean _paused;
403         private volatile long _idleTick;
404         private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
405 
406         /* ------------------------------------------------------------ */
407         SelectSet(int acceptorID) throws Exception
408         {
409             _setID=acceptorID;
410 
411             _idleTick = System.currentTimeMillis();
412             _timeout = new Timeout(this);
413             _timeout.setDuration(0L);
414 
415             // create a selector;
416             _selector = Selector.open();
417             _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD;
418         }
419 
420         /* ------------------------------------------------------------ */
421         public void addChange(Object change)
422         {
423             _changes.add(change);
424         }
425 
426         /* ------------------------------------------------------------ */
427         public void addChange(SelectableChannel channel, Object att)
428         {
429             if (att==null)
430                 addChange(channel);
431             else if (att instanceof EndPoint)
432                 addChange(att);
433             else
434                 addChange(new ChannelAndAttachment(channel,att));
435         }
436 
437         /* ------------------------------------------------------------ */
438         /**
439          * Select and dispatch tasks found from changes and the selector.
440          *
441          * @throws IOException
442          */
443         public void doSelect() throws IOException
444         {
445             try
446             {
447                 _selecting=Thread.currentThread();
448                 final Selector selector=_selector;
449                 // Stopped concurrently ?
450                 if (selector == null)
451                     return;
452 
453                 // Make any key changes required
454                 Object change;
455                 int changes=_changes.size();
456                 while (changes-->0 && (change=_changes.poll())!=null)
457                 {
458                     Channel ch=null;
459                     SelectionKey key=null;
460 
461                     try
462                     {
463                         if (change instanceof EndPoint)
464                         {
465                             // Update the operations for a key.
466                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
467                             ch=endpoint.getChannel();
468                             endpoint.doUpdateKey();
469                         }
470                         else if (change instanceof ChannelAndAttachment)
471                         {
472                             // finish accepting/connecting this connection
473                             final ChannelAndAttachment asc = (ChannelAndAttachment)change;
474                             final SelectableChannel channel=asc._channel;
475                             ch=channel;
476                             final Object att = asc._attachment;
477 
478                             if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
479                             {
480                                 key = channel.register(selector,SelectionKey.OP_READ,att);
481                                 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
482                                 key.attach(endpoint);
483                                 endpoint.schedule();
484                             }
485                             else if (channel.isOpen())
486                             {
487                                 key = channel.register(selector,SelectionKey.OP_CONNECT,att);
488                             }
489                         }
490                         else if (change instanceof SocketChannel)
491                         {
492                             // Newly registered channel
493                             final SocketChannel channel=(SocketChannel)change;
494                             ch=channel;
495                             key = channel.register(selector,SelectionKey.OP_READ,null);
496                             SelectChannelEndPoint endpoint = createEndPoint(channel,key);
497                             key.attach(endpoint);
498                             endpoint.schedule();
499                         }
500                         else if (change instanceof ChangeTask)
501                         {
502                             ((Runnable)change).run();
503                         }
504                         else if (change instanceof Runnable)
505                         {
506                             dispatch((Runnable)change);
507                         }
508                         else
509                             throw new IllegalArgumentException(change.toString());
510                     }
511                     catch (CancelledKeyException e)
512                     {
513                         LOG.ignore(e);
514                     }
515                     catch (Throwable e)
516                     {
517                         if (isRunning())
518                             LOG.warn(e);
519                         else
520                             LOG.debug(e);
521 
522                         try
523                         {
524                             if (ch!=null)
525                                 ch.close();
526                         }
527                         catch(IOException e2)
528                         {
529                             LOG.debug(e2);
530                         }
531                     }
532                 }
533 
534 
535                 // Do and instant select to see if any connections can be handled.
536                 int selected=selector.selectNow();
537 
538                 long now=System.currentTimeMillis();
539 
540                 // if no immediate things to do
541                 if (selected==0 && selector.selectedKeys().isEmpty())
542                 {
543                     // If we are in pausing mode
544                     if (_pausing)
545                     {
546                         try
547                         {
548                             Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of  busy loop
549                         }
550                         catch(InterruptedException e)
551                         {
552                             LOG.ignore(e);
553                         }
554                         now=System.currentTimeMillis();
555                     }
556 
557                     // workout how long to wait in select
558                     _timeout.setNow(now);
559                     long to_next_timeout=_timeout.getTimeToNext();
560 
561                     long wait = _changes.size()==0?__IDLE_TICK:0L;
562                     if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
563                         wait = to_next_timeout;
564 
565                     // If we should wait with a select
566                     if (wait>0)
567                     {
568                         long before=now;
569                         selector.select(wait);
570                         now = System.currentTimeMillis();
571                         _timeout.setNow(now);
572 
573                         // If we are monitoring for busy selector
574                         // and this select did not wait more than 1ms
575                         if (__MONITOR_PERIOD>0 && now-before <=1)
576                         {
577                             // count this as a busy select and if there have been too many this monitor cycle
578                             if (++_busySelects>__MAX_SELECTS)
579                             {
580                                 // Start injecting pauses
581                                 _pausing=true;
582 
583                                 // if this is the first pause
584                                 if (!_paused)
585                                 {
586                                     // Log and dump some status
587                                     _paused=true;
588                                     LOG.warn("Selector {} is too busy, pausing!",this);
589                                 }
590                             }
591                         }
592                     }
593                 }
594 
595                 // have we been destroyed while sleeping
596                 if (_selector==null || !selector.isOpen())
597                     return;
598 
599                 // Look for things to do
600                 for (SelectionKey key: selector.selectedKeys())
601                 {
602                     SocketChannel channel=null;
603 
604                     try
605                     {
606                         if (!key.isValid())
607                         {
608                             key.cancel();
609                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
610                             if (endpoint != null)
611                                 endpoint.doUpdateKey();
612                             continue;
613                         }
614 
615                         Object att = key.attachment();
616                         if (att instanceof SelectChannelEndPoint)
617                         {
618                             if (key.isReadable()||key.isWritable())
619                                 ((SelectChannelEndPoint)att).schedule();
620                         }
621                         else if (key.isConnectable())
622                         {
623                             // Complete a connection of a registered channel
624                             channel = (SocketChannel)key.channel();
625                             boolean connected=false;
626                             try
627                             {
628                                 connected=channel.finishConnect();
629                             }
630                             catch(Exception e)
631                             {
632                                 connectionFailed(channel,e,att);
633                             }
634                             finally
635                             {
636                                 if (connected)
637                                 {
638                                     key.interestOps(SelectionKey.OP_READ);
639                                     SelectChannelEndPoint endpoint = createEndPoint(channel,key);
640                                     key.attach(endpoint);
641                                     endpoint.schedule();
642                                 }
643                                 else
644                                 {
645                                     key.cancel();
646                                 }
647                             }
648                         }
649                         else
650                         {
651                             // Wrap readable registered channel in an endpoint
652                             channel = (SocketChannel)key.channel();
653                             SelectChannelEndPoint endpoint = createEndPoint(channel,key);
654                             key.attach(endpoint);
655                             if (key.isReadable())
656                                 endpoint.schedule();
657                         }
658                         key = null;
659                     }
660                     catch (CancelledKeyException e)
661                     {
662                         LOG.ignore(e);
663                     }
664                     catch (Exception e)
665                     {
666                         if (isRunning())
667                             LOG.warn(e);
668                         else
669                             LOG.ignore(e);
670 
671                         try
672                         {
673                             if (channel!=null)
674                                 channel.close();
675                         }
676                         catch(IOException e2)
677                         {
678                             LOG.debug(e2);
679                         }
680 
681                         if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
682                             key.cancel();
683                     }
684                 }
685 
686                 // Everything always handled
687                 selector.selectedKeys().clear();
688 
689                 now=System.currentTimeMillis();
690                 _timeout.setNow(now);
691                 Task task = _timeout.expired();
692                 while (task!=null)
693                 {
694                     if (task instanceof Runnable)
695                         dispatch((Runnable)task);
696                     task = _timeout.expired();
697                 }
698 
699                 // Idle tick
700                 if (now-_idleTick>__IDLE_TICK)
701                 {
702                     _idleTick=now;
703 
704                     final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
705                         ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
706                         :now;
707 
708                     dispatch(new Runnable()
709                     {
710                         public void run()
711                         {
712                             for (SelectChannelEndPoint endp:_endPoints.keySet())
713                             {
714                                 endp.checkIdleTimestamp(idle_now);
715                             }
716                         }
717                         public String toString() {return "Idle-"+super.toString();}
718                     });
719 
720                 }
721 
722                 // Reset busy select monitor counts
723                 if (__MONITOR_PERIOD>0 && now>_monitorNext)
724                 {
725                     _busySelects=0;
726                     _pausing=false;
727                     _monitorNext=now+__MONITOR_PERIOD;
728 
729                 }
730             }
731             catch (ClosedSelectorException e)
732             {
733                 if (isRunning())
734                     LOG.warn(e);
735                 else
736                     LOG.ignore(e);
737             }
738             catch (CancelledKeyException e)
739             {
740                 LOG.ignore(e);
741             }
742             finally
743             {
744                 _selecting=null;
745             }
746         }
747 
748 
749         /* ------------------------------------------------------------ */
750         private void renewSelector()
751         {
752             try
753             {
754                 synchronized (this)
755                 {
756                     Selector selector=_selector;
757                     if (selector==null)
758                         return;
759                     final Selector new_selector = Selector.open();
760                     for (SelectionKey k: selector.keys())
761                     {
762                         if (!k.isValid() || k.interestOps()==0)
763                             continue;
764 
765                         final SelectableChannel channel = k.channel();
766                         final Object attachment = k.attachment();
767 
768                         if (attachment==null)
769                             addChange(channel);
770                         else
771                             addChange(channel,attachment);
772                     }
773                     _selector.close();
774                     _selector=new_selector;
775                 }
776             }
777             catch(IOException e)
778             {
779                 throw new RuntimeException("recreating selector",e);
780             }
781         }
782 
783         /* ------------------------------------------------------------ */
784         public SelectorManager getManager()
785         {
786             return SelectorManager.this;
787         }
788 
789         /* ------------------------------------------------------------ */
790         public long getNow()
791         {
792             return _timeout.getNow();
793         }
794 
795         /* ------------------------------------------------------------ */
796         /**
797          * @param task The task to timeout. If it implements Runnable, then
798          * expired will be called from a dispatched thread.
799          *
800          * @param timeoutMs
801          */
802         public void scheduleTimeout(Timeout.Task task, long timeoutMs)
803         {
804             if (!(task instanceof Runnable))
805                 throw new IllegalArgumentException("!Runnable");
806             _timeout.schedule(task, timeoutMs);
807         }
808 
809         /* ------------------------------------------------------------ */
810         public void cancelTimeout(Timeout.Task task)
811         {
812             task.cancel();
813         }
814 
815         /* ------------------------------------------------------------ */
816         public void wakeup()
817         {
818             try
819             {
820                 Selector selector = _selector;
821                 if (selector!=null)
822                     selector.wakeup();
823             }
824             catch(Exception e)
825             {
826                 addChange(new ChangeTask()
827                 {
828                     public void run()
829                     {
830                         renewSelector();
831                     }
832                 });
833 
834                 renewSelector();
835             }
836         }
837 
838         /* ------------------------------------------------------------ */
839         private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
840         {
841             SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
842             LOG.debug("created {}",endp);
843             endPointOpened(endp);
844             _endPoints.put(endp,this);
845             return endp;
846         }
847 
848         /* ------------------------------------------------------------ */
849         public void destroyEndPoint(SelectChannelEndPoint endp)
850         {
851             LOG.debug("destroyEndPoint {}",endp);
852             _endPoints.remove(endp);
853             endPointClosed(endp);
854         }
855 
856         /* ------------------------------------------------------------ */
857         Selector getSelector()
858         {
859             return _selector;
860         }
861 
862         /* ------------------------------------------------------------ */
863         void stop() throws Exception
864         {
865             // Spin for a while waiting for selector to complete
866             // to avoid unneccessary closed channel exceptions
867             try
868             {
869                 for (int i=0;i<100 && _selecting!=null;i++)
870                 {
871                     wakeup();
872                     Thread.sleep(10);
873                 }
874             }
875             catch(Exception e)
876             {
877                 LOG.ignore(e);
878             }
879 
880             // close endpoints and selector
881             synchronized (this)
882             {
883                 Selector selector=_selector;
884                 for (SelectionKey key:selector.keys())
885                 {
886                     if (key==null)
887                         continue;
888                     Object att=key.attachment();
889                     if (att instanceof EndPoint)
890                     {
891                         EndPoint endpoint = (EndPoint)att;
892                         try
893                         {
894                             endpoint.close();
895                         }
896                         catch(IOException e)
897                         {
898                             LOG.ignore(e);
899                         }
900                     }
901                 }
902 
903 
904                 _timeout.cancelAll();
905                 try
906                 {
907                     selector=_selector;
908                     if (selector != null)
909                         selector.close();
910                 }
911                 catch (IOException e)
912                 {
913                     LOG.ignore(e);
914                 }
915                 _selector=null;
916             }
917         }
918 
919         /* ------------------------------------------------------------ */
920         public String dump()
921         {
922             return AggregateLifeCycle.dump(this);
923         }
924 
925         /* ------------------------------------------------------------ */
926         public void dump(Appendable out, String indent) throws IOException
927         {
928             out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
929 
930             Thread selecting = _selecting;
931 
932             Object where = "not selecting";
933             StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
934             if (trace!=null)
935             {
936                 for (StackTraceElement t:trace)
937                     if (t.getClassName().startsWith("org.eclipse.jetty."))
938                     {
939                         where=t;
940                         break;
941                     }
942             }
943 
944             Selector selector=_selector;
945             if (selector!=null)
946             {
947                 final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
948                 dump.add(where);
949 
950                 final CountDownLatch latch = new CountDownLatch(1);
951 
952                 addChange(new ChangeTask()
953                 {
954                     public void run()
955                     {
956                         dumpKeyState(dump);
957                         latch.countDown();
958                     }
959                 });
960 
961                 try
962                 {
963                     latch.await(5,TimeUnit.SECONDS);
964                 }
965                 catch(InterruptedException e)
966                 {
967                     LOG.ignore(e);
968                 }
969 
970                 AggregateLifeCycle.dump(out,indent,dump);
971             }
972         }
973 
974         /* ------------------------------------------------------------ */
975         public void dumpKeyState(List<Object> dumpto)
976         {
977             Selector selector=_selector;
978             Set<SelectionKey> keys = selector.keys();
979             dumpto.add(selector + " keys=" + keys.size());
980             for (SelectionKey key: keys)
981             {
982                 if (key.isValid())
983                     dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps());
984                 else
985                     dumpto.add(key.attachment()+" iOps=-1 rOps=-1");
986             }
987         }
988 
989         /* ------------------------------------------------------------ */
990         public String toString()
991         {
992             Selector selector=_selector;
993             return String.format("%s keys=%d selected=%d",
994                     super.toString(),
995                     selector != null && selector.isOpen() ? selector.keys().size() : -1,
996                     selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
997         }
998     }
999 
1000     /* ------------------------------------------------------------ */
1001     private static class ChannelAndAttachment
1002     {
1003         final SelectableChannel _channel;
1004         final Object _attachment;
1005 
1006         public ChannelAndAttachment(SelectableChannel channel, Object attachment)
1007         {
1008             super();
1009             _channel = channel;
1010             _attachment = attachment;
1011         }
1012     }
1013 
1014     /* ------------------------------------------------------------ */
1015     public boolean isDeferringInterestedOps0()
1016     {
1017         return _deferringInterestedOps0;
1018     }
1019 
1020     /* ------------------------------------------------------------ */
1021     public void setDeferringInterestedOps0(boolean deferringInterestedOps0)
1022     {
1023         _deferringInterestedOps0 = deferringInterestedOps0;
1024     }
1025 
1026 
1027     /* ------------------------------------------------------------ */
1028     /* ------------------------------------------------------------ */
1029     /* ------------------------------------------------------------ */
1030     private interface ChangeTask extends Runnable
1031     {}
1032 
1033 }