View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.CancelledKeyException;
18  import java.nio.channels.Channel;
19  import java.nio.channels.ClosedSelectorException;
20  import java.nio.channels.SelectableChannel;
21  import java.nio.channels.SelectionKey;
22  import java.nio.channels.Selector;
23  import java.nio.channels.ServerSocketChannel;
24  import java.nio.channels.SocketChannel;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.Set;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.ConcurrentLinkedQueue;
30  import java.util.concurrent.ConcurrentMap;
31  import java.util.concurrent.CountDownLatch;
32  import java.util.concurrent.TimeUnit;
33  
34  import org.eclipse.jetty.io.AsyncEndPoint;
35  import org.eclipse.jetty.io.ConnectedEndPoint;
36  import org.eclipse.jetty.io.Connection;
37  import org.eclipse.jetty.io.EndPoint;
38  import org.eclipse.jetty.util.TypeUtil;
39  import org.eclipse.jetty.util.component.AbstractLifeCycle;
40  import org.eclipse.jetty.util.component.AggregateLifeCycle;
41  import org.eclipse.jetty.util.component.Dumpable;
42  import org.eclipse.jetty.util.log.Log;
43  import org.eclipse.jetty.util.log.Logger;
44  import org.eclipse.jetty.util.thread.Timeout;
45  import org.eclipse.jetty.util.thread.Timeout.Task;
46  
47  
48  /* ------------------------------------------------------------ */
/**
 * The Selector Manager manages a number of SelectSets to allow
 * NIO scheduling to scale to large numbers of connections.
 */
54  public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
55  {
    // Logger shared under the package name rather than this class name.
    public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");

    // Tuning values; each can be overridden with the system property named in its initialiser.
    // Period in ms after which the busy-select count and pausing mode are reset; a value <=0 disables the monitoring.
    private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
    // Number of busy selects (returning within 1ms) tolerated in a monitor period before pauses are injected.
    private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue();
    // Sleep in ms taken before selecting while a select set is in pausing mode.
    private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
    // Longest blocking select in ms, and the minimum interval between idle checks.
    private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();

    // Idle limit in ms applied to connections.
    private int _maxIdleTime;
    // Idle limit in ms applied when a select set holds more keys than _lowResourcesConnections.
    private int _lowResourcesMaxIdleTime;
    // Low-resources threshold stored PER select set; the accessors convert to and from the total.
    private long _lowResourcesConnections;
    // The select sets; created in doStart() and set to null in doStop().
    private SelectSet[] _selectSet;
    // Configured number of select sets.
    private int _selectSets=1;
    // Counter used by the register methods to spread channels over the select sets.
    private volatile int _set=0;
    // NOTE(review): not referenced in the visible part of this file.
    private boolean _deferringInterestedOps0=true;
    // Value added to the priority of each selecting thread.
    private int _selectorPriorityDelta=0;
71  
72      /* ------------------------------------------------------------ */
73      /**
74       * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
75       * @see #setLowResourcesMaxIdleTime(long)
76       */
77      public void setMaxIdleTime(long maxIdleTime)
78      {
79          _maxIdleTime=(int)maxIdleTime;
80      }
81  
82      /* ------------------------------------------------------------ */
83      /**
84       * @param selectSets number of select sets to create
85       */
86      public void setSelectSets(int selectSets)
87      {
88          long lrc = _lowResourcesConnections * _selectSets;
89          _selectSets=selectSets;
90          _lowResourcesConnections=lrc/_selectSets;
91      }
92  
93      /* ------------------------------------------------------------ */
94      /**
95       * @return the max idle time
96       */
97      public long getMaxIdleTime()
98      {
99          return _maxIdleTime;
100     }
101 
102     /* ------------------------------------------------------------ */
103     /**
104      * @return the number of select sets in use
105      */
106     public int getSelectSets()
107     {
108         return _selectSets;
109     }
110 
111     /* ------------------------------------------------------------ */
112     /**
113      * @param i
114      * @return The select set
115      */
116     public SelectSet getSelectSet(int i)
117     {
118         return _selectSet[i];
119     }
120 
121     /* ------------------------------------------------------------ */
122     /** Register a channel
123      * @param channel
124      * @param att Attached Object
125      */
126     public void register(SocketChannel channel, Object att)
127     {
128         // The ++ increment here is not atomic, but it does not matter.
129         // so long as the value changes sometimes, then connections will
130         // be distributed over the available sets.
131 
132         int s=_set++;
133         if (s<0)
134             s=-s;
135         s=s%_selectSets;
136         SelectSet[] sets=_selectSet;
137         if (sets!=null)
138         {
139             SelectSet set=sets[s];
140             set.addChange(channel,att);
141             set.wakeup();
142         }
143     }
144 
145 
146     /* ------------------------------------------------------------ */
147     /** Register a channel
148      * @param channel
149      */
150     public void register(SocketChannel channel)
151     {
152         // The ++ increment here is not atomic, but it does not matter.
153         // so long as the value changes sometimes, then connections will
154         // be distributed over the available sets.
155 
156         int s=_set++;
157         if (s<0)
158             s=-s;
159         s=s%_selectSets;
160         SelectSet[] sets=_selectSet;
161         if (sets!=null)
162         {
163             SelectSet set=sets[s];
164             set.addChange(channel);
165             set.wakeup();
166         }
167     }
168 
169     /* ------------------------------------------------------------ */
170     /** Register a {@link ServerSocketChannel}
171      * @param acceptChannel
172      */
173     public void register(ServerSocketChannel acceptChannel)
174     {
175         int s=_set++;
176         if (s<0)
177             s=-s;
178         s=s%_selectSets;
179         SelectSet set=_selectSet[s];
180         set.addChange(acceptChannel);
181         set.wakeup();
182     }
183 
184     /* ------------------------------------------------------------ */
185     /**
186      * @return delta The value to add to the selector thread priority.
187      */
188     public int getSelectorPriorityDelta()
189     {
190         return _selectorPriorityDelta;
191     }
192 
193     /* ------------------------------------------------------------ */
194     /** Set the selector thread priorty delta.
195      * @param delta The value to add to the selector thread priority.
196      */
197     public void setSelectorPriorityDelta(int delta)
198     {
199         _selectorPriorityDelta=delta;
200     }
201 
202 
203     /* ------------------------------------------------------------ */
204     /**
205      * @return the lowResourcesConnections
206      */
207     public long getLowResourcesConnections()
208     {
209         return _lowResourcesConnections*_selectSets;
210     }
211 
212     /* ------------------------------------------------------------ */
213     /**
214      * Set the number of connections, which if exceeded places this manager in low resources state.
215      * This is not an exact measure as the connection count is averaged over the select sets.
216      * @param lowResourcesConnections the number of connections
217      * @see #setLowResourcesMaxIdleTime(long)
218      */
219     public void setLowResourcesConnections(long lowResourcesConnections)
220     {
221         _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
222     }
223 
224     /* ------------------------------------------------------------ */
225     /**
226      * @return the lowResourcesMaxIdleTime
227      */
228     public long getLowResourcesMaxIdleTime()
229     {
230         return _lowResourcesMaxIdleTime;
231     }
232 
233     /* ------------------------------------------------------------ */
234     /**
235      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()}
236      * @see #setMaxIdleTime(long)
237      */
238     public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
239     {
240         _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
241     }
242 
243 
244     /* ------------------------------------------------------------------------------- */
    /**
     * Hand a task to the concrete manager for execution.
     * <p>
     * Within this class it is used for the selecting loops started by
     * {@link #doStart()}, and for the runnable changes, expired timeout tasks and
     * idle checks produced by {@link SelectSet#doSelect()}.
     *
     * @param task the task to run
     * @return presumably true if the task was accepted; {@link #doStart()} treats
     * false as a failure to start selecting, other callers here ignore the result
     */
    public abstract boolean dispatch(Runnable task);
246 
247     /* ------------------------------------------------------------ */
248     /* (non-Javadoc)
249      * @see org.eclipse.component.AbstractLifeCycle#doStart()
250      */
251     @Override
252     protected void doStart() throws Exception
253     {
254         _selectSet = new SelectSet[_selectSets];
255         for (int i=0;i<_selectSet.length;i++)
256             _selectSet[i]= new SelectSet(i);
257 
258         super.doStart();
259 
260         // start a thread to Select
261         for (int i=0;i<getSelectSets();i++)
262         {
263             final int id=i;
264             boolean selecting=dispatch(new Runnable()
265             {
266                 public void run()
267                 {
268                     String name=Thread.currentThread().getName();
269                     int priority=Thread.currentThread().getPriority();
270                     try
271                     {
272                         SelectSet[] sets=_selectSet;
273                         if (sets==null)
274                             return;
275                         SelectSet set=sets[id];
276 
277                         Thread.currentThread().setName(name+" Selector"+id);
278                         if (getSelectorPriorityDelta()!=0)
279                             Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta());
280                         LOG.debug("Starting {} on {}",Thread.currentThread(),this);
281                         while (isRunning())
282                         {
283                             try
284                             {
285                                 set.doSelect();
286                             }
287                             catch(IOException e)
288                             {
289                                 LOG.ignore(e);
290                             }
291                             catch(Exception e)
292                             {
293                                 LOG.warn(e);
294                             }
295                         }
296                     }
297                     finally
298                     {
299                         LOG.debug("Stopped {} on {}",Thread.currentThread(),this);
300                         Thread.currentThread().setName(name);
301                         if (getSelectorPriorityDelta()!=0)
302                             Thread.currentThread().setPriority(priority);
303                     }
304                 }
305 
306             });
307 
308             if (!selecting)
309                 throw new IllegalStateException("!Selecting");
310         }
311     }
312 
313 
314     /* ------------------------------------------------------------------------------- */
315     @Override
316     protected void doStop() throws Exception
317     {
318         SelectSet[] sets= _selectSet;
319         _selectSet=null;
320         if (sets!=null)
321         {
322             for (SelectSet set : sets)
323             {
324                 if (set!=null)
325                     set.stop();
326             }
327         }
328         super.doStop();
329     }
330 
331     /* ------------------------------------------------------------ */
    /**
     * Callback for an endpoint that has been closed.
     * NOTE(review): the call site is not in the visible part of this file.
     *
     * @param endpoint the endpoint that was closed
     */
    protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
336 
337     /* ------------------------------------------------------------ */
    /**
     * Callback for a newly created endpoint.
     * <p>
     * Called by a select set right after {@link #newEndPoint} returns and
     * before the endpoint is added to that set's collection of endpoints.
     *
     * @param endpoint the endpoint that was created
     */
    protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
342 
343     /* ------------------------------------------------------------ */
    /**
     * Callback for an endpoint whose connection has been replaced.
     * NOTE(review): the call site is not in the visible part of this file;
     * the description is taken from the method and parameter names.
     *
     * @param endpoint the endpoint concerned
     * @param oldConnection presumably the connection that was replaced
     */
    protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
345 
346     /* ------------------------------------------------------------------------------- */
    /**
     * Create the connection for a channel and its endpoint.
     * NOTE(review): the call site is not in the visible part of this file.
     *
     * @param channel the socket channel
     * @param endpoint the endpoint wrapping the channel
     * @param attachment presumably the object attached when the channel was registered
     * @return the new connection
     */
    public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment);
348 
349     /* ------------------------------------------------------------ */
    /**
     * Create a new end point.
     * <p>
     * Called by a select set whenever it wraps a socket channel in an endpoint;
     * the caller then invokes {@link #endPointOpened(SelectChannelEndPoint)}
     * with the result.
     *
     * @param channel the socket channel to wrap
     * @param selectSet the select set the channel is registered with
     * @param sKey the selection key
     * @return the new endpoint {@link SelectChannelEndPoint}
     * @throws IOException if the endpoint cannot be created
     */
    protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
359 
360     /* ------------------------------------------------------------------------------- */
361     protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
362     {
363         LOG.warn(ex+","+channel+","+attachment);
364         LOG.debug(ex);
365     }
366 
367     /* ------------------------------------------------------------ */
368     public String dump()
369     {
370         return AggregateLifeCycle.dump(this);
371     }
372 
373     /* ------------------------------------------------------------ */
374     public void dump(Appendable out, String indent) throws IOException
375     {
376         AggregateLifeCycle.dumpObject(out,this);
377         AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet));
378     }
379 
380 
381     /* ------------------------------------------------------------------------------- */
382     /* ------------------------------------------------------------------------------- */
383     /* ------------------------------------------------------------------------------- */
384     public class SelectSet implements Dumpable
385     {
        // Identifier given at construction (the index of this set in the manager's array).
        private final int _setID;
        // Timeout queue for tasks scheduled through scheduleTimeout(); expired tasks are dispatched by doSelect().
        private final Timeout _timeout;

        // Pending changes (endpoints, channels, channel+attachment pairs, tasks) applied at the start of doSelect().
        private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();

        // The current selector; replaced by renewSelector() and checked for null by the selecting code.
        private volatile Selector _selector;

        // The thread currently inside doSelect(), or null.
        private volatile Thread _selecting;
        // Count of selects that returned within 1ms during the current monitor period.
        private int _busySelects;
        // Time in ms at which the busy-select count is next reset.
        private long _monitorNext;
        // True while doSelect() sleeps before selecting because the selector was too busy.
        private boolean _pausing;
        // Set when pausing first starts so that the warning is logged only once.
        private boolean _paused;
        // Time in ms of the last idle check.
        private volatile long _idleTick;
        // Endpoints created by this set, used as a set: the idle check iterates the keys.
        private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
400 
401         /* ------------------------------------------------------------ */
402         SelectSet(int acceptorID) throws Exception
403         {
404             _setID=acceptorID;
405 
406             _idleTick = System.currentTimeMillis();
407             _timeout = new Timeout(this);
408             _timeout.setDuration(0L);
409 
410             // create a selector;
411             _selector = Selector.open();
412             _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD;
413         }
414 
415         /* ------------------------------------------------------------ */
416         public void addChange(Object change)
417         {
418             _changes.add(change);
419         }
420 
421         /* ------------------------------------------------------------ */
422         public void addChange(SelectableChannel channel, Object att)
423         {
424             if (att==null)
425                 addChange(channel);
426             else if (att instanceof EndPoint)
427                 addChange(att);
428             else
429                 addChange(new ChannelAndAttachment(channel,att));
430         }
431 
432         /* ------------------------------------------------------------ */
        /**
         * Select and dispatch tasks found from changes and the selector.
         * <p>
         * One call performs a single pass:
         * <ol>
         * <li>apply the changes that were queued when the pass started;</li>
         * <li>do a non-blocking select and, if nothing is ready, a blocking select
         * bounded by the idle tick and the next timeout;</li>
         * <li>handle the selected keys (schedule endpoints, complete connects);</li>
         * <li>dispatch expired timeout tasks;</li>
         * <li>dispatch an idle check of all endpoints once per idle tick;</li>
         * <li>reset the busy-select monitor once per monitor period.</li>
         * </ol>
         * The calling thread is recorded in {@code _selecting} for the duration of the call.
         * {@link ClosedSelectorException} and {@link CancelledKeyException} are caught and logged.
         *
         * @throws IOException if selecting fails
         */
        public void doSelect() throws IOException
        {
            try
            {
                _selecting=Thread.currentThread();
                final Selector selector=_selector;
                // Stopped concurrently ?
                if (selector == null)
                    return;

                // Make any key changes required.
                // Only as many changes as were queued at this point are taken, so
                // changes added while processing wait for the next pass.
                Object change;
                int changes=_changes.size();
                while (changes-->0 && (change=_changes.poll())!=null)
                {
                    // channel to close if applying this change fails
                    Channel ch=null;
                    SelectionKey key=null;

                    try
                    {
                        if (change instanceof EndPoint)
                        {
                            // Update the operations for a key.
                            SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
                            ch=endpoint.getChannel();
                            endpoint.doUpdateKey();
                        }
                        else if (change instanceof ChannelAndAttachment)
                        {
                            // finish accepting/connecting this connection
                            final ChannelAndAttachment asc = (ChannelAndAttachment)change;
                            final SelectableChannel channel=asc._channel;
                            ch=channel;
                            final Object att = asc._attachment;

                            if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
                            {
                                // already connected: register for reads and replace the attachment with a new endpoint
                                key = channel.register(selector,SelectionKey.OP_READ,att);
                                SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
                                key.attach(endpoint);
                                endpoint.schedule();
                            }
                            else if (channel.isOpen())
                            {
                                // not yet connected: wait for the connect to complete
                                key = channel.register(selector,SelectionKey.OP_CONNECT,att);
                            }
                        }
                        else if (change instanceof SocketChannel)
                        {
                            // Newly registered channel
                            final SocketChannel channel=(SocketChannel)change;
                            ch=channel;
                            key = channel.register(selector,SelectionKey.OP_READ,null);
                            SelectChannelEndPoint endpoint = createEndPoint(channel,key);
                            key.attach(endpoint);
                            endpoint.schedule();
                        }
                        else if (change instanceof ChangeTask)
                        {
                            // change tasks are run directly in the selecting thread
                            ((Runnable)change).run();
                        }
                        else if (change instanceof Runnable)
                        {
                            // any other runnable is handed to the manager for execution
                            dispatch((Runnable)change);
                        }
                        else
                            throw new IllegalArgumentException(change.toString());
                    }
                    catch (CancelledKeyException e)
                    {
                        LOG.ignore(e);
                    }
                    catch (Throwable e)
                    {
                        if (isRunning())
                            LOG.warn(e);
                        else
                            LOG.debug(e);

                        // the change could not be applied, so give up on its channel
                        try
                        {
                            if (ch!=null)
                                ch.close();
                        }
                        catch(IOException e2)
                        {
                            LOG.debug(e2);
                        }
                    }
                }


                // Do an instant select to see if any connections can be handled.
                int selected=selector.selectNow();

                long now=System.currentTimeMillis();

                // if no immediate things to do
                if (selected==0 && selector.selectedKeys().isEmpty())
                {
                    // If we are in pausing mode
                    if (_pausing)
                    {
                        try
                        {
                            Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop
                        }
                        catch(InterruptedException e)
                        {
                            LOG.ignore(e);
                        }
                        now=System.currentTimeMillis();
                    }

                    // workout how long to wait in select
                    _timeout.setNow(now);
                    long to_next_timeout=_timeout.getTimeToNext();

                    // do not block at all if changes are already waiting;
                    // otherwise wait at most one idle tick, or less if a timeout is due sooner
                    long wait = _changes.size()==0?__IDLE_TICK:0L;
                    if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
                        wait = to_next_timeout;

                    // If we should wait with a select
                    if (wait>0)
                    {
                        long before=now;
                        selected=selector.select(wait);
                        now = System.currentTimeMillis();
                        _timeout.setNow(now);

                        // If we are monitoring for busy selector
                        // and this select did not wait more than 1ms
                        if (__MONITOR_PERIOD>0 && now-before <=1)
                        {
                            // count this as a busy select and if there have been too many this monitor cycle
                            if (++_busySelects>__MAX_SELECTS)
                            {
                                // Start injecting pauses
                                _pausing=true;

                                // if this is the first pause
                                if (!_paused)
                                {
                                    // Log and dump some status
                                    _paused=true;
                                    LOG.warn("Selector {} is too busy, pausing!",this);
                                }
                            }
                        }
                    }
                }

                // have we been destroyed while sleeping
                if (_selector==null || !selector.isOpen())
                    return;

                // Look for things to do
                for (SelectionKey key: selector.selectedKeys())
                {
                    // channel to close if handling this key fails
                    SocketChannel channel=null;

                    try
                    {
                        if (!key.isValid())
                        {
                            key.cancel();
                            SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
                            if (endpoint != null)
                                endpoint.doUpdateKey();
                            continue;
                        }

                        Object att = key.attachment();
                        if (att instanceof SelectChannelEndPoint)
                        {
                            // established endpoint: schedule it if it can be read or written
                            if (key.isReadable()||key.isWritable())
                                ((SelectChannelEndPoint)att).schedule();
                        }
                        else if (key.isConnectable())
                        {
                            // Complete a connection of a registered channel
                            channel = (SocketChannel)key.channel();
                            boolean connected=false;
                            try
                            {
                                connected=channel.finishConnect();
                            }
                            catch(Exception e)
                            {
                                connectionFailed(channel,e,att);
                            }
                            finally
                            {
                                if (connected)
                                {
                                    // connected: switch to reading and wrap the channel in an endpoint
                                    key.interestOps(SelectionKey.OP_READ);
                                    SelectChannelEndPoint endpoint = createEndPoint(channel,key);
                                    key.attach(endpoint);
                                    endpoint.schedule();
                                }
                                else
                                {
                                    key.cancel();
                                }
                            }
                        }
                        else
                        {
                            // Wrap readable registered channel in an endpoint
                            channel = (SocketChannel)key.channel();
                            SelectChannelEndPoint endpoint = createEndPoint(channel,key);
                            key.attach(endpoint);
                            if (key.isReadable())
                                endpoint.schedule();
                        }
                        // handled without error: the catch blocks below must not cancel this key
                        key = null;
                    }
                    catch (CancelledKeyException e)
                    {
                        LOG.ignore(e);
                    }
                    catch (Exception e)
                    {
                        if (isRunning())
                            LOG.warn(e);
                        else
                            LOG.ignore(e);

                        try
                        {
                            if (channel!=null)
                                channel.close();
                        }
                        catch(IOException e2)
                        {
                            LOG.debug(e2);
                        }

                        // cancel the failed key, but never the key of a server socket channel
                        if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
                            key.cancel();
                    }
                }

                // Everything always handled
                selector.selectedKeys().clear();

                // dispatch every timeout task that has expired
                now=System.currentTimeMillis();
                _timeout.setNow(now);
                Task task = _timeout.expired();
                while (task!=null)
                {
                    if (task instanceof Runnable)
                        dispatch((Runnable)task);
                    task = _timeout.expired();
                }

                // Idle tick
                if (now-_idleTick>__IDLE_TICK)
                {
                    _idleTick=now;

                    // With more keys than the low resources threshold, the timestamp handed to the
                    // endpoints is moved forward by (maxIdleTime - lowResourcesMaxIdleTime).
                    // NOTE(review): presumably so that a check against the normal idle limit
                    // behaves like one against the low resources limit - checkIdleTimestamp is not visible here.
                    final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
                        ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
                        :now;

                    // the check of every endpoint is dispatched rather than run in the selecting thread
                    dispatch(new Runnable()
                    {
                        public void run()
                        {
                            for (SelectChannelEndPoint endp:_endPoints.keySet())
                            {
                                endp.checkIdleTimestamp(idle_now);
                            }
                        }
                        public String toString() {return "Idle-"+super.toString();}
                    });

                }

                // Reset busy select monitor counts
                if (__MONITOR_PERIOD>0 && now>_monitorNext)
                {
                    _busySelects=0;
                    _pausing=false;
                    _monitorNext=now+__MONITOR_PERIOD;

                }
            }
            catch (ClosedSelectorException e)
            {
                if (isRunning())
                    LOG.warn(e);
                else
                    LOG.ignore(e);
            }
            catch (CancelledKeyException e)
            {
                LOG.ignore(e);
            }
            finally
            {
                _selecting=null;
            }
        }
742 
743 
744         /* ------------------------------------------------------------ */
745         private void renewSelector()
746         {
747             try
748             {
749                 synchronized (this)
750                 {
751                     Selector selector=_selector;
752                     if (selector==null)
753                         return;
754                     final Selector new_selector = Selector.open();
755                     for (SelectionKey k: selector.keys())
756                     {
757                         if (!k.isValid() || k.interestOps()==0)
758                             continue;
759 
760                         final SelectableChannel channel = k.channel();
761                         final Object attachment = k.attachment();
762 
763                         if (attachment==null)
764                             addChange(channel);
765                         else
766                             addChange(channel,attachment);
767                     }
768                     _selector.close();
769                     _selector=new_selector;
770                 }
771             }
772             catch(IOException e)
773             {
774                 throw new RuntimeException("recreating selector",e);
775             }
776         }
777 
778         /* ------------------------------------------------------------ */
779         public SelectorManager getManager()
780         {
781             return SelectorManager.this;
782         }
783 
784         /* ------------------------------------------------------------ */
785         public long getNow()
786         {
787             return _timeout.getNow();
788         }
789 
790         /* ------------------------------------------------------------ */
791         /**
792          * @param task The task to timeout. If it implements Runnable, then
793          * expired will be called from a dispatched thread.
794          *
795          * @param timeoutMs
796          */
797         public void scheduleTimeout(Timeout.Task task, long timeoutMs)
798         {
799             if (!(task instanceof Runnable))
800                 throw new IllegalArgumentException("!Runnable");
801             _timeout.schedule(task, timeoutMs);
802         }
803 
804         /* ------------------------------------------------------------ */
805         public void cancelTimeout(Timeout.Task task)
806         {
807             task.cancel();
808         }
809 
810         /* ------------------------------------------------------------ */
811         public void wakeup()
812         {
813             try
814             {
815                 Selector selector = _selector;
816                 if (selector!=null)
817                     selector.wakeup();
818             }
819             catch(Exception e)
820             {
821                 addChange(new ChangeTask()
822                 {
823                     public void run()
824                     {
825                         renewSelector();
826                     }
827                 });
828 
829                 renewSelector();
830             }
831         }
832 
833         /* ------------------------------------------------------------ */
834         private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
835         {
836             SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
837             LOG.debug("created {}",endp);
838             endPointOpened(endp);
839             _endPoints.put(endp,this);
840             return endp;
841         }
842 
843         /* ------------------------------------------------------------ */
844         public void destroyEndPoint(SelectChannelEndPoint endp)
845         {
846             LOG.debug("destroyEndPoint {}",endp);
847             _endPoints.remove(endp);
848             endPointClosed(endp);
849         }
850 
851         /* ------------------------------------------------------------ */
852         Selector getSelector()
853         {
854             return _selector;
855         }
856 
857         /* ------------------------------------------------------------ */
858         void stop() throws Exception
859         {
860             // Spin for a while waiting for selector to complete
861             // to avoid unneccessary closed channel exceptions
862             try
863             {
864                 for (int i=0;i<100 && _selecting!=null;i++)
865                 {
866                     wakeup();
867                     Thread.sleep(10);
868                 }
869             }
870             catch(Exception e)
871             {
872                 LOG.ignore(e);
873             }
874 
875             // close endpoints and selector
876             synchronized (this)
877             {
878                 Selector selector=_selector;
879                 for (SelectionKey key:selector.keys())
880                 {
881                     if (key==null)
882                         continue;
883                     Object att=key.attachment();
884                     if (att instanceof EndPoint)
885                     {
886                         EndPoint endpoint = (EndPoint)att;
887                         try
888                         {
889                             endpoint.close();
890                         }
891                         catch(IOException e)
892                         {
893                             LOG.ignore(e);
894                         }
895                     }
896                 }
897 
898 
899                 _timeout.cancelAll();
900                 try
901                 {
902                     selector=_selector;
903                     if (selector != null)
904                         selector.close();
905                 }
906                 catch (IOException e)
907                 {
908                     LOG.ignore(e);
909                 }
910                 _selector=null;
911             }
912         }
913 
914         /* ------------------------------------------------------------ */
915         public String dump()
916         {
917             return AggregateLifeCycle.dump(this);
918         }
919 
920         /* ------------------------------------------------------------ */
921         public void dump(Appendable out, String indent) throws IOException
922         {
923             out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
924 
925             Thread selecting = _selecting;
926 
927             Object where = "not selecting";
928             StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
929             if (trace!=null)
930             {
931                 for (StackTraceElement t:trace)
932                     if (t.getClassName().startsWith("org.eclipse.jetty."))
933                     {
934                         where=t;
935                         break;
936                     }
937             }
938 
939             Selector selector=_selector;
940             if (selector!=null)
941             {
942                 final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
943                 dump.add(where);
944 
945                 final CountDownLatch latch = new CountDownLatch(1);
946 
947                 addChange(new ChangeTask()
948                 {
949                     public void run()
950                     {
951                         dumpKeyState(dump);
952                         latch.countDown();
953                     }
954                 });
955 
956                 try
957                 {
958                     latch.await(5,TimeUnit.SECONDS);
959                 }
960                 catch(InterruptedException e)
961                 {
962                     LOG.ignore(e);
963                 }
964 
965                 AggregateLifeCycle.dump(out,indent,dump);
966             }
967         }
968 
969         /* ------------------------------------------------------------ */
970         public void dumpKeyState(List<Object> dumpto)
971         {
972             Selector selector=_selector;
973             Set<SelectionKey> keys = selector.keys();
974             dumpto.add(selector + " keys=" + keys.size());
975             for (SelectionKey key: keys)
976             {
977                 if (key.isValid())
978                     dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps());
979                 else
980                     dumpto.add(key.attachment()+" iOps=-1 rOps=-1");
981             }
982         }
983 
984         /* ------------------------------------------------------------ */
985         public String toString()
986         {
987             Selector selector=_selector;
988             return String.format("%s keys=%d selected=%d",
989                     super.toString(),
990                     selector != null && selector.isOpen() ? selector.keys().size() : -1,
991                     selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
992         }
993     }
994 
995     /* ------------------------------------------------------------ */
996     private static class ChannelAndAttachment
997     {
998         final SelectableChannel _channel;
999         final Object _attachment;
1000 
1001         public ChannelAndAttachment(SelectableChannel channel, Object attachment)
1002         {
1003             super();
1004             _channel = channel;
1005             _attachment = attachment;
1006         }
1007     }
1008 
1009     /* ------------------------------------------------------------ */
1010     public boolean isDeferringInterestedOps0()
1011     {
1012         return _deferringInterestedOps0;
1013     }
1014 
1015     /* ------------------------------------------------------------ */
1016     public void setDeferringInterestedOps0(boolean deferringInterestedOps0)
1017     {
1018         _deferringInterestedOps0 = deferringInterestedOps0;
1019     }
1020 
1021 
1022     /* ------------------------------------------------------------ */
1023     /* ------------------------------------------------------------ */
1024     /* ------------------------------------------------------------ */
1025     private interface ChangeTask extends Runnable
1026     {}
1027 
1028 }