View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.CancelledKeyException;
18  import java.nio.channels.SelectableChannel;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.Selector;
21  import java.nio.channels.ServerSocketChannel;
22  import java.nio.channels.SocketChannel;
23  import java.util.ArrayList;
24  import java.util.List;
25  
26  import org.eclipse.jetty.io.ConnectedEndPoint;
27  import org.eclipse.jetty.io.Connection;
28  import org.eclipse.jetty.io.EndPoint;
29  import org.eclipse.jetty.util.component.AbstractLifeCycle;
30  import org.eclipse.jetty.util.log.Log;
31  import org.eclipse.jetty.util.thread.Timeout;
32  
33  
34  /* ------------------------------------------------------------ */
35  /**
36   * The Selector Manager manages a number of SelectSets to allow
37   * NIO scheduling to scale to large numbers of connections.
38   * 
39   * 
40   *
41   */
42  public abstract class SelectorManager extends AbstractLifeCycle
43  {
// TODO Tune these by approx system speed.
// Each tunable below is read once from the named system property, falling back to the given default.
// NOTE(review): the property names use the org.mortbay prefix although this class lives in org.eclipse.jetty - confirm this is intended.

// Count of suspect select() returns within one monitor period at which the selector is recreated;
// above this count the select loop sleeps for __BUSY_PAUSE instead.
private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.mortbay.io.nio.JVMBUG_THRESHHOLD",512).intValue();
// Length in ms of the period over which selects and suspect returns are counted.
private static final int __MONITOR_PERIOD=Integer.getInteger("org.mortbay.io.nio.MONITOR_PERIOD",1000).intValue();
// Selects per monitor period above which a select set goes into pausing mode.
private static final int __MAX_SELECTS=Integer.getInteger("org.mortbay.io.nio.MAX_SELECTS",15000).intValue();
// Sleep in ms injected into the select loop when pausing or when working around the JVM bug.
private static final int __BUSY_PAUSE=Integer.getInteger("org.mortbay.io.nio.BUSY_PAUSE",50).intValue();
// Number of consecutive selections of the same single key after which that key is cancelled;
// busy key detection only runs when this is greater than 0 (the default of -1 turns it off).
private static final int __BUSY_KEY=Integer.getInteger("org.mortbay.io.nio.BUSY_KEY",-1).intValue();

// Idle timeout in ms used when a select set is not over the low resources threshold.
private long _maxIdleTime;
// Low resources threshold, stored per select set (the setter divides by, and the getter multiplies by, _selectSets).
private long _lowResourcesConnections;
// Idle timeout in ms used when a select set has more keys than _lowResourcesConnections.
private long _lowResourcesMaxIdleTime;
// The select sets: created in doStart() and set to null in doStop().
private transient SelectSet[] _selectSet;
// Number of select sets that doStart() creates.
private int _selectSets=1;
// Counter used by the register methods to pick a select set in rotation.
private volatile int _set;
57      
58      /* ------------------------------------------------------------ */
59      /**
60       * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
61       * @see {@link #setLowResourcesMaxIdleTime(long)}
62       */
63      public void setMaxIdleTime(long maxIdleTime)
64      {
65          _maxIdleTime=maxIdleTime;
66      }
67      
68      /* ------------------------------------------------------------ */
69      /**
70       * @param selectSets number of select sets to create
71       */
72      public void setSelectSets(int selectSets)
73      {
74          long lrc = _lowResourcesConnections * _selectSets; 
75          _selectSets=selectSets;
76          _lowResourcesConnections=lrc/_selectSets;
77      }
78      
79      /* ------------------------------------------------------------ */
80      /**
81       * @return
82       */
83      public long getMaxIdleTime()
84      {
85          return _maxIdleTime;
86      }
87      
88      /* ------------------------------------------------------------ */
89      /**
90       * @return
91       */
92      public int getSelectSets()
93      {
94          return _selectSets;
95      }
96      
97      /* ------------------------------------------------------------ */
98      /** Register a channel
99       * @param channel
100      * @param att Attached Object
101      * @throws IOException
102      */
103     public void register(SocketChannel channel, Object att)
104     {
105         int s=_set++; 
106         s=s%_selectSets;
107         SelectSet[] sets=_selectSet;
108         if (sets!=null)
109         {
110             SelectSet set=sets[s];
111             set.addChange(channel,att);
112             set.wakeup();
113         }
114     }
115     
116     /* ------------------------------------------------------------ */
117     /** Register a serverchannel
118      * @param acceptChannel
119      * @return
120      * @throws IOException
121      */
122     public void register(ServerSocketChannel acceptChannel)
123     {
124         int s=_set++; 
125         s=s%_selectSets;
126         SelectSet set=_selectSet[s];
127         set.addChange(acceptChannel);
128         set.wakeup();
129     }
130 
131     /* ------------------------------------------------------------ */
132     /**
133      * @return the lowResourcesConnections
134      */
135     public long getLowResourcesConnections()
136     {
137         return _lowResourcesConnections*_selectSets;
138     }
139 
140     /* ------------------------------------------------------------ */
141     /**
142      * Set the number of connections, which if exceeded places this manager in low resources state.
143      * This is not an exact measure as the connection count is averaged over the select sets.
144      * @param lowResourcesConnections the number of connections
145      * @see {@link #setLowResourcesMaxIdleTime(long)}
146      */
147     public void setLowResourcesConnections(long lowResourcesConnections)
148     {
149         _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
150     }
151 
152     /* ------------------------------------------------------------ */
153     /**
154      * @return the lowResourcesMaxIdleTime
155      */
156     public long getLowResourcesMaxIdleTime()
157     {
158         return _lowResourcesMaxIdleTime;
159     }
160 
161     /* ------------------------------------------------------------ */
162     /**
163      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()}
164      * @see {@link #setMaxIdleTime(long)}
165      */
166     public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
167     {
168         _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
169     }
170     
171     /* ------------------------------------------------------------ */
172     /**
173      * @param acceptorID
174      * @throws IOException
175      */
176     public void doSelect(int acceptorID) throws IOException
177     {
178         SelectSet[] sets= _selectSet;
179         if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
180             sets[acceptorID].doSelect();
181     }
182 
183     /* ------------------------------------------------------------ */
/**
 * Accept a connection for a selected key.
 * <p>
 * {@link SelectSet#doSelect()} calls this for a selected key that has no endpoint attached
 * and is acceptable. If null is returned, doSelect moves on to the next key.
 * 
 * @param key the selected, acceptable key
 * @return the accepted channel, or null if there is no channel to handle
 * @throws IOException declared for implementations; doSelect catches and logs exceptions thrown here
 */
protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException;
190 
191     /* ------------------------------------------------------------------------------- */
/**
 * Dispatch a task.
 * <p>
 * {@link SelectSet#doSelect()} passes every queued change that is a {@link Runnable}
 * to this method and ignores the result.
 * 
 * @param task the task to dispatch
 * @return NOTE(review): presumably true if the task was accepted for execution - confirm against the implementations
 */
public abstract boolean dispatch(Runnable task);
193 
194     /* ------------------------------------------------------------ */
195     /* (non-Javadoc)
196      * @see org.eclipse.component.AbstractLifeCycle#doStart()
197      */
198     @Override
199     protected void doStart() throws Exception
200     {
201         _selectSet = new SelectSet[_selectSets];
202         for (int i=0;i<_selectSet.length;i++)
203             _selectSet[i]= new SelectSet(i);
204 
205         super.doStart();
206     }
207 
208 
209     /* ------------------------------------------------------------------------------- */
210     @Override
211     protected void doStop() throws Exception
212     {
213         SelectSet[] sets= _selectSet;
214         _selectSet=null;
215         if (sets!=null)
216         {
217             for (SelectSet set : sets)
218             {
219                 if (set!=null)
220                     set.stop();
221             }
222         }
223         super.doStop();
224     }
225 
226     /* ------------------------------------------------------------ */
/**
 * Callback for a closed endpoint.
 * NOTE(review): there is no caller in the visible part of this file; presumably invoked
 * when an endpoint of this manager is closed - confirm against the callers.
 * 
 * @param endpoint the endpoint concerned
 */
protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
231 
232     /* ------------------------------------------------------------ */
/**
 * Callback for an opened endpoint.
 * NOTE(review): there is no caller in the visible part of this file; presumably invoked
 * when an endpoint of this manager is opened - confirm against the callers.
 * 
 * @param endpoint the endpoint concerned
 */
protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
237 
238     /* ------------------------------------------------------------ */
/**
 * Callback for an endpoint whose connection has changed.
 * NOTE(review): there is no caller in the visible part of this file; presumably invoked
 * after the connection of an endpoint has been replaced - confirm against the callers.
 * 
 * @param endpoint the endpoint concerned
 * @param oldConnection presumably the connection that was replaced - confirm
 */
protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
240 
241     /* ------------------------------------------------------------------------------- */
/**
 * Factory method for the connection of an endpoint.
 * NOTE(review): there is no caller in the visible part of this file; presumably invoked
 * when an endpoint is created - confirm against the callers.
 * 
 * @param channel the channel of the endpoint
 * @param endpoint the endpoint that needs a connection
 * @return the connection for the endpoint
 */
protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
243 
244     /* ------------------------------------------------------------ */
/**
 * Factory method for the endpoint of a channel registered with a select set.
 * <p>
 * {@link SelectSet#doSelect()} calls this once a channel has been registered or has
 * finished connecting, and attaches the returned endpoint to the selection key.
 * 
 * @param channel the registered channel
 * @param selectSet the select set that the channel is registered with
 * @param sKey the selection key of the registration
 * @return the new endpoint
 * @throws IOException declared for implementations; doSelect catches and logs exceptions thrown here
 */
protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
253 
254     /* ------------------------------------------------------------------------------- */
255     protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
256     {
257         Log.warn(ex+","+channel+","+attachment);
258         Log.debug(ex);
259     }
260 
261     /* ------------------------------------------------------------------------------- */
262     public void dump()
263     {
264         for (final SelectSet set :_selectSet)
265         {
266             Thread selecting = set._selecting;
267             Log.info("SelectSet "+set._setID+" : "+selecting);
268             if (selecting!=null)
269             {
270                 StackTraceElement[] trace =selecting.getStackTrace();
271                 if (trace!=null)
272                 {
273                     for (StackTraceElement e : trace)
274                     {
275                         Log.info("\tat "+e.toString());
276                     }
277                 }
278             }
279                 
280             set.addChange(new ChangeTask(){
281                 public void run()
282                 {
283                     set.dump();
284                 }
285             });
286         }
287     }
288     
289     
290     /* ------------------------------------------------------------------------------- */
291     /* ------------------------------------------------------------------------------- */
292     /* ------------------------------------------------------------------------------- */
293     public class SelectSet 
294     {
// ID of this select set, as passed to the constructor.
private final int _setID;
// Timer for idle tasks; doSelect() switches its duration between the normal and the low resources idle time.
private final Timeout _idleTimeout;
// Timer for tasks scheduled with an explicit timeout.
private final Timeout _timeout;
// Two change lists: one collects new changes while doSelect() processes the other. Also the lock guarding _change.
private final List<Object>[] _changes;

// Index into _changes of the list that currently collects new changes.
private int _change;
// Rotating index used by doSelect() to hand accepted channels out to the select sets.
private int _nextSet;
// The selector of this set; replaced by a new one when doSelect() works around the JVM bug.
private Selector _selector;
// The thread currently inside doSelect(), or null.
private volatile Thread _selecting;
// Suspect select() returns counted in the current monitor period.
private int _jvmBug;
// Selects counted in the current monitor period.
private int _selects;
// Start time in ms of the current monitor period.
private long _monitorStart;
// Time in ms at which the current monitor period ends.
private long _monitorNext;
// True while doSelect() should sleep for __BUSY_PAUSE before each select.
private boolean _pausing;
// The key seen by the most recent busy key check.
private SelectionKey _busyKey;
// How many times in a row _busyKey has been seen by the busy key check.
private int _busyKeyCount;
// Time in ms at which the statistics below are next logged and reset.
private long _log;
// Monitor periods that ended in pausing mode since the last statistics log.
private int _paused;
// Logged as the number of times keys were cancelled.
private int _jvmFix0;
// Logged as the number of times the selector was recreated.
private int _jvmFix1;
// Logged as the number of times a delay was injected for the JVM bug.
private int _jvmFix2;
316         
317         /* ------------------------------------------------------------ */
318         SelectSet(int acceptorID) throws Exception
319         {
320             _setID=acceptorID;
321 
322             _idleTimeout = new Timeout(this);
323             _idleTimeout.setDuration(getMaxIdleTime());
324             _timeout = new Timeout(this);
325             _timeout.setDuration(0L);
326             _changes = new List[] {new ArrayList(),new ArrayList()};
327 
328             // create a selector;
329             _selector = Selector.open();
330             _change=0;
331             _monitorStart=System.currentTimeMillis();
332             _monitorNext=_monitorStart+__MONITOR_PERIOD;
333             _log=_monitorStart+60000;
334         }
335         
336         /* ------------------------------------------------------------ */
337         public void addChange(Object point)
338         {
339             synchronized (_changes)
340             {
341                 _changes[_change].add(point);
342             }
343         }
344         
345         /* ------------------------------------------------------------ */
346         public void addChange(SelectableChannel channel, Object att)
347         {   
348             if (att==null)
349                 addChange(channel);
350             else if (att instanceof EndPoint)
351                 addChange(att);
352             else
353                 addChange(new ChangeSelectableChannel(channel,att));
354         }
355         
356         /* ------------------------------------------------------------ */
357         public void cancelIdle(Timeout.Task task)
358         {
359             task.cancel();
360         }
361 
362         /* ------------------------------------------------------------ */
363         /**
364          * Select and dispatch tasks found from changes and the selector.
365          * 
366          * @throws IOException
367          */
368         public void doSelect() throws IOException
369         {
370             try
371             {
372                 _selecting=Thread.currentThread();
373                 List<?> changes;
374                 final Selector selector;
375                 synchronized (_changes)
376                 {
377                     changes=_changes[_change];
378                     _change=_change==0?1:0;
379                     selector=_selector;
380                 }
381 
382                 // Make any key changes required
383                 final int size=changes.size();
384                 for (int i = 0; i < size; i++)
385                 {
386                     try
387                     {
388                         Object o = changes.get(i);
389                         
390                         if (o instanceof EndPoint)
391                         {
392                             // Update the operations for a key.
393                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o;
394                             endpoint.doUpdateKey();
395                         }
396                         else if (o instanceof Runnable)
397                         {
398                             dispatch((Runnable)o);
399                         }
400                         else if (o instanceof ChangeSelectableChannel)
401                         {
402                             // finish accepting/connecting this connection
403                             final ChangeSelectableChannel asc = (ChangeSelectableChannel)o;
404                             final SelectableChannel channel=asc._channel;
405                             final Object att = asc._attachment;
406 
407                             if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
408                             {
409                                 SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
410                                 SelectChannelEndPoint endpoint = newEndPoint((SocketChannel)channel,this,key);
411                                 key.attach(endpoint);
412                                 endpoint.schedule();
413                             }
414                             else if (channel.isOpen())
415                             {
416                                 channel.register(selector,SelectionKey.OP_CONNECT,att);
417                             }
418                         }
419                         else if (o instanceof SocketChannel)
420                         {
421                             final SocketChannel channel=(SocketChannel)o;
422 
423                             if (channel.isConnected())
424                             {
425                                 SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);
426                                 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
427                                 key.attach(endpoint);
428                                 endpoint.schedule();
429                             }
430                             else if (channel.isOpen())
431                             {
432                                 channel.register(selector,SelectionKey.OP_CONNECT,null);
433                             }
434                         }
435                         else if (o instanceof ServerSocketChannel)
436                         {
437                             ServerSocketChannel channel = (ServerSocketChannel)o;
438                             channel.register(getSelector(),SelectionKey.OP_ACCEPT);
439                         }
440                         else if (o instanceof ChangeTask)
441                         {
442                             ((ChangeTask)o).run();
443                         }
444                         else
445                             throw new IllegalArgumentException(o.toString());
446                     }
447                     catch (Exception e)
448                     {
449                         if (isRunning())
450                             Log.warn(e);
451                         else
452                             Log.debug(e);
453                     }
454                 }
455                 changes.clear();
456 
457                 long idle_next;
458                 long retry_next;
459                 long now=System.currentTimeMillis();
460                 synchronized (this)
461                 {
462                     _idleTimeout.setNow(now);
463                     _timeout.setNow(now);
464                     
465                     if (_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)
466                         _idleTimeout.setDuration(_lowResourcesMaxIdleTime);
467                     else 
468                         _idleTimeout.setDuration(_maxIdleTime);
469                     idle_next=_idleTimeout.getTimeToNext();
470                     retry_next=_timeout.getTimeToNext();
471                 }
472 
473                 // workout how low to wait in select
474                 long wait = 1000L;  // not getMaxIdleTime() as the now value of the idle timers needs to be updated.
475                 if (idle_next >= 0 && wait > idle_next)
476                     wait = idle_next;
477                 if (wait > 0 && retry_next >= 0 && wait > retry_next)
478                     wait = retry_next;
479     
480                 // Do the select.
481                 if (wait > 0) 
482                 {
483                     // If we are in pausing mode
484                     if (_pausing)
485                     {
486                         try
487                         {
488                             Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of  busy loop
489                         }
490                         catch(InterruptedException e)
491                         {
492                             Log.ignore(e);
493                         }
494                     }
495                         
496                     long before=now;
497                     int selected=selector.select(wait);
498                     now = System.currentTimeMillis();
499                     _idleTimeout.setNow(now);
500                     _timeout.setNow(now);
501                     _selects++;
502   
503                     // Look for JVM bugs over a monitor period.
504                     // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
505                     // http://bugs.sun.com/view_bug.do?bug_id=6693490
506                     if (now>_monitorNext)
507                     {
508                         _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
509                         _pausing=_selects>__MAX_SELECTS;
510                         if (_pausing)
511                             _paused++;
512                             
513                         _selects=0;
514                         _jvmBug=0;
515                         _monitorStart=now;
516                         _monitorNext=now+__MONITOR_PERIOD;
517                     }
518                     
519                     if (now>_log)
520                     {
521                         if (_paused>0)  
522                             Log.info(this+" Busy selector - injecting delay "+_paused+" times");
523 
524                         if (_jvmFix2>0)
525                             Log.info(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
526 
527                         if (_jvmFix1>0)
528                             Log.info(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, canceled keys "+_jvmFix0+" times");
529 
530                         else if(Log.isDebugEnabled() && _jvmFix0>0)
531                             Log.info(this+" JVM BUG(s) - canceled keys "+_jvmFix0+" times");
532                         _paused=0;
533                         _jvmFix2=0;
534                         _jvmFix1=0;
535                         _jvmFix0=0;
536                         _log=now+60000;
537                     }
538                     
539                     // If we see signature of possible JVM bug, increment count.
540                     if (selected==0 && wait>10 && (now-before)<(wait/2))
541                     {
542                         // Increment bug count and try a work around
543                         _jvmBug++;
544                         if (_jvmBug>(__JVMBUG_THRESHHOLD))
545                         {
546                             try
547                             {
548                                 if (_jvmBug==__JVMBUG_THRESHHOLD+1)
549                                     _jvmFix2++;
550                                     
551                                 Thread.sleep(__BUSY_PAUSE); // pause to avoid busy loop
552                             }
553                             catch(InterruptedException e)
554                             {
555                                 Log.ignore(e);
556                             }
557                         }
558                         else if (_jvmBug==__JVMBUG_THRESHHOLD)
559                         {
560                             synchronized (this)
561                             {
562                                 // BLOODY SUN BUG !!!  Try refreshing the entire selector.
563                                 final Selector new_selector = Selector.open();
564                                 for (SelectionKey k: selector.keys())
565                                 {
566                                     if (!k.isValid() || k.interestOps()==0)
567                                         continue;
568                                     
569                                     final SelectableChannel channel = k.channel();
570                                     final Object attachment = k.attachment();
571                                     
572                                     if (attachment==null)
573                                         addChange(channel);
574                                     else
575                                         addChange(channel,attachment);
576                                 }
577                                 _selector.close();
578                                 _selector=new_selector;
579                                 return;
580                             }
581                         }
582                         else if (_jvmBug%32==31) // heuristic attempt to cancel key 31,63,95,... loops
583                         {
584                             // Cancel keys with 0 interested ops
585                             int cancelled=0;
586                             for (SelectionKey k: selector.keys())
587                             {
588                                 if (k.isValid()&&k.interestOps()==0)
589                                 {
590                                     k.cancel();
591                                     cancelled++;
592                                 }
593                             }
594                             if (cancelled>0)
595                                 _jvmFix0++;
596                             
597                             return;
598                         }
599                     }
600                     else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
601                     {
602                         // Look for busy key
603                         SelectionKey busy = selector.selectedKeys().iterator().next();
604                         if (busy==_busyKey)
605                         {
606                             if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
607                             {
608                                 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
609                                 Log.warn("Busy Key "+busy.channel()+" "+endpoint);
610                                 busy.cancel();
611                                 if (endpoint!=null)
612                                     endpoint.close();
613                             }
614                         }
615                         else
616                             _busyKeyCount=0;
617                         _busyKey=busy;
618                     }
619                 }
620                 else 
621                 {
622                     selector.selectNow();
623                     _selects++;
624                 }
625 
626                 // have we been destroyed while sleeping
627                 if (_selector==null || !selector.isOpen())
628                     return;
629 
630                 // Look for things to do
631                 for (SelectionKey key: selector.selectedKeys())
632                 {   
633                     try
634                     {
635                         if (!key.isValid())
636                         {
637                             key.cancel();
638                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
639                             if (endpoint != null)
640                                 endpoint.doUpdateKey();
641                             continue;
642                         }
643 
644                         Object att = key.attachment();
645                         if (att instanceof SelectChannelEndPoint)
646                         {
647                             ((SelectChannelEndPoint)att).schedule();
648                         }
649                         else if (key.isAcceptable())
650                         {
651                             SocketChannel channel = acceptChannel(key);
652                             if (channel==null)
653                                 continue;
654 
655                             channel.configureBlocking(false);
656 
657                             // TODO make it reluctant to leave 0
658                             _nextSet=++_nextSet%_selectSet.length;
659 
660                             // Is this for this selectset
661                             if (_nextSet==_setID)
662                             {
663                                 // bind connections to this select set.
664                                 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
665                                 SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey);
666                                 cKey.attach(endpoint);
667                                 if (endpoint != null)
668                                     endpoint.schedule();
669                             }
670                             else
671                             {
672                                 // nope - give it to another.
673                                 _selectSet[_nextSet].addChange(channel);
674                                 _selectSet[_nextSet].wakeup();
675                             }
676                         }
677                         else if (key.isConnectable())
678                         {
679                             // Complete a connection of a registered channel
680                             SocketChannel channel = (SocketChannel)key.channel();
681                             boolean connected=false;
682                             try
683                             {
684                                 connected=channel.finishConnect();
685                             }
686                             catch(Exception e)
687                             {
688                                 connectionFailed(channel,e,att);
689                             }
690                             finally
691                             {
692                                 if (connected)
693                                 {
694                                     key.interestOps(SelectionKey.OP_READ);
695                                     SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
696                                     key.attach(endpoint);
697                                     endpoint.schedule();
698                                 }
699                                 else
700                                 {
701                                     key.cancel();
702                                 }
703                             }
704                         }
705                         else
706                         {
707                             // Wrap readable registered channel in an endpoint
708                             SocketChannel channel = (SocketChannel)key.channel();
709                             SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
710                             key.attach(endpoint);
711                             if (key.isReadable())
712                                 endpoint.schedule();                           
713                         }
714                         key = null;
715                     }
716                     catch (CancelledKeyException e)
717                     {
718                         Log.ignore(e);
719                     }
720                     catch (Exception e)
721                     {
722                         if (isRunning())
723                             Log.warn(e);
724                         else
725                             Log.ignore(e);
726 
727                         if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
728                             key.cancel();
729                     }
730                 }
731                 
732                 // Everything always handled
733                 selector.selectedKeys().clear();
734                 
735                 // tick over the timers
736                 _idleTimeout.tick(now);
737                 _timeout.tick(now);
738             }
739             catch (CancelledKeyException e)
740             {
741                 Log.ignore(e);
742             }
743             finally
744             {
745                 _selecting=null;
746             }
747         }
748 
749         /* ------------------------------------------------------------ */
750         public SelectorManager getManager()
751         {
752             return SelectorManager.this;
753         }
754 
755         /* ------------------------------------------------------------ */
756         public long getNow()
757         {
758             return _idleTimeout.getNow();
759         }
760         
761         /* ------------------------------------------------------------ */
762         public void scheduleIdle(Timeout.Task task)
763         {
764             if (_idleTimeout.getDuration() <= 0)
765                 return;
766             _idleTimeout.schedule(task);
767         }
768 
769         /* ------------------------------------------------------------ */
770         public void scheduleTimeout(Timeout.Task task, long timeoutMs)
771         {
772             _timeout.schedule(task, timeoutMs);
773         }
774         
775         /* ------------------------------------------------------------ */
776         public void cancelTimeout(Timeout.Task task)
777         {
778             task.cancel();
779         }
780 
781         /* ------------------------------------------------------------ */
782         public void wakeup()
783         {
784             Selector selector = _selector;
785             if (selector!=null)
786                 selector.wakeup();
787         }
788 
789         /* ------------------------------------------------------------ */
790         Selector getSelector()
791         {
792             return _selector;
793         }
794         
795         /* ------------------------------------------------------------ */
796         void stop() throws Exception
797         {
798             boolean selecting=true;
799             while(selecting)
800             {
801                 wakeup();
802                 selecting=_selecting!=null;
803             }
804 
805             for (SelectionKey key:_selector.keys())
806             {
807                 if (key==null)
808                     continue;
809                 Object att=key.attachment();
810                 if (att instanceof EndPoint)
811                 {
812                     EndPoint endpoint = (EndPoint)att;
813                     try
814                     {
815                         endpoint.close();
816                     }
817                     catch(IOException e)
818                     {
819                         Log.ignore(e);
820                     }
821                 }
822             }
823             
824             synchronized (this)
825             {
826                 selecting=_selecting!=null;
827                 while(selecting)
828                 {
829                     wakeup();
830                     selecting=_selecting!=null;
831                 }
832                 
833                 _idleTimeout.cancelAll();
834                 _timeout.cancelAll();
835                 try
836                 {
837                     if (_selector != null)
838                         _selector.close();
839                 }
840                 catch (IOException e)
841                 {
842                     Log.ignore(e);
843                 } 
844                 _selector=null;
845             }
846         }
847         
848         public void dump()
849         {
850             synchronized (System.err)
851             {
852                 Selector selector=_selector;
853                 Log.info("SelectSet "+_setID+" "+selector.keys().size());
854                 for (SelectionKey key: selector.keys())
855                 {
856                     if (key.isValid())
857                         Log.info(key.channel()+" "+key.interestOps()+" "+key.readyOps()+" "+key.attachment());
858                     else
859                         Log.info(key.channel()+" - - "+key.attachment());
860                 }
861             }
862         }
863     }
864 
865     /* ------------------------------------------------------------ */
866     private static class ChangeSelectableChannel
867     {
868         final SelectableChannel _channel;
869         final Object _attachment;
870         
871         public ChangeSelectableChannel(SelectableChannel channel, Object attachment)
872         {
873             super();
874             _channel = channel;
875             _attachment = attachment;
876         }
877     }
878 
879     /* ------------------------------------------------------------ */
880     private interface ChangeTask
881     {
882         public void run();
883     }
884 }