View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.CancelledKeyException;
18  import java.nio.channels.SelectableChannel;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.Selector;
21  import java.nio.channels.ServerSocketChannel;
22  import java.nio.channels.SocketChannel;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.Set;
26  import java.util.concurrent.ConcurrentHashMap;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import java.util.concurrent.ConcurrentMap;
29  
30  import org.eclipse.jetty.io.ConnectedEndPoint;
31  import org.eclipse.jetty.io.Connection;
32  import org.eclipse.jetty.io.EndPoint;
33  import org.eclipse.jetty.util.component.AbstractLifeCycle;
34  import org.eclipse.jetty.util.log.Log;
35  import org.eclipse.jetty.util.thread.Timeout;
36  import org.eclipse.jetty.util.thread.Timeout.Task;
37  
38  
39  /* ------------------------------------------------------------ */
40  /**
41   * The Selector Manager manages a number of SelectSets to allow
42   * NIO scheduling to scale to large numbers of connections.
43   * <p>
44   * This class works around a number of known JVM bugs. For details
45   * see http://wiki.eclipse.org/Jetty/Feature/JVM_NIO_Bug
46   */
47  public abstract class SelectorManager extends AbstractLifeCycle
48  {
49      // TODO Tune these by approx system speed.
50      private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.mortbay.io.nio.JVMBUG_THRESHHOLD",512).intValue();
51      private static final int __MONITOR_PERIOD=Integer.getInteger("org.mortbay.io.nio.MONITOR_PERIOD",1000).intValue();
52      private static final int __MAX_SELECTS=Integer.getInteger("org.mortbay.io.nio.MAX_SELECTS",15000).intValue();
53      private static final int __BUSY_PAUSE=Integer.getInteger("org.mortbay.io.nio.BUSY_PAUSE",50).intValue();
54      private static final int __BUSY_KEY=Integer.getInteger("org.mortbay.io.nio.BUSY_KEY",-1).intValue();
55      
56      private int _maxIdleTime;
57      private int _lowResourcesMaxIdleTime;
58      private long _lowResourcesConnections;
59      private SelectSet[] _selectSet;
60      private int _selectSets=1;
61      private volatile int _set;
62      
63      /* ------------------------------------------------------------ */
64      /**
65       * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
66       * @see #setLowResourcesMaxIdleTime(long)
67       */
68      public void setMaxIdleTime(long maxIdleTime)
69      {
70          _maxIdleTime=(int)maxIdleTime;
71      }
72      
73      /* ------------------------------------------------------------ */
74      /**
75       * @param selectSets number of select sets to create
76       */
77      public void setSelectSets(int selectSets)
78      {
79          long lrc = _lowResourcesConnections * _selectSets; 
80          _selectSets=selectSets;
81          _lowResourcesConnections=lrc/_selectSets;
82      }
83      
84      /* ------------------------------------------------------------ */
85      /**
86       * @return the max idle time
87       */
88      public long getMaxIdleTime()
89      {
90          return _maxIdleTime;
91      }
92      
93      /* ------------------------------------------------------------ */
94      /**
95       * @return the number of select sets in use
96       */
97      public int getSelectSets()
98      {
99          return _selectSets;
100     }
101 
102     /* ------------------------------------------------------------ */
103     /**
104      * @param i 
105      * @return The select set
106      */
107     public SelectSet getSelectSet(int i)
108     {
109         return _selectSet[i];
110     }
111     /* ------------------------------------------------------------ */
112     /** Register a channel
113      * @param channel
114      * @param att Attached Object
115      */
116     public void register(SocketChannel channel, Object att)
117     {
118         // The ++ increment here is not atomic, but it does not matter.
119         // so long as the value changes sometimes, then connections will
120         // be distributed over the available sets.
121         
122         int s=_set++; 
123         s=s%_selectSets;
124         SelectSet[] sets=_selectSet;
125         if (sets!=null)
126         {
127             SelectSet set=sets[s];
128             set.addChange(channel,att);
129             set.wakeup();
130         }
131     }
132     
133     /* ------------------------------------------------------------ */
134     /** Register a {@link ServerSocketChannel}
135      * @param acceptChannel
136      */
137     public void register(ServerSocketChannel acceptChannel)
138     {
139         int s=_set++; 
140         s=s%_selectSets;
141         SelectSet set=_selectSet[s];
142         set.addChange(acceptChannel);
143         set.wakeup();
144     }
145 
146     /* ------------------------------------------------------------ */
147     /**
148      * @return the lowResourcesConnections
149      */
150     public long getLowResourcesConnections()
151     {
152         return _lowResourcesConnections*_selectSets;
153     }
154 
155     /* ------------------------------------------------------------ */
156     /**
157      * Set the number of connections, which if exceeded places this manager in low resources state.
158      * This is not an exact measure as the connection count is averaged over the select sets.
159      * @param lowResourcesConnections the number of connections
160      * @see #setLowResourcesMaxIdleTime(long)
161      */
162     public void setLowResourcesConnections(long lowResourcesConnections)
163     {
164         _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
165     }
166 
167     /* ------------------------------------------------------------ */
168     /**
169      * @return the lowResourcesMaxIdleTime
170      */
171     public long getLowResourcesMaxIdleTime()
172     {
173         return _lowResourcesMaxIdleTime;
174     }
175 
176     /* ------------------------------------------------------------ */
177     /**
178      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()}
179      * @see #setMaxIdleTime(long)
180      */
181     public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
182     {
183         _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
184     }
185     
186     /* ------------------------------------------------------------ */
187     /**
188      * @param acceptorID
189      * @throws IOException
190      */
191     public void doSelect(int acceptorID) throws IOException
192     {
193         SelectSet[] sets= _selectSet;
194         if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
195             sets[acceptorID].doSelect();
196     }
197 
198     /* ------------------------------------------------------------ */
199     /**
         * Accept the pending connection for a key that is ready to accept.
         * <p>
         * {@link SelectSet#doSelect()} calls this for a selected key that reports
         * {@link SelectionKey#isAcceptable()} and has no endpoint attached. A
         * <code>null</code> result is tolerated: the key is skipped for that pass.
200      * @param key the selection key
201      * @return the SocketChannel created on accept, or <code>null</code> if there is none
202      * @throws IOException if the accept fails
203      */
204     protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException;
205 
206     /* ------------------------------------------------------------------------------- */
        /**
         * Dispatch a task for execution; how and on which thread is up to the subclass.
         * <p>
         * The select sets use this for queued {@link Runnable} changes, for expired
         * timeout tasks that are Runnable, for closing a busy endpoint and for the
         * periodic idle check. None of those callers looks at the returned value.
         * @param task the task to run
         * @return presumably true if the task was accepted for execution -
         *         TODO confirm against the implementing classes
         */
207     public abstract boolean dispatch(Runnable task);
208 
209     /* ------------------------------------------------------------ */
210     /* (non-Javadoc)
211      * @see org.eclipse.component.AbstractLifeCycle#doStart()
212      */
213     @Override
214     protected void doStart() throws Exception
215     {
216         _selectSet = new SelectSet[_selectSets];
217         for (int i=0;i<_selectSet.length;i++)
218             _selectSet[i]= new SelectSet(i);
219 
220         super.doStart();
221     }
222 
223 
224     /* ------------------------------------------------------------------------------- */
225     @Override
226     protected void doStop() throws Exception
227     {
228         SelectSet[] sets= _selectSet;
229         _selectSet=null;
230         if (sets!=null)
231         {
232             for (SelectSet set : sets)
233             {
234                 if (set!=null)
235                     set.stop();
236             }
237         }
238         super.doStop();
239     }
240 
241     /* ------------------------------------------------------------ */
242     /**
         * Callback for a closed endpoint.
         * NOTE(review): no caller is visible in this part of the file; presumably
         * invoked when an endpoint managed by one of the select sets is closed - confirm.
243      * @param endpoint the endpoint concerned
244      */
245     protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
246 
247     /* ------------------------------------------------------------ */
248     /**
         * Callback for an opened endpoint.
         * NOTE(review): no caller is visible in this part of the file; presumably
         * invoked when a select set has created a new endpoint - confirm.
249      * @param endpoint the endpoint concerned
250      */
251     protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
252 
253     /* ------------------------------------------------------------ */
        /**
         * Callback for an endpoint whose connection has been replaced.
         * NOTE(review): no caller is visible in this part of the file; the contract
         * is inferred from the signature only - confirm.
         * @param endpoint the endpoint concerned
         * @param oldConnection presumably the connection that was replaced - confirm
         */
254     protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
255 
256     /* ------------------------------------------------------------------------------- */
        /**
         * Create the connection for a channel and its endpoint.
         * NOTE(review): no caller is visible in this part of the file; the contract
         * is inferred from the signature only - confirm.
         * @param channel the socket channel
         * @param endpoint the endpoint created for the channel
         * @return the new connection
         */
257     protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
258 
259     /* ------------------------------------------------------------ */
260     /**
261      * Create a new end point
         * NOTE(review): no direct caller is visible in this part of the file;
         * presumably used by the select set when it wraps a registered channel - confirm.
262      * @param channel the socket channel the end point is for
263      * @param selectSet the select set concerned; presumably the one whose selector owns sKey - confirm
264      * @param sKey the selection key
265      * @return the new endpoint {@link SelectChannelEndPoint}
266      * @throws IOException if the end point cannot be created
267      */
268     protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
269 
270     /* ------------------------------------------------------------------------------- */
271     protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
272     {
273         Log.warn(ex+","+channel+","+attachment);
274         Log.debug(ex);
275     }
276 
277     /* ------------------------------------------------------------------------------- */
278     public void dump()
279     {
280         for (final SelectSet set :_selectSet)
281         {
282             Thread selecting = set._selecting;
283             Log.info("SelectSet "+set._setID+" : "+selecting);
284             if (selecting!=null)
285             {
286                 StackTraceElement[] trace =selecting.getStackTrace();
287                 if (trace!=null)
288                 {
289                     for (StackTraceElement e : trace)
290                     {
291                         Log.info("\tat "+e.toString());
292                     }
293                 }
294             }
295                 
296             set.addChange(new ChangeTask(){
297                 public void run()
298                 {
299                     set.dump();
300                 }
301             });
302         }
303     }
304     
305     
306     /* ------------------------------------------------------------------------------- */
307     /* ------------------------------------------------------------------------------- */
308     /* ------------------------------------------------------------------------------- */
309     public class SelectSet 
310     {
311         private final int _setID; // index of this set, as given to the constructor
            // timeout queue created with this set as its argument; doSelect() runs its expired tasks
312         private final Timeout _timeout;
313         
            // pending changes, drained at the start of each doSelect() pass
314         private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
315         
            // current selector; doSelect() replaces it when it recreates the selector
316         private Selector _selector;
317         
            // index of the select set that receives the next accepted channel
318         private int _nextSet;
            // thread currently inside doSelect(), null otherwise
319         private volatile Thread _selecting;
            // suspect selects (nothing selected, returned early) seen in the current monitor period
320         private int _jvmBug;
            // selects counted in the current monitor period
321         private int _selects;
            // start of the current monitor period and the time at which it ends (ms timestamps)
322         private long _monitorStart;
323         private long _monitorNext;
            // true when the last monitor period exceeded __MAX_SELECTS; doSelect() then sleeps before selecting
324         private boolean _pausing;
            // last key seen alone in the selected set while the select rate was high, and how often it repeated
325         private SelectionKey _busyKey;
326         private int _busyKeyCount;
            // time of the next periodic statistics log (ms timestamp)
327         private long _log;
            // counters reported and reset by the periodic log:
            // _paused   - monitor periods that switched pausing on
            // _jvmFix0  - passes that cancelled keys with no interest ops
            // _jvmFix1  - logged as the number of selector recreations; no increment is visible in this part of the file
            // _jvmFix2  - times the delay workaround kicked in
328         private int _paused;
329         private int _jvmFix0;
330         private int _jvmFix1;
331         private int _jvmFix2;
            // time of the last idle check dispatched by doSelect() (ms timestamp)
332         private volatile long _idleTick;
            // endpoints visited by the idle check; how the map is populated is not visible in this part of the file
333         private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
334         
335         /* ------------------------------------------------------------ */
336         SelectSet(int acceptorID) throws Exception
337         {
338             _setID=acceptorID;
339 
340             _idleTick = System.currentTimeMillis();
341             _timeout = new Timeout(this);
342             _timeout.setDuration(0L);
343 
344             // create a selector;
345             _selector = Selector.open();
346             _monitorStart=System.currentTimeMillis();
347             _monitorNext=_monitorStart+__MONITOR_PERIOD;
348             _log=_monitorStart+60000;
349         }
350         
351         /* ------------------------------------------------------------ */
352         public void addChange(Object point)
353         {
354             _changes.add(point);
355         }
356         
357         /* ------------------------------------------------------------ */
358         public void addChange(SelectableChannel channel, Object att)
359         {   
360             if (att==null)
361                 addChange(channel);
362             else if (att instanceof EndPoint)
363                 addChange(att);
364             else
365                 addChange(new ChangeSelectableChannel(channel,att));
366         }
367         
368         /* ------------------------------------------------------------ */
369         /**
370          * Select and dispatch tasks found from changes and the selector.
371          * 
372          * @throws IOException
373          */
374         public void doSelect() throws IOException
375         {
376             try
377             {
378                 _selecting=Thread.currentThread();
379                 final Selector selector=_selector;
380 
381                 // Make any key changes required
382                 Object change;
383                 int changes=_changes.size();
384                 while (changes-->0 && (change=_changes.poll())!=null)
385                 {
386                     try
387                     {
388                         if (change instanceof EndPoint)
389                         {
390                             // Update the operations for a key.
391                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
392                             endpoint.doUpdateKey();
393                         }
394                         else if (change instanceof Runnable)
395                         {
396                             dispatch((Runnable)change);
397                         }
398                         else if (change instanceof ChangeSelectableChannel)
399                         {
400                             // finish accepting/connecting this connection
401                             final ChangeSelectableChannel asc = (ChangeSelectableChannel)change;
402                             final SelectableChannel channel=asc._channel;
403                             final Object att = asc._attachment;
404 
405                             if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
406                             {
407                                 SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
408                                 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
409                                 key.attach(endpoint);
410                                 endpoint.schedule();
411                             }
412                             else if (channel.isOpen())
413                             {
414                                 channel.register(selector,SelectionKey.OP_CONNECT,att);
415                             }
416                         }
417                         else if (change instanceof SocketChannel)
418                         {
419                             final SocketChannel channel=(SocketChannel)change;
420 
421                             if (channel.isConnected())
422                             {
423                                 SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);
424                                 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
425                                 key.attach(endpoint);
426                                 endpoint.schedule();
427                             }
428                             else if (channel.isOpen())
429                             {
430                                 channel.register(selector,SelectionKey.OP_CONNECT,null);
431                             }
432                         }
433                         else if (change instanceof ServerSocketChannel)
434                         {
435                             ServerSocketChannel channel = (ServerSocketChannel)change;
436                             channel.register(getSelector(),SelectionKey.OP_ACCEPT);
437                         }
438                         else if (change instanceof ChangeTask)
439                         {
440                             ((ChangeTask)change).run();
441                         }
442                         else
443                             throw new IllegalArgumentException(change.toString());
444                     }
445                     catch (Exception e)
446                     {
447                         if (isRunning())
448                             Log.warn(e);
449                         else
450                             Log.debug(e);
451                     }
452                 }
453 
454                 long retry_next;
455                 long now=System.currentTimeMillis();
456                 _timeout.setNow(now);
457 
458                 retry_next=_timeout.getTimeToNext();
459 
460                 // workout how low to wait in select
461                 long wait = 1000L;  
462                 if (wait > 0 && retry_next >= 0 && wait > retry_next)
463                     wait = retry_next;
464     
465                 // Do the select.
466                 if (wait > 0) 
467                 {
468                     // If we are in pausing mode
469                     if (_pausing)
470                     {
471                         try
472                         {
473                             Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of  busy loop
474                         }
475                         catch(InterruptedException e)
476                         {
477                             Log.ignore(e);
478                         }
479                     }
480                         
481                     long before=now;
482                     int selected=selector.select(wait);
483                     now = System.currentTimeMillis();
484                     _timeout.setNow(now);
485                     _selects++;
486   
487                     // Look for JVM bugs over a monitor period.
488                     // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
489                     // http://bugs.sun.com/view_bug.do?bug_id=6693490
490                     if (now>_monitorNext)
491                     {
492                         _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
493                         _pausing=_selects>__MAX_SELECTS;
494                         if (_pausing)
495                             _paused++;
496                             
497                         _selects=0;
498                         _jvmBug=0;
499                         _monitorStart=now;
500                         _monitorNext=now+__MONITOR_PERIOD;
501                     }
502                     
503                     if (now>_log)
504                     {
505                         if (_paused>0)  
506                             Log.info(this+" Busy selector - injecting delay "+_paused+" times");
507 
508                         if (_jvmFix2>0)
509                             Log.info(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
510 
511                         if (_jvmFix1>0)
512                             Log.info(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times");
513 
514                         else if(Log.isDebugEnabled() && _jvmFix0>0)
515                             Log.info(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");
516                         _paused=0;
517                         _jvmFix2=0;
518                         _jvmFix1=0;
519                         _jvmFix0=0;
520                         _log=now+60000;
521                     }
522                     
523                     // If we see signature of possible JVM bug, increment count.
524                     if (selected==0 && wait>10 && (now-before)<(wait/2))
525                     {
526                         // Increment bug count and try a work around
527                         _jvmBug++;
528                         if (_jvmBug>(__JVMBUG_THRESHHOLD))
529                         {
530                             try
531                             {
532                                 if (_jvmBug==__JVMBUG_THRESHHOLD+1)
533                                     _jvmFix2++;
534                                     
535                                 Thread.sleep(__BUSY_PAUSE); // pause to avoid busy loop
536                             }
537                             catch(InterruptedException e)
538                             {
539                                 Log.ignore(e);
540                             }
541                         }
542                         else if (_jvmBug==__JVMBUG_THRESHHOLD)
543                         {
544                             synchronized (this)
545                             {
546                                 // BLOODY SUN BUG !!!  Try refreshing the entire selector.
547                                 final Selector new_selector = Selector.open();
548                                 for (SelectionKey k: selector.keys())
549                                 {
550                                     if (!k.isValid() || k.interestOps()==0)
551                                         continue;
552                                     
553                                     final SelectableChannel channel = k.channel();
554                                     final Object attachment = k.attachment();
555                                     
556                                     if (attachment==null)
557                                         addChange(channel);
558                                     else
559                                         addChange(channel,attachment);
560                                 }
561                                 _selector.close();
562                                 _selector=new_selector;
563                                 return;
564                             }
565                         }
566                         else if (_jvmBug%32==31) // heuristic attempt to cancel key 31,63,95,... loops
567                         {
568                             // Cancel keys with 0 interested ops
569                             int cancelled=0;
570                             for (SelectionKey k: selector.keys())
571                             {
572                                 if (k.isValid()&&k.interestOps()==0)
573                                 {
574                                     k.cancel();
575                                     cancelled++;
576                                 }
577                             }
578                             if (cancelled>0)
579                                 _jvmFix0++;
580                             
581                             return;
582                         }
583                     }
584                     else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
585                     {
586                         // Look for busy key
587                         SelectionKey busy = selector.selectedKeys().iterator().next();
588                         if (busy==_busyKey)
589                         {
590                             if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
591                             {
592                                 final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
593                                 Log.warn("Busy Key "+busy.channel()+" "+endpoint);
594                                 busy.cancel();
595                                 if (endpoint!=null)
596                                 {
597                                     dispatch(new Runnable()
598                                     {
599                                         public void run()
600                                         {
601                                             try
602                                             {
603                                                 endpoint.close();
604                                             }
605                                             catch (IOException e)
606                                             {
607                                                 Log.ignore(e);
608                                             }
609                                         }
610                                     });
611                                 }
612                             }
613                         }
614                         else
615                             _busyKeyCount=0;
616                         _busyKey=busy;
617                     }
618                 }
619                 else 
620                 {
621                     selector.selectNow();
622                     _selects++;
623                 }
624 
625                 // have we been destroyed while sleeping
626                 if (_selector==null || !selector.isOpen())
627                     return;
628 
629                 // Look for things to do
630                 for (SelectionKey key: selector.selectedKeys())
631                 {   
632                     try
633                     {
634                         if (!key.isValid())
635                         {
636                             key.cancel();
637                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
638                             if (endpoint != null)
639                                 endpoint.doUpdateKey();
640                             continue;
641                         }
642 
643                         Object att = key.attachment();
644                         if (att instanceof SelectChannelEndPoint)
645                         {
646                             ((SelectChannelEndPoint)att).schedule();
647                         }
648                         else if (key.isAcceptable())
649                         {
650                             SocketChannel channel = acceptChannel(key);
651                             if (channel==null)
652                                 continue;
653 
654                             channel.configureBlocking(false);
655 
656                             // TODO make it reluctant to leave 0
657                             _nextSet=++_nextSet%_selectSet.length;
658 
659                             // Is this for this selectset
660                             if (_nextSet==_setID)
661                             {
662                                 // bind connections to this select set.
663                                 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
664                                 
665                                 SelectChannelEndPoint endpoint=_selectSet[_nextSet].createEndPoint(channel,cKey);
666                                 cKey.attach(endpoint);
667                                 if (endpoint != null)
668                                     endpoint.schedule();
669                             }
670                             else
671                             {
672                                 // nope - give it to another.
673                                 _selectSet[_nextSet].addChange(channel);
674                                 _selectSet[_nextSet].wakeup();
675                             }
676                         }
677                         else if (key.isConnectable())
678                         {
679                             // Complete a connection of a registered channel
680                             SocketChannel channel = (SocketChannel)key.channel();
681                             boolean connected=false;
682                             try
683                             {
684                                 connected=channel.finishConnect();
685                             }
686                             catch(Exception e)
687                             {
688                                 connectionFailed(channel,e,att);
689                             }
690                             finally
691                             {
692                                 if (connected)
693                                 {
694                                     key.interestOps(SelectionKey.OP_READ);
695                                     SelectChannelEndPoint endpoint = createEndPoint(channel,key);
696                                     key.attach(endpoint);
697                                     endpoint.schedule();
698                                 }
699                                 else
700                                 {
701                                     key.cancel();
702                                 }
703                             }
704                         }
705                         else
706                         {
707                             // Wrap readable registered channel in an endpoint
708                             SocketChannel channel = (SocketChannel)key.channel();
709                             SelectChannelEndPoint endpoint = createEndPoint(channel,key);
710                             key.attach(endpoint);
711                             if (key.isReadable())
712                                 endpoint.schedule();                           
713                         }
714                         key = null;
715                     }
716                     catch (CancelledKeyException e)
717                     {
718                         Log.ignore(e);
719                     }
720                     catch (Exception e)
721                     {
722                         if (isRunning())
723                             Log.warn(e);
724                         else
725                             Log.ignore(e);
726 
727                         if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
728                             key.cancel();
729                     }
730                 }
731                 
732                 // Everything always handled
733                 selector.selectedKeys().clear();
734                 
735                 _timeout.setNow(now);
736                 Task task = _timeout.expired();
737                 while (task!=null)
738                 {
739                     if (task instanceof Runnable)
740                         dispatch((Runnable)task);
741                     else
742                         task.expired();
743                         
744                     task = _timeout.expired();
745                 }
746 
747                 // Idle tick
748                 if (now-_idleTick>1000)
749                 {
750                     _idleTick=now;
751                     
752                     final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
753                         ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
754                         :now;
755                         
756                     dispatch(new Runnable()
757                     {
758                         public void run()
759                         {
760                             for (SelectChannelEndPoint endp:_endPoints.keySet())
761                             {
762                                 endp.checkIdleTimestamp(idle_now);
763                             }
764                         }
765                     });
766                 }
767             }
768             catch (CancelledKeyException e)
769             {
770                 Log.ignore(e);
771             }
772             finally
773             {
774                 _selecting=null;
775             }
776         }
777 
778         /* ------------------------------------------------------------ */
779         public SelectorManager getManager()
780         {
781             return SelectorManager.this;
782         }
783 
784         /* ------------------------------------------------------------ */
785         public long getNow()
786         {
787             return _timeout.getNow();
788         }
789 
790         /* ------------------------------------------------------------ */
791         /**
792          * @param task The task to timeout. If it implements Runnable, then 
793          * expired will be called from a dispatched thread.
794          * 
795          * @param timeoutMs
796          */
797         public void scheduleTimeout(Timeout.Task task, long timeoutMs)
798         {
799             _timeout.schedule(task, timeoutMs);
800         }
801         
802         /* ------------------------------------------------------------ */
803         public void cancelTimeout(Timeout.Task task)
804         {
805             task.cancel();
806         }
807 
808         /* ------------------------------------------------------------ */
809         public void wakeup()
810         {
811             Selector selector = _selector;
812             if (selector!=null)
813                 selector.wakeup();
814         }
815         
816         /* ------------------------------------------------------------ */
817         private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
818         {
819             SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
820             endPointOpened(endp); 
821             _endPoints.put(endp,this);
822             return endp;
823         }
824         
825         /* ------------------------------------------------------------ */
826         public void destroyEndPoint(SelectChannelEndPoint endp)
827         {
828             _endPoints.remove(endp);
829             endPointClosed(endp);
830         }
831 
832         /* ------------------------------------------------------------ */
833         Selector getSelector()
834         {
835             return _selector;
836         }
837         
838         /* ------------------------------------------------------------ */
839         void stop() throws Exception
840         {
841             boolean selecting=true;
842             while(selecting)
843             {
844                 wakeup();
845                 selecting=_selecting!=null;
846             }
847 
848             for (SelectionKey key:_selector.keys())
849             {
850                 if (key==null)
851                     continue;
852                 Object att=key.attachment();
853                 if (att instanceof EndPoint)
854                 {
855                     EndPoint endpoint = (EndPoint)att;
856                     try
857                     {
858                         endpoint.close();
859                     }
860                     catch(IOException e)
861                     {
862                         Log.ignore(e);
863                     }
864                 }
865             }
866             
867             synchronized (this)
868             {
869                 selecting=_selecting!=null;
870                 while(selecting)
871                 {
872                     wakeup();
873                     selecting=_selecting!=null;
874                 }
875                 
876                 _timeout.cancelAll();
877                 try
878                 {
879                     if (_selector != null)
880                         _selector.close();
881                 }
882                 catch (IOException e)
883                 {
884                     Log.ignore(e);
885                 } 
886                 _selector=null;
887             }
888         }
889         
890         public void dump()
891         {
892             synchronized (System.err)
893             {
894                 Selector selector=_selector;
895                 Log.info("SelectSet "+_setID+" "+selector.keys().size());
896                 for (SelectionKey key: selector.keys())
897                 {
898                     if (key.isValid())
899                         Log.info(key.channel()+" "+key.interestOps()+" "+key.readyOps()+" "+key.attachment());
900                     else
901                         Log.info(key.channel()+" - - "+key.attachment());
902                 }
903             }
904         }
905     }
906 
907     /* ------------------------------------------------------------ */
908     private static class ChangeSelectableChannel
909     {
910         final SelectableChannel _channel;
911         final Object _attachment;
912         
913         public ChangeSelectableChannel(SelectableChannel channel, Object attachment)
914         {
915             super();
916             _channel = channel;
917             _attachment = attachment;
918         }
919     }
920 
921     /* ------------------------------------------------------------ */
922     private interface ChangeTask
923     {
924         public void run();
925     }
926 }