View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.CancelledKeyException;
18  import java.nio.channels.SelectableChannel;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.Selector;
21  import java.nio.channels.ServerSocketChannel;
22  import java.nio.channels.SocketChannel;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.Set;
26  import java.util.concurrent.ConcurrentHashMap;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import java.util.concurrent.ConcurrentMap;
29  
30  import org.eclipse.jetty.io.ConnectedEndPoint;
31  import org.eclipse.jetty.io.Connection;
32  import org.eclipse.jetty.io.EndPoint;
33  import org.eclipse.jetty.util.component.AbstractLifeCycle;
34  import org.eclipse.jetty.util.log.Log;
35  import org.eclipse.jetty.util.thread.Timeout;
36  import org.eclipse.jetty.util.thread.Timeout.Task;
37  
38  
39  /* ------------------------------------------------------------ */
40  /**
41   * The Selector Manager manages a number of SelectSets to allow
42   * NIO scheduling to scale to large numbers of connections.
43   * <p>
44   * This class works around a number of known JVM bugs. For details
45   * see http://wiki.eclipse.org/Jetty/Feature/JVM_NIO_Bug
46   */
47  public abstract class SelectorManager extends AbstractLifeCycle
48  {
        // TODO Tune these by approx system speed.
        // Number of suspicious (early, empty) selects within one monitor period at which
        // the selector is recreated; once exceeded, a pause is injected instead.
        private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.mortbay.io.nio.JVMBUG_THRESHHOLD",512).intValue();
        // Length in ms of the window over which the select rate is measured.
        private static final int __MONITOR_PERIOD=Integer.getInteger("org.mortbay.io.nio.MONITOR_PERIOD",1000).intValue();
        // Selects per monitor period above which a select set switches to pausing mode.
        private static final int __MAX_SELECTS=Integer.getInteger("org.mortbay.io.nio.MAX_SELECTS",15000).intValue();
        // Sleep in ms used both in pausing mode and once the JVM bug threshold is exceeded.
        private static final int __BUSY_PAUSE=Integer.getInteger("org.mortbay.io.nio.BUSY_PAUSE",50).intValue();
        // Number of consecutive selections of the same single key before it is treated as busy
        // and cancelled; the check only runs when this is greater than zero (default -1).
        private static final int __BUSY_KEY=Integer.getInteger("org.mortbay.io.nio.BUSY_KEY",-1).intValue();

        // Idle limit in ms (narrowed to int by the setter).
        private int _maxIdleTime;
        // Idle limit in ms applied when a select set has more keys than _lowResourcesConnections.
        private int _lowResourcesMaxIdleTime;
        // Low resources threshold, stored PER select set (the accessors scale by _selectSets).
        private long _lowResourcesConnections;
        // The select sets; created in doStart() and set to null in doStop().
        private transient SelectSet[] _selectSet;
        // Number of select sets to create on start.
        private int _selectSets=1;
        // Counter used by register(...) to spread channels over the select sets.
        private volatile int _set;
62      
63      /* ------------------------------------------------------------ */
64      /**
65       * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
66       * @see #setLowResourcesMaxIdleTime(long)
67       */
68      public void setMaxIdleTime(long maxIdleTime)
69      {
70          _maxIdleTime=(int)maxIdleTime;
71      }
72      
73      /* ------------------------------------------------------------ */
74      /**
75       * @param selectSets number of select sets to create
76       */
77      public void setSelectSets(int selectSets)
78      {
79          long lrc = _lowResourcesConnections * _selectSets; 
80          _selectSets=selectSets;
81          _lowResourcesConnections=lrc/_selectSets;
82      }
83      
84      /* ------------------------------------------------------------ */
85      /**
86       * @return the max idle time
87       */
88      public long getMaxIdleTime()
89      {
90          return _maxIdleTime;
91      }
92      
93      /* ------------------------------------------------------------ */
94      /**
95       * @return the number of select sets in use
96       */
97      public int getSelectSets()
98      {
99          return _selectSets;
100     }
101     
102     /* ------------------------------------------------------------ */
103     /** Register a channel
104      * @param channel
105      * @param att Attached Object
106      */
107     public void register(SocketChannel channel, Object att)
108     {
109         // The ++ increment here is not atomic, but it does not matter.
110         // so long as the value changes sometimes, then connections will
111         // be distributed over the available sets.
112         
113         int s=_set++; 
114         s=s%_selectSets;
115         SelectSet[] sets=_selectSet;
116         if (sets!=null)
117         {
118             SelectSet set=sets[s];
119             set.addChange(channel,att);
120             set.wakeup();
121         }
122     }
123     
124     /* ------------------------------------------------------------ */
125     /** Register a {@link ServerSocketChannel}
126      * @param acceptChannel
127      */
128     public void register(ServerSocketChannel acceptChannel)
129     {
130         int s=_set++; 
131         s=s%_selectSets;
132         SelectSet set=_selectSet[s];
133         set.addChange(acceptChannel);
134         set.wakeup();
135     }
136 
137     /* ------------------------------------------------------------ */
138     /**
139      * @return the lowResourcesConnections
140      */
141     public long getLowResourcesConnections()
142     {
143         return _lowResourcesConnections*_selectSets;
144     }
145 
146     /* ------------------------------------------------------------ */
147     /**
148      * Set the number of connections, which if exceeded places this manager in low resources state.
149      * This is not an exact measure as the connection count is averaged over the select sets.
150      * @param lowResourcesConnections the number of connections
151      * @see #setLowResourcesMaxIdleTime(long)
152      */
153     public void setLowResourcesConnections(long lowResourcesConnections)
154     {
155         _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
156     }
157 
158     /* ------------------------------------------------------------ */
159     /**
160      * @return the lowResourcesMaxIdleTime
161      */
162     public long getLowResourcesMaxIdleTime()
163     {
164         return _lowResourcesMaxIdleTime;
165     }
166 
167     /* ------------------------------------------------------------ */
168     /**
169      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()}
170      * @see #setMaxIdleTime(long)
171      */
172     public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
173     {
174         _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
175     }
176     
177     /* ------------------------------------------------------------ */
178     /**
179      * @param acceptorID
180      * @throws IOException
181      */
182     public void doSelect(int acceptorID) throws IOException
183     {
184         SelectSet[] sets= _selectSet;
185         if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
186             sets[acceptorID].doSelect();
187     }
188 
189     /* ------------------------------------------------------------ */
        /**
         * Accepts a connection for a selection key that is ready to accept.
         * A select set calls this when it finds an acceptable key; a <code>null</code>
         * result makes the select set skip that key for this pass.
         *
         * @param key the selection key
         * @return the SocketChannel created on accept, or <code>null</code> if none was accepted
         * @throws IOException 
         */
        protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException;
196 
197     /* ------------------------------------------------------------------------------- */
        /**
         * Hands a task over for execution.
         * Select sets use this for queued {@link Runnable} changes, for closing a busy
         * endpoint, for expired timeout tasks and for the periodic idle check.
         *
         * @param task the task to run
         * @return NOTE(review): the result is ignored by every caller visible in this file;
         * presumably it reports whether the task was accepted - confirm against implementations
         */
        public abstract boolean dispatch(Runnable task);
199 
200     /* ------------------------------------------------------------ */
201     /* (non-Javadoc)
202      * @see org.eclipse.component.AbstractLifeCycle#doStart()
203      */
204     @Override
205     protected void doStart() throws Exception
206     {
207         _selectSet = new SelectSet[_selectSets];
208         for (int i=0;i<_selectSet.length;i++)
209             _selectSet[i]= new SelectSet(i);
210 
211         super.doStart();
212     }
213 
214 
215     /* ------------------------------------------------------------------------------- */
216     @Override
217     protected void doStop() throws Exception
218     {
219         SelectSet[] sets= _selectSet;
220         _selectSet=null;
221         if (sets!=null)
222         {
223             for (SelectSet set : sets)
224             {
225                 if (set!=null)
226                     set.stop();
227             }
228         }
229         super.doStop();
230     }
231 
232     /* ------------------------------------------------------------ */
        /**
         * Callback for a closed endpoint.
         * NOTE(review): no call site is visible in this part of the file; presumably
         * invoked when an endpoint of this manager is closed - confirm.
         *
         * @param endpoint the endpoint concerned
         */
        protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
237 
238     /* ------------------------------------------------------------ */
        /**
         * Callback for an opened endpoint.
         * NOTE(review): no call site is visible in this part of the file; presumably
         * invoked when a new endpoint has been created - confirm.
         *
         * @param endpoint the endpoint concerned
         */
        protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
243 
244     /* ------------------------------------------------------------ */
        /**
         * Callback for an endpoint whose connection has been replaced.
         * NOTE(review): no call site is visible in this part of the file; the meaning
         * is inferred from the parameter names - confirm.
         *
         * @param endpoint the endpoint concerned
         * @param oldConnection the connection previously associated with the endpoint
         */
        protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
246 
247     /* ------------------------------------------------------------------------------- */
        /**
         * Factory method for the {@link Connection} of a channel and endpoint.
         * NOTE(review): no call site is visible in this part of the file - confirm when
         * and by whom this is invoked.
         *
         * @param channel the socket channel
         * @param endpoint the endpoint wrapping the channel
         * @return the connection
         */
        protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
249 
250     /* ------------------------------------------------------------ */
        /**
         * Create a new end point
         * @param channel the socket channel the end point wraps
         * @param selectSet the select set the channel is registered with
         * @param sKey the selection key
         * @return the new endpoint {@link SelectChannelEndPoint}
         * @throws IOException
         */
        protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
260 
261     /* ------------------------------------------------------------------------------- */
262     protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
263     {
264         Log.warn(ex+","+channel+","+attachment);
265         Log.debug(ex);
266     }
267 
268     /* ------------------------------------------------------------------------------- */
269     public void dump()
270     {
271         for (final SelectSet set :_selectSet)
272         {
273             Thread selecting = set._selecting;
274             Log.info("SelectSet "+set._setID+" : "+selecting);
275             if (selecting!=null)
276             {
277                 StackTraceElement[] trace =selecting.getStackTrace();
278                 if (trace!=null)
279                 {
280                     for (StackTraceElement e : trace)
281                     {
282                         Log.info("\tat "+e.toString());
283                     }
284                 }
285             }
286                 
287             set.addChange(new ChangeTask(){
288                 public void run()
289                 {
290                     set.dump();
291                 }
292             });
293         }
294     }
295     
296     
297     /* ------------------------------------------------------------------------------- */
298     /* ------------------------------------------------------------------------------- */
299     /* ------------------------------------------------------------------------------- */
300     public class SelectSet 
301     {
            // Index of this set; assigned from the constructor argument.
            private final int _setID;
            // Timeout queue of this set; expired tasks are run at the end of each select pass.
            private final Timeout _timeout;

            // Pending changes (endpoints, channels, tasks) applied at the start of each select pass.
            private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();

            // Current selector; replaced by a fresh one by the JVM bug work around.
            private Selector _selector;

            // Round robin index of the select set that receives the next accepted channel.
            private int _nextSet;
            // Thread currently inside doSelect(), or null.
            private volatile Thread _selecting;
            // Suspicious selects (returned early with nothing selected) in the current monitor period.
            private int _jvmBug;
            // Selects performed in the current monitor period.
            private int _selects;
            // Start of the current monitor period (ms).
            private long _monitorStart;
            // Time after which the current monitor period is evaluated (ms).
            private long _monitorNext;
            // True while a sleep is injected before each select because the select rate was too high.
            private boolean _pausing;
            // Last single key seen by the busy key check, and how often it was seen in a row.
            private SelectionKey _busyKey;
            private int _busyKeyCount;
            // Time after which the work around statistics are logged and reset (ms).
            private long _log;
            // Statistics since the last log: monitor periods that switched pausing on.
            private int _paused;
            // Statistics since the last log: times keys with no interest ops were cancelled.
            private int _jvmFix0;
            // Statistics since the last log: reported as the number of selector recreations.
            // NOTE(review): no increment is visible in this part of the file - confirm.
            private int _jvmFix1;
            // Statistics since the last log: times the threshold was exceeded and a delay injected.
            private int _jvmFix2;
            // Time of the last idle check (ms).
            private volatile long _idleTick;
            // End points whose idle time is checked by the idle tick.
            private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
325         
326         /* ------------------------------------------------------------ */
327         SelectSet(int acceptorID) throws Exception
328         {
329             _setID=acceptorID;
330 
331             _idleTick = System.currentTimeMillis();
332             _timeout = new Timeout(this);
333             _timeout.setDuration(0L);
334 
335             // create a selector;
336             _selector = Selector.open();
337             _monitorStart=System.currentTimeMillis();
338             _monitorNext=_monitorStart+__MONITOR_PERIOD;
339             _log=_monitorStart+60000;
340         }
341         
342         /* ------------------------------------------------------------ */
343         public void addChange(Object point)
344         {
345             _changes.add(point);
346         }
347         
348         /* ------------------------------------------------------------ */
349         public void addChange(SelectableChannel channel, Object att)
350         {   
351             if (att==null)
352                 addChange(channel);
353             else if (att instanceof EndPoint)
354                 addChange(att);
355             else
356                 addChange(new ChangeSelectableChannel(channel,att));
357         }
358         
359         /* ------------------------------------------------------------ */
360         /**
361          * Select and dispatch tasks found from changes and the selector.
362          * 
363          * @throws IOException
364          */
365         public void doSelect() throws IOException
366         {
367             try
368             {
369                 _selecting=Thread.currentThread();
370                 final Selector selector=_selector;
371 
372                 // Make any key changes required
373                 Object change;
374                 int changes=_changes.size();
375                 while (changes-->0 && (change=_changes.poll())!=null)
376                 {
377                     try
378                     {
379                         if (change instanceof EndPoint)
380                         {
381                             // Update the operations for a key.
382                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
383                             endpoint.doUpdateKey();
384                         }
385                         else if (change instanceof Runnable)
386                         {
387                             dispatch((Runnable)change);
388                         }
389                         else if (change instanceof ChangeSelectableChannel)
390                         {
391                             // finish accepting/connecting this connection
392                             final ChangeSelectableChannel asc = (ChangeSelectableChannel)change;
393                             final SelectableChannel channel=asc._channel;
394                             final Object att = asc._attachment;
395 
396                             if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
397                             {
398                                 SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
399                                 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
400                                 key.attach(endpoint);
401                                 endpoint.schedule();
402                             }
403                             else if (channel.isOpen())
404                             {
405                                 channel.register(selector,SelectionKey.OP_CONNECT,att);
406                             }
407                         }
408                         else if (change instanceof SocketChannel)
409                         {
410                             final SocketChannel channel=(SocketChannel)change;
411 
412                             if (channel.isConnected())
413                             {
414                                 SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);
415                                 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
416                                 key.attach(endpoint);
417                                 endpoint.schedule();
418                             }
419                             else if (channel.isOpen())
420                             {
421                                 channel.register(selector,SelectionKey.OP_CONNECT,null);
422                             }
423                         }
424                         else if (change instanceof ServerSocketChannel)
425                         {
426                             ServerSocketChannel channel = (ServerSocketChannel)change;
427                             channel.register(getSelector(),SelectionKey.OP_ACCEPT);
428                         }
429                         else if (change instanceof ChangeTask)
430                         {
431                             ((ChangeTask)change).run();
432                         }
433                         else
434                             throw new IllegalArgumentException(change.toString());
435                     }
436                     catch (Exception e)
437                     {
438                         if (isRunning())
439                             Log.warn(e);
440                         else
441                             Log.debug(e);
442                     }
443                 }
444 
445                 long retry_next;
446                 long now=System.currentTimeMillis();
447                 _timeout.setNow(now);
448 
449                 retry_next=_timeout.getTimeToNext();
450 
451                 // workout how low to wait in select
452                 long wait = 1000L;  
453                 if (wait > 0 && retry_next >= 0 && wait > retry_next)
454                     wait = retry_next;
455     
456                 // Do the select.
457                 if (wait > 0) 
458                 {
459                     // If we are in pausing mode
460                     if (_pausing)
461                     {
462                         try
463                         {
464                             Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of  busy loop
465                         }
466                         catch(InterruptedException e)
467                         {
468                             Log.ignore(e);
469                         }
470                     }
471                         
472                     long before=now;
473                     int selected=selector.select(wait);
474                     now = System.currentTimeMillis();
475                     _timeout.setNow(now);
476                     _selects++;
477   
478                     // Look for JVM bugs over a monitor period.
479                     // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
480                     // http://bugs.sun.com/view_bug.do?bug_id=6693490
481                     if (now>_monitorNext)
482                     {
483                         _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
484                         _pausing=_selects>__MAX_SELECTS;
485                         if (_pausing)
486                             _paused++;
487                             
488                         _selects=0;
489                         _jvmBug=0;
490                         _monitorStart=now;
491                         _monitorNext=now+__MONITOR_PERIOD;
492                     }
493                     
494                     if (now>_log)
495                     {
496                         if (_paused>0)  
497                             Log.info(this+" Busy selector - injecting delay "+_paused+" times");
498 
499                         if (_jvmFix2>0)
500                             Log.info(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
501 
502                         if (_jvmFix1>0)
503                             Log.info(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times");
504 
505                         else if(Log.isDebugEnabled() && _jvmFix0>0)
506                             Log.info(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");
507                         _paused=0;
508                         _jvmFix2=0;
509                         _jvmFix1=0;
510                         _jvmFix0=0;
511                         _log=now+60000;
512                     }
513                     
514                     // If we see signature of possible JVM bug, increment count.
515                     if (selected==0 && wait>10 && (now-before)<(wait/2))
516                     {
517                         // Increment bug count and try a work around
518                         _jvmBug++;
519                         if (_jvmBug>(__JVMBUG_THRESHHOLD))
520                         {
521                             try
522                             {
523                                 if (_jvmBug==__JVMBUG_THRESHHOLD+1)
524                                     _jvmFix2++;
525                                     
526                                 Thread.sleep(__BUSY_PAUSE); // pause to avoid busy loop
527                             }
528                             catch(InterruptedException e)
529                             {
530                                 Log.ignore(e);
531                             }
532                         }
533                         else if (_jvmBug==__JVMBUG_THRESHHOLD)
534                         {
535                             synchronized (this)
536                             {
537                                 // BLOODY SUN BUG !!!  Try refreshing the entire selector.
538                                 final Selector new_selector = Selector.open();
539                                 for (SelectionKey k: selector.keys())
540                                 {
541                                     if (!k.isValid() || k.interestOps()==0)
542                                         continue;
543                                     
544                                     final SelectableChannel channel = k.channel();
545                                     final Object attachment = k.attachment();
546                                     
547                                     if (attachment==null)
548                                         addChange(channel);
549                                     else
550                                         addChange(channel,attachment);
551                                 }
552                                 _selector.close();
553                                 _selector=new_selector;
554                                 return;
555                             }
556                         }
557                         else if (_jvmBug%32==31) // heuristic attempt to cancel key 31,63,95,... loops
558                         {
559                             // Cancel keys with 0 interested ops
560                             int cancelled=0;
561                             for (SelectionKey k: selector.keys())
562                             {
563                                 if (k.isValid()&&k.interestOps()==0)
564                                 {
565                                     k.cancel();
566                                     cancelled++;
567                                 }
568                             }
569                             if (cancelled>0)
570                                 _jvmFix0++;
571                             
572                             return;
573                         }
574                     }
575                     else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
576                     {
577                         // Look for busy key
578                         SelectionKey busy = selector.selectedKeys().iterator().next();
579                         if (busy==_busyKey)
580                         {
581                             if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
582                             {
583                                 final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
584                                 Log.warn("Busy Key "+busy.channel()+" "+endpoint);
585                                 busy.cancel();
586                                 if (endpoint!=null)
587                                 {
588                                     dispatch(new Runnable()
589                                     {
590                                         public void run()
591                                         {
592                                             try
593                                             {
594                                                 endpoint.close();
595                                             }
596                                             catch (IOException e)
597                                             {
598                                                 Log.ignore(e);
599                                             }
600                                         }
601                                     });
602                                 }
603                             }
604                         }
605                         else
606                             _busyKeyCount=0;
607                         _busyKey=busy;
608                     }
609                 }
610                 else 
611                 {
612                     selector.selectNow();
613                     _selects++;
614                 }
615 
616                 // have we been destroyed while sleeping
617                 if (_selector==null || !selector.isOpen())
618                     return;
619 
620                 // Look for things to do
621                 for (SelectionKey key: selector.selectedKeys())
622                 {   
623                     try
624                     {
625                         if (!key.isValid())
626                         {
627                             key.cancel();
628                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
629                             if (endpoint != null)
630                                 endpoint.doUpdateKey();
631                             continue;
632                         }
633 
634                         Object att = key.attachment();
635                         if (att instanceof SelectChannelEndPoint)
636                         {
637                             ((SelectChannelEndPoint)att).schedule();
638                         }
639                         else if (key.isAcceptable())
640                         {
641                             SocketChannel channel = acceptChannel(key);
642                             if (channel==null)
643                                 continue;
644 
645                             channel.configureBlocking(false);
646 
647                             // TODO make it reluctant to leave 0
648                             _nextSet=++_nextSet%_selectSet.length;
649 
650                             // Is this for this selectset
651                             if (_nextSet==_setID)
652                             {
653                                 // bind connections to this select set.
654                                 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
655                                 
656                                 SelectChannelEndPoint endpoint=_selectSet[_nextSet].createEndPoint(channel,cKey);
657                                 cKey.attach(endpoint);
658                                 if (endpoint != null)
659                                     endpoint.schedule();
660                             }
661                             else
662                             {
663                                 // nope - give it to another.
664                                 _selectSet[_nextSet].addChange(channel);
665                                 _selectSet[_nextSet].wakeup();
666                             }
667                         }
668                         else if (key.isConnectable())
669                         {
670                             // Complete a connection of a registered channel
671                             SocketChannel channel = (SocketChannel)key.channel();
672                             boolean connected=false;
673                             try
674                             {
675                                 connected=channel.finishConnect();
676                             }
677                             catch(Exception e)
678                             {
679                                 connectionFailed(channel,e,att);
680                             }
681                             finally
682                             {
683                                 if (connected)
684                                 {
685                                     key.interestOps(SelectionKey.OP_READ);
686                                     SelectChannelEndPoint endpoint = createEndPoint(channel,key);
687                                     key.attach(endpoint);
688                                     endpoint.schedule();
689                                 }
690                                 else
691                                 {
692                                     key.cancel();
693                                 }
694                             }
695                         }
696                         else
697                         {
698                             // Wrap readable registered channel in an endpoint
699                             SocketChannel channel = (SocketChannel)key.channel();
700                             SelectChannelEndPoint endpoint = createEndPoint(channel,key);
701                             key.attach(endpoint);
702                             if (key.isReadable())
703                                 endpoint.schedule();                           
704                         }
705                         key = null;
706                     }
707                     catch (CancelledKeyException e)
708                     {
709                         Log.ignore(e);
710                     }
711                     catch (Exception e)
712                     {
713                         if (isRunning())
714                             Log.warn(e);
715                         else
716                             Log.ignore(e);
717 
718                         if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
719                             key.cancel();
720                     }
721                 }
722                 
723                 // Everything always handled
724                 selector.selectedKeys().clear();
725                 
726                 _timeout.setNow(now);
727                 Task task = _timeout.expired();
728                 while (task!=null)
729                 {
730                     if (task instanceof Runnable)
731                         dispatch((Runnable)task);
732                     else
733                         task.expired();
734                         
735                     task = _timeout.expired();
736                 }
737 
738                 // Idle tick
739                 if (now-_idleTick>1000)
740                 {
741                     _idleTick=now;
742                     
743                     final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
744                         ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
745                         :now;
746                         
747                     dispatch(new Runnable()
748                     {
749                         public void run()
750                         {
751                             for (SelectChannelEndPoint endp:_endPoints.keySet())
752                             {
753                                 endp.checkIdleTimestamp(idle_now);
754                             }
755                         }
756                     });
757                 }
758             }
759             catch (CancelledKeyException e)
760             {
761                 Log.ignore(e);
762             }
763             finally
764             {
765                 _selecting=null;
766             }
767         }
768 
769         /* ------------------------------------------------------------ */
770         public SelectorManager getManager()
771         {
772             return SelectorManager.this;
773         }
774 
775         /* ------------------------------------------------------------ */
776         public long getNow()
777         {
778             return _timeout.getNow();
779         }
780 
781         /* ------------------------------------------------------------ */
782         /**
783          * @param task The task to timeout. If it implements Runnable, then 
784          * expired will be called from a dispatched thread.
785          * 
786          * @param timeoutMs
787          */
788         public void scheduleTimeout(Timeout.Task task, long timeoutMs)
789         {
790             _timeout.schedule(task, timeoutMs);
791         }
792         
793         /* ------------------------------------------------------------ */
794         public void cancelTimeout(Timeout.Task task)
795         {
796             task.cancel();
797         }
798 
799         /* ------------------------------------------------------------ */
800         public void wakeup()
801         {
802             Selector selector = _selector;
803             if (selector!=null)
804                 selector.wakeup();
805         }
806         
807         /* ------------------------------------------------------------ */
808         private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
809         {
810             SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
811             endPointOpened(endp); 
812             _endPoints.put(endp,this);
813             return endp;
814         }
815         
816         /* ------------------------------------------------------------ */
817         public void destroyEndPoint(SelectChannelEndPoint endp)
818         {
819             _endPoints.remove(endp);
820             endPointClosed(endp);
821         }
822 
823         /* ------------------------------------------------------------ */
824         Selector getSelector()
825         {
826             return _selector;
827         }
828         
829         /* ------------------------------------------------------------ */
830         void stop() throws Exception
831         {
832             boolean selecting=true;
833             while(selecting)
834             {
835                 wakeup();
836                 selecting=_selecting!=null;
837             }
838 
839             for (SelectionKey key:_selector.keys())
840             {
841                 if (key==null)
842                     continue;
843                 Object att=key.attachment();
844                 if (att instanceof EndPoint)
845                 {
846                     EndPoint endpoint = (EndPoint)att;
847                     try
848                     {
849                         endpoint.close();
850                     }
851                     catch(IOException e)
852                     {
853                         Log.ignore(e);
854                     }
855                 }
856             }
857             
858             synchronized (this)
859             {
860                 selecting=_selecting!=null;
861                 while(selecting)
862                 {
863                     wakeup();
864                     selecting=_selecting!=null;
865                 }
866                 
867                 _timeout.cancelAll();
868                 try
869                 {
870                     if (_selector != null)
871                         _selector.close();
872                 }
873                 catch (IOException e)
874                 {
875                     Log.ignore(e);
876                 } 
877                 _selector=null;
878             }
879         }
880         
881         public void dump()
882         {
883             synchronized (System.err)
884             {
885                 Selector selector=_selector;
886                 Log.info("SelectSet "+_setID+" "+selector.keys().size());
887                 for (SelectionKey key: selector.keys())
888                 {
889                     if (key.isValid())
890                         Log.info(key.channel()+" "+key.interestOps()+" "+key.readyOps()+" "+key.attachment());
891                     else
892                         Log.info(key.channel()+" - - "+key.attachment());
893                 }
894             }
895         }
896     }
897 
898     /* ------------------------------------------------------------ */
899     private static class ChangeSelectableChannel
900     {
901         final SelectableChannel _channel;
902         final Object _attachment;
903         
904         public ChangeSelectableChannel(SelectableChannel channel, Object attachment)
905         {
906             super();
907             _channel = channel;
908             _attachment = attachment;
909         }
910     }
911 
912     /* ------------------------------------------------------------ */
913     private interface ChangeTask
914     {
915         public void run();
916     }
917 }