View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.net.ConnectException;
24  import java.net.Socket;
25  import java.net.SocketAddress;
26  import java.net.SocketTimeoutException;
27  import java.nio.channels.CancelledKeyException;
28  import java.nio.channels.SelectionKey;
29  import java.nio.channels.Selector;
30  import java.nio.channels.ServerSocketChannel;
31  import java.nio.channels.SocketChannel;
32  import java.util.ArrayList;
33  import java.util.List;
34  import java.util.Queue;
35  import java.util.Set;
36  import java.util.concurrent.CountDownLatch;
37  import java.util.concurrent.Executor;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  import java.util.concurrent.atomic.AtomicReference;
41  
42  import org.eclipse.jetty.util.ConcurrentArrayQueue;
43  import org.eclipse.jetty.util.TypeUtil;
44  import org.eclipse.jetty.util.component.AbstractLifeCycle;
45  import org.eclipse.jetty.util.component.ContainerLifeCycle;
46  import org.eclipse.jetty.util.component.Dumpable;
47  import org.eclipse.jetty.util.log.Log;
48  import org.eclipse.jetty.util.log.Logger;
49  import org.eclipse.jetty.util.thread.Scheduler;
50  
51  /**
52   * <p>{@link SelectorManager} manages a number of {@link ManagedSelector}s that
53   * simplify the non-blocking primitives provided by the JVM via the {@code java.nio} package.</p>
54   * <p>{@link SelectorManager} subclasses implement methods to return protocol-specific
55   * {@link EndPoint}s and {@link Connection}s.</p>
56   */
57  public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
58  {
59      public static final String SUBMIT_KEY_UPDATES = "org.eclipse.jetty.io.SelectorManager.submitKeyUpdates";
60      public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
61      protected static final Logger LOG = Log.getLogger(SelectorManager.class);
62      private final static boolean __submitKeyUpdates = Boolean.valueOf(System.getProperty(SUBMIT_KEY_UPDATES, "false"));
63      
64      private final Executor executor;
65      private final Scheduler scheduler;
66      private final ManagedSelector[] _selectors;
67      private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
68      private long _selectorIndex;
69  
70      protected SelectorManager(Executor executor, Scheduler scheduler)
71      {
72          this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2);
73      }
74  
75      protected SelectorManager(Executor executor, Scheduler scheduler, int selectors)
76      {
77          if (selectors<=0)
78              throw new IllegalArgumentException("No selectors");
79          this.executor = executor;
80          this.scheduler = scheduler;
81          _selectors = new ManagedSelector[selectors];
82      }
83  
84      public Executor getExecutor()
85      {
86          return executor;
87      }
88  
89      public Scheduler getScheduler()
90      {
91          return scheduler;
92      }
93  
94      /**
95       * Get the connect timeout
96       *
97       * @return the connect timeout (in milliseconds)
98       */
99      public long getConnectTimeout()
100     {
101         return _connectTimeout;
102     }
103 
104     /**
105      * Set the connect timeout (in milliseconds)
106      *
107      * @param milliseconds the number of milliseconds for the timeout
108      */
109     public void setConnectTimeout(long milliseconds)
110     {
111         _connectTimeout = milliseconds;
112     }
113 
114     /**
115      * Executes the given task in a different thread.
116      *
117      * @param task the task to execute
118      */
119     protected void execute(Runnable task)
120     {
121         executor.execute(task);
122     }
123 
124     /**
125      * @return the number of selectors in use
126      */
127     public int getSelectorCount()
128     {
129         return _selectors.length;
130     }
131 
132     private ManagedSelector chooseSelector()
133     {
134         // The ++ increment here is not atomic, but it does not matter,
135         // so long as the value changes sometimes, then connections will
136         // be distributed over the available selectors.
137         long s = _selectorIndex++;
138         int index = (int)(s % getSelectorCount());
139         return _selectors[index];
140     }
141 
142     /**
143      * <p>Registers a channel to perform a non-blocking connect.</p>
144      * <p>The channel must be set in non-blocking mode, and {@link SocketChannel#connect(SocketAddress)}
145      * must be called prior to calling this method.</p>
146      *
147      * @param channel    the channel to register
148      * @param attachment the attachment object
149      */
150     public void connect(SocketChannel channel, Object attachment)
151     {
152         ManagedSelector set = chooseSelector();
153         set.submit(set.new Connect(channel, attachment));
154     }
155 
156     /**
157      * <p>Registers a channel to perform non-blocking read/write operations.</p>
158      * <p>This method is called just after a channel has been accepted by {@link ServerSocketChannel#accept()},
159      * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}.</p>
160      *
161      * @param channel the channel to register
162      */
163     public void accept(final SocketChannel channel)
164     {
165         final ManagedSelector selector = chooseSelector();
166         selector.submit(selector.new Accept(channel));
167     }
168     
169     /**
170      * <p>Registers a server channel for accept operations.
171      * When a {@link SocketChannel} is accepted from the given {@link ServerSocketChannel}
172      * then the {@link #accepted(SocketChannel)} method is called, which must be
173      * overridden by a derivation of this class to handle the accepted channel
174      * 
175      * @param server the server channel to register
176      */
177     public void acceptor(final ServerSocketChannel server)
178     {
179         final ManagedSelector selector = chooseSelector();
180         selector.submit(selector.new Acceptor(server));
181     }
182     
183     /**
184      * Callback method when a channel is accepted from the {@link ServerSocketChannel}
185      * passed to {@link #acceptor(ServerSocketChannel)}.
186      * The default impl throws an {@link UnsupportedOperationException}, so it must
187      * be overridden by subclasses if a server channel is provided.
188      *
189      * @param channel the
190      * @throws IOException
191      */
192     protected void accepted(SocketChannel channel) throws IOException
193     {
194         throw new UnsupportedOperationException();
195     }
196 
197     @Override
198     protected void doStart() throws Exception
199     {
200         super.doStart();
201         for (int i = 0; i < _selectors.length; i++)
202         {
203             ManagedSelector selector = newSelector(i);
204             _selectors[i] = selector;
205             selector.start();
206             execute(selector);
207         }
208     }
209 
210     /**
211      * <p>Factory method for {@link ManagedSelector}.</p>
212      *
213      * @param id an identifier for the {@link ManagedSelector to create}
214      * @return a new {@link ManagedSelector}
215      */
216     protected ManagedSelector newSelector(int id)
217     {
218         return new ManagedSelector(id);
219     }
220 
221     @Override
222     protected void doStop() throws Exception
223     {
224         for (ManagedSelector selector : _selectors)
225             selector.stop();
226         super.doStop();
227     }
228 
229     /**
230      * <p>Callback method invoked when an endpoint is opened.</p>
231      *
232      * @param endpoint the endpoint being opened
233      */
234     protected void endPointOpened(EndPoint endpoint)
235     {
236         endpoint.onOpen();
237     }
238 
239     /**
240      * <p>Callback method invoked when an endpoint is closed.</p>
241      *
242      * @param endpoint the endpoint being closed
243      */
244     protected void endPointClosed(EndPoint endpoint)
245     {
246         endpoint.onClose();
247     }
248 
249     /**
250      * <p>Callback method invoked when a connection is opened.</p>
251      *
252      * @param connection the connection just opened
253      */
254     public void connectionOpened(Connection connection)
255     {
256         try
257         {
258             connection.onOpen();
259         }
260         catch (Throwable x)
261         {
262             if (isRunning())
263                 LOG.warn("Exception while notifying connection " + connection, x);
264             else
265                 LOG.debug("Exception while notifying connection {}",connection, x);
266         }
267     }
268 
269     /**
270      * <p>Callback method invoked when a connection is closed.</p>
271      *
272      * @param connection the connection just closed
273      */
274     public void connectionClosed(Connection connection)
275     {
276         try
277         {
278             connection.onClose();
279         }
280         catch (Throwable x)
281         {
282             LOG.debug("Exception while notifying connection " + connection, x);
283         }
284     }
285 
286     protected boolean finishConnect(SocketChannel channel) throws IOException
287     {
288         return channel.finishConnect();
289     }
290 
291     /**
292      * <p>Callback method invoked when a non-blocking connect cannot be completed.</p>
293      * <p>By default it just logs with level warning.</p>
294      *
295      * @param channel the channel that attempted the connect
296      * @param ex the exception that caused the connect to fail
297      * @param attachment the attachment object associated at registration
298      */
299     protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
300     {
301         LOG.warn(String.format("%s - %s", channel, attachment), ex);
302     }
303 
304     /**
305      * <p>Factory method to create {@link EndPoint}.</p>
306      * <p>This method is invoked as a result of the registration of a channel via {@link #connect(SocketChannel, Object)}
307      * or {@link #accept(SocketChannel)}.</p>
308      *
309      * @param channel   the channel associated to the endpoint
310      * @param selector the selector the channel is registered to
311      * @param selectionKey      the selection key
312      * @return a new endpoint
313      * @throws IOException if the endPoint cannot be created
314      * @see #newConnection(SocketChannel, EndPoint, Object)
315      */
316     protected abstract EndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException;
317 
318     /**
319      * <p>Factory method to create {@link Connection}.</p>
320      *
321      * @param channel    the channel associated to the connection
322      * @param endpoint   the endpoint
323      * @param attachment the attachment
324      * @return a new connection
325      * @throws IOException
326      * @see #newEndPoint(SocketChannel, ManagedSelector, SelectionKey)
327      */
328     public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException;
329 
330     @Override
331     public String dump()
332     {
333         return ContainerLifeCycle.dump(this);
334     }
335 
336     @Override
337     public void dump(Appendable out, String indent) throws IOException
338     {
339         ContainerLifeCycle.dumpObject(out, this);
340         ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
341     }
342 
343     private enum State
344     {
345         CHANGES, MORE_CHANGES, SELECT, WAKEUP, PROCESS
346     }
347 
348     /**
349      * <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
350      * <p>{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events
351      * happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
352      * with the channel.</p>
353      */
354     public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
355     {
356         private final AtomicReference<State> _state= new AtomicReference<>(State.PROCESS);
357         private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>();
358         private final int _id;
359         private Selector _selector;
360         private volatile Thread _thread;
361 
362         public ManagedSelector(int id)
363         {
364             _id = id;
365             setStopTimeout(5000);
366         }
367 
368         @Override
369         protected void doStart() throws Exception
370         {
371             super.doStart();
372             _selector = Selector.open();
373             _state.set(State.PROCESS);
374         }
375 
376         @Override
377         protected void doStop() throws Exception
378         {
379             LOG.debug("Stopping {}", this);
380             Stop stop = new Stop();
381             submit(stop);
382             stop.await(getStopTimeout());
383             LOG.debug("Stopped {}", this);
384         }
385 
386         /**
387          * Submit a task to update a selector key.  If the System property {@link SelectorManager#SUBMIT_KEY_UPDATES}
388          * is set true (default is false), the task is passed to {@link #submit(Runnable)}.   Otherwise it is run immediately and the selector 
389          * woken up if need be.   
390          * @param update the update to a key
391          */
392         public void updateKey(Runnable update)
393         {
394             if (__submitKeyUpdates)
395             {
396                 submit(update);
397             }
398             else
399             {
400                 runChange(update);
401                 if (_state.compareAndSet(State.SELECT, State.WAKEUP))
402                    wakeup();
403             }
404         }
405         
406         /**
407          * <p>Submits a change to be executed in the selector thread.</p>
408          * <p>Changes may be submitted from any thread, and the selector thread woken up
409          * (if necessary) to execute the change.</p>
410          *
411          * @param change the change to submit
412          */
413         public void submit(Runnable change)
414         {
415             // This method may be called from the selector thread, and therefore
416             // we could directly run the change without queueing, but this may
417             // lead to stack overflows on a busy server, so we always offer the
418             // change to the queue and process the state.
419 
420             _changes.offer(change);
421             LOG.debug("Queued change {}", change);
422 
423             out: while (true)
424             {
425                 switch (_state.get())
426                 {
427                     case SELECT:
428                         // Avoid multiple wakeup() calls if we the CAS fails
429                         if (!_state.compareAndSet(State.SELECT, State.WAKEUP))
430                             continue;
431                         wakeup();
432                         break out;
433                     case CHANGES:
434                         // Tell the selector thread that we have more changes.
435                         // If we fail to CAS, we possibly need to wakeup(), so loop.
436                         if (_state.compareAndSet(State.CHANGES, State.MORE_CHANGES))
437                             break out;
438                         continue;
439                     case WAKEUP:
440                         // Do nothing, we have already a wakeup scheduled
441                         break out;
442                     case MORE_CHANGES:
443                         // Do nothing, we already notified the selector thread of more changes
444                         break out;
445                     case PROCESS:
446                         // Do nothing, the changes will be run after the processing
447                         break out;
448                     default:
449                         throw new IllegalStateException();
450                 }
451             }
452         }
453 
454         private void runChanges()
455         {
456             Runnable change;
457             while ((change = _changes.poll()) != null)
458                 runChange(change);
459         }
460 
461         protected void runChange(Runnable change)
462         {
463             try
464             {
465                 LOG.debug("Running change {}", change);
466                 change.run();
467             }
468             catch (Throwable x)
469             {
470                 LOG.debug("Could not run change " + change, x);
471             }
472         }
473 
474         @Override
475         public void run()
476         {
477             _thread = Thread.currentThread();
478             String name = _thread.getName();
479             try
480             {
481                 _thread.setName(name + "-selector-" + SelectorManager.this.getClass().getSimpleName()+"@"+Integer.toHexString(SelectorManager.this.hashCode())+"/"+_id);
482                 LOG.debug("Starting {} on {}", _thread, this);
483                 while (isRunning())
484                     select();
485                 runChanges();
486             }
487             finally
488             {
489                 LOG.debug("Stopped {} on {}", _thread, this);
490                 _thread.setName(name);
491             }
492         }
493 
494         /**
495          * <p>Process changes and waits on {@link Selector#select()}.</p>
496          *
497          * @see #submit(Runnable)
498          */
499         public void select()
500         {
501             boolean debug = LOG.isDebugEnabled();
502             try
503             {
504                 _state.set(State.CHANGES);
505 
506                 // Run the changes, and only exit if we ran all changes
507                 out: while(true)
508                 {
509                     switch (_state.get())
510                     {
511                         case CHANGES:
512                             runChanges();
513                             if (_state.compareAndSet(State.CHANGES, State.SELECT))
514                                 break out;
515                             continue;
516                         case MORE_CHANGES:
517                             runChanges();
518                             _state.set(State.CHANGES);
519                             continue;
520                         default:
521                             throw new IllegalStateException();    
522                     }
523                 }
524                 // Must check first for SELECT and *then* for WAKEUP
525                 // because we read the state twice in the assert, and
526                 // it could change from SELECT to WAKEUP in between.
527                 assert _state.get() == State.SELECT || _state.get() == State.WAKEUP;
528 
529                 if (debug)
530                     LOG.debug("Selector loop waiting on select");
531                 int selected = _selector.select();
532                 if (debug)
533                     LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
534 
535                 _state.set(State.PROCESS);
536 
537                 Set<SelectionKey> selectedKeys = _selector.selectedKeys();
538                 for (SelectionKey key : selectedKeys)
539                 {
540                     if (key.isValid())
541                     {
542                         processKey(key);
543                     }
544                     else
545                     {
546                         if (debug)
547                             LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
548                         Object attachment = key.attachment();
549                         if (attachment instanceof EndPoint)
550                             ((EndPoint)attachment).close();
551                     }
552                 }
553                 selectedKeys.clear();
554             }
555             catch (Throwable x)
556             {
557                 if (isRunning())
558                     LOG.warn(x);
559                 else
560                     LOG.ignore(x);
561             }
562         }
563 
564         private void processKey(SelectionKey key)
565         {
566             Object attachment = key.attachment();
567             try
568             {
569                 if (attachment instanceof SelectableEndPoint)
570                 {
571                     ((SelectableEndPoint)attachment).onSelected();
572                 }
573                 else if (key.isConnectable())
574                 {
575                     processConnect(key, (Connect)attachment);
576                 }
577                 else if (key.isAcceptable())
578                 {
579                     processAccept(key);
580                 }
581                 else
582                 {
583                     throw new IllegalStateException();
584                 }
585             }
586             catch (CancelledKeyException x)
587             {
588                 LOG.debug("Ignoring cancelled key for channel {}", key.channel());
589                 if (attachment instanceof EndPoint)
590                     closeNoExceptions((EndPoint)attachment);
591             }
592             catch (Throwable x)
593             {
594                 LOG.warn("Could not process key for channel " + key.channel(), x);
595                 if (attachment instanceof EndPoint)
596                     closeNoExceptions((EndPoint)attachment);
597             }
598         }
599 
600         private void processConnect(SelectionKey key, Connect connect)
601         {
602             SocketChannel channel = (SocketChannel)key.channel();
603             try
604             {
605                 key.attach(connect.attachment);
606                 boolean connected = finishConnect(channel);
607                 if (connected)
608                 {
609                     connect.timeout.cancel();
610                     key.interestOps(0);
611                     EndPoint endpoint = createEndPoint(channel, key);
612                     key.attach(endpoint);
613                 }
614                 else
615                 {
616                     throw new ConnectException();
617                 }
618             }
619             catch (Throwable x)
620             {
621                 connect.failed(x);
622             }
623         }
624         
625         private void processAccept(SelectionKey key)
626         {
627             ServerSocketChannel server = (ServerSocketChannel)key.channel();
628             SocketChannel channel = null;
629             try
630             {
631                 while ((channel = server.accept()) != null)
632                 {
633                     accepted(channel);
634                 }
635             }
636             catch (Throwable x)
637             {
638                 closeNoExceptions(channel);
639                 LOG.warn("Accept failed for channel " + channel, x);
640             }
641         }
642 
643         private void closeNoExceptions(Closeable closeable)
644         {
645             try
646             {
647                 if (closeable != null)
648                     closeable.close();
649             }
650             catch (Throwable x)
651             {
652                 LOG.ignore(x);
653             }
654         }
655 
656         public void wakeup()
657         {
658             _selector.wakeup();
659         }
660 
661         public boolean isSelectorThread()
662         {
663             return Thread.currentThread() == _thread;
664         }
665 
666         private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
667         {
668             EndPoint endPoint = newEndPoint(channel, this, selectionKey);
669             endPointOpened(endPoint);
670             Connection connection = newConnection(channel, endPoint, selectionKey.attachment());
671             endPoint.setConnection(connection);
672             connectionOpened(connection);
673             LOG.debug("Created {}", endPoint);
674             return endPoint;
675         }
676 
677         public void destroyEndPoint(EndPoint endPoint)
678         {
679             LOG.debug("Destroyed {}", endPoint);
680             Connection connection = endPoint.getConnection();
681             if (connection != null)
682                 connectionClosed(connection);
683             endPointClosed(endPoint);
684         }
685 
686         @Override
687         public String dump()
688         {
689             return ContainerLifeCycle.dump(this);
690         }
691 
692         @Override
693         public void dump(Appendable out, String indent) throws IOException
694         {
695             out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append("\n");
696 
697             Thread selecting = _thread;
698 
699             Object where = "not selecting";
700             StackTraceElement[] trace = selecting == null ? null : selecting.getStackTrace();
701             if (trace != null)
702             {
703                 for (StackTraceElement t : trace)
704                     if (t.getClassName().startsWith("org.eclipse.jetty."))
705                     {
706                         where = t;
707                         break;
708                     }
709             }
710 
711             Selector selector = _selector;
712             if (selector != null && selector.isOpen())
713             {
714                 final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2);
715                 dump.add(where);
716 
717                 DumpKeys dumpKeys = new DumpKeys(dump);
718                 submit(dumpKeys);
719                 dumpKeys.await(5, TimeUnit.SECONDS);
720 
721                 ContainerLifeCycle.dump(out, indent, dump);
722             }
723         }
724 
725         public void dumpKeysState(List<Object> dumps)
726         {
727             Selector selector = _selector;
728             Set<SelectionKey> keys = selector.keys();
729             dumps.add(selector + " keys=" + keys.size());
730             for (SelectionKey key : keys)
731             {
732                 if (key.isValid())
733                     dumps.add(key.attachment() + " iOps=" + key.interestOps() + " rOps=" + key.readyOps());
734                 else
735                     dumps.add(key.attachment() + " iOps=-1 rOps=-1");
736             }
737         }
738 
739         @Override
740         public String toString()
741         {
742             Selector selector = _selector;
743             return String.format("%s keys=%d selected=%d",
744                     super.toString(),
745                     selector != null && selector.isOpen() ? selector.keys().size() : -1,
746                     selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
747         }
748 
749         private class DumpKeys implements Runnable
750         {
751             private final CountDownLatch latch = new CountDownLatch(1);
752             private final List<Object> _dumps;
753 
754             private DumpKeys(List<Object> dumps)
755             {
756                 this._dumps = dumps;
757             }
758 
759             @Override
760             public void run()
761             {
762                 dumpKeysState(_dumps);
763                 latch.countDown();
764             }
765 
766             public boolean await(long timeout, TimeUnit unit)
767             {
768                 try
769                 {
770                     return latch.await(timeout, unit);
771                 }
772                 catch (InterruptedException x)
773                 {
774                     return false;
775                 }
776             }
777         }
778 
779         private class Acceptor implements Runnable
780         {
781             private final ServerSocketChannel _channel;
782 
783             public Acceptor(ServerSocketChannel channel)
784             {
785                 this._channel = channel;
786             }
787 
788             @Override
789             public void run()
790             {
791                 try
792                 {
793                     SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null);
794                     LOG.debug("{} acceptor={}", this, key);
795                 }
796                 catch (Throwable x)
797                 {
798                     closeNoExceptions(_channel);
799                     LOG.warn(x);
800                 }
801             }
802         }
803 
804         private class Accept implements Runnable
805         {
806             private final SocketChannel _channel;
807 
808             public Accept(SocketChannel channel)
809             {
810                 this._channel = channel;
811             }
812 
813             @Override
814             public void run()
815             {
816                 try
817                 {
818                     SelectionKey key = _channel.register(_selector, 0, null);
819                     EndPoint endpoint = createEndPoint(_channel, key);
820                     key.attach(endpoint);
821                 }
822                 catch (Throwable x)
823                 {
824                     closeNoExceptions(_channel);
825                     LOG.debug(x);
826                 }
827             }
828         }
829 
830         private class Connect implements Runnable
831         {
832             private final AtomicBoolean failed = new AtomicBoolean();
833             private final SocketChannel channel;
834             private final Object attachment;
835             private final Scheduler.Task timeout;
836 
837             public Connect(SocketChannel channel, Object attachment)
838             {
839                 this.channel = channel;
840                 this.attachment = attachment;
841                 this.timeout = scheduler.schedule(new ConnectTimeout(this), getConnectTimeout(), TimeUnit.MILLISECONDS);
842             }
843 
844             @Override
845             public void run()
846             {
847                 try
848                 {
849                     channel.register(_selector, SelectionKey.OP_CONNECT, this);
850                 }
851                 catch (Throwable x)
852                 {
853                     failed(x);
854                 }
855             }
856 
857             protected void failed(Throwable failure)
858             {
859                 if (failed.compareAndSet(false, true))
860                 {
861                     timeout.cancel();
862                     closeNoExceptions(channel);
863                     connectionFailed(channel, failure, attachment);
864                 }
865             }
866         }
867 
868         private class ConnectTimeout implements Runnable
869         {
870             private final Connect connect;
871 
872             private ConnectTimeout(Connect connect)
873             {
874                 this.connect = connect;
875             }
876 
877             @Override
878             public void run()
879             {
880                 SocketChannel channel = connect.channel;
881                 if (channel.isConnectionPending())
882                 {
883                     LOG.debug("Channel {} timed out while connecting, closing it", channel);
884                     connect.failed(new SocketTimeoutException());
885                 }
886             }
887         }
888 
889         private class Stop implements Runnable
890         {
891             private final CountDownLatch latch = new CountDownLatch(1);
892 
893             @Override
894             public void run()
895             {
896                 try
897                 {
898                     for (SelectionKey key : _selector.keys())
899                     {
900                         Object attachment = key.attachment();
901                         if (attachment instanceof EndPoint)
902                         {
903                             EndPointCloser closer = new EndPointCloser((EndPoint)attachment);
904                             execute(closer);
905                             // We are closing the SelectorManager, so we want to block the
906                             // selector thread here until we have closed all EndPoints.
907                             // This is different than calling close() directly, because close()
908                             // can wait forever, while here we are limited by the stop timeout.
909                             closer.await(getStopTimeout());
910                         }
911                     }
912 
913                     closeNoExceptions(_selector);
914                 }
915                 finally
916                 {
917                     latch.countDown();
918                 }
919             }
920 
921             public boolean await(long timeout)
922             {
923                 try
924                 {
925                     return latch.await(timeout, TimeUnit.MILLISECONDS);
926                 }
927                 catch (InterruptedException x)
928                 {
929                     return false;
930                 }
931             }
932         }
933 
934         private class EndPointCloser implements Runnable
935         {
936             private final CountDownLatch latch = new CountDownLatch(1);
937             private final EndPoint endPoint;
938 
939             private EndPointCloser(EndPoint endPoint)
940             {
941                 this.endPoint = endPoint;
942             }
943 
944             @Override
945             public void run()
946             {
947                 try
948                 {
949                     closeNoExceptions(endPoint.getConnection());
950                 }
951                 finally
952                 {
953                     latch.countDown();
954                 }
955             }
956 
957             private boolean await(long timeout)
958             {
959                 try
960                 {
961                     return latch.await(timeout, TimeUnit.MILLISECONDS);
962                 }
963                 catch (InterruptedException x)
964                 {
965                     return false;
966                 }
967             }
968         }
969     }
970 
    /**
     * A {@link SelectableEndPoint} is an {@link EndPoint} that wishes to be notified of
     * non-blocking events by the {@link ManagedSelector}.
     */
    public interface SelectableEndPoint extends EndPoint
    {
        /**
         * <p>Callback method invoked when a read or write event has been detected by the
         * {@link ManagedSelector} for this endpoint.</p>
         */
        void onSelected();
    }
983 }