View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.net.ConnectException;
24  import java.net.Socket;
25  import java.net.SocketAddress;
26  import java.net.SocketTimeoutException;
27  import java.nio.channels.CancelledKeyException;
28  import java.nio.channels.SelectionKey;
29  import java.nio.channels.Selector;
30  import java.nio.channels.ServerSocketChannel;
31  import java.nio.channels.SocketChannel;
32  import java.util.ArrayList;
33  import java.util.List;
34  import java.util.Queue;
35  import java.util.Set;
36  import java.util.concurrent.CountDownLatch;
37  import java.util.concurrent.Executor;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  import java.util.concurrent.atomic.AtomicReference;
41  
42  import org.eclipse.jetty.util.ConcurrentArrayQueue;
43  import org.eclipse.jetty.util.TypeUtil;
44  import org.eclipse.jetty.util.annotation.ManagedAttribute;
45  import org.eclipse.jetty.util.component.AbstractLifeCycle;
46  import org.eclipse.jetty.util.component.ContainerLifeCycle;
47  import org.eclipse.jetty.util.component.Dumpable;
48  import org.eclipse.jetty.util.log.Log;
49  import org.eclipse.jetty.util.log.Logger;
50  import org.eclipse.jetty.util.thread.NonBlockingThread;
51  import org.eclipse.jetty.util.thread.Scheduler;
52  
53  /**
54   * <p>{@link SelectorManager} manages a number of {@link ManagedSelector}s that
55   * simplify the non-blocking primitives provided by the JVM via the {@code java.nio} package.</p>
56   * <p>{@link SelectorManager} subclasses implement methods to return protocol-specific
57   * {@link EndPoint}s and {@link Connection}s.</p>
58   */
59  public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
60  {
61      public static final String SUBMIT_KEY_UPDATES = "org.eclipse.jetty.io.SelectorManager.submitKeyUpdates";
62      public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
63      protected static final Logger LOG = Log.getLogger(SelectorManager.class);
64      private final static boolean __submitKeyUpdates = Boolean.valueOf(System.getProperty(SUBMIT_KEY_UPDATES, "false"));
65      
66      private final Executor executor;
67      private final Scheduler scheduler;
68      private final ManagedSelector[] _selectors;
69      private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
70      private long _selectorIndex;
71      private int _priorityDelta;
72      
73      protected SelectorManager(Executor executor, Scheduler scheduler)
74      {
75          this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2);
76      }
77  
78      protected SelectorManager(Executor executor, Scheduler scheduler, int selectors)
79      {
80          if (selectors<=0)
81              throw new IllegalArgumentException("No selectors");
82          this.executor = executor;
83          this.scheduler = scheduler;
84          _selectors = new ManagedSelector[selectors];
85      }
86  
87      public Executor getExecutor()
88      {
89          return executor;
90      }
91  
92      public Scheduler getScheduler()
93      {
94          return scheduler;
95      }
96  
97      /**
98       * Get the connect timeout
99       *
100      * @return the connect timeout (in milliseconds)
101      */
102     public long getConnectTimeout()
103     {
104         return _connectTimeout;
105     }
106 
107     /**
108      * Set the connect timeout (in milliseconds)
109      *
110      * @param milliseconds the number of milliseconds for the timeout
111      */
112     public void setConnectTimeout(long milliseconds)
113     {
114         _connectTimeout = milliseconds;
115     }
116 
117 
118     @ManagedAttribute("The priority delta to apply to selector threads")
119     public int getSelectorPriorityDelta()
120     {
121         return _priorityDelta;
122     }
123 
124     /**
125      * Sets the selector thread priority delta to the given amount.
126      * <p>This allows the selector threads to run at a different priority.
127      * Typically this would be used to lower the priority to give preference
128      * to handling previously accepted connections rather than accepting
129      * new connections.</p>
130      *
131      * @param selectorPriorityDelta the amount to change the thread priority
132      *                              delta to (may be negative)
133      * @see Thread#getPriority()
134      */
135     public void setSelectorPriorityDelta(int selectorPriorityDelta)
136     {
137         int oldDelta = _priorityDelta;
138         _priorityDelta = selectorPriorityDelta;
139         if (oldDelta != selectorPriorityDelta && isStarted())
140         {
141             for (ManagedSelector selector : _selectors)
142             {
143                 Thread thread = selector._thread;
144                 if (thread != null)
145                 {
146                     int deltaDiff = selectorPriorityDelta - oldDelta;
147                     thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, thread.getPriority() - deltaDiff)));
148                 }
149             }
150         }
151     }
152     
153     /**
154      * Executes the given task in a different thread.
155      *
156      * @param task the task to execute
157      */
158     protected void execute(Runnable task)
159     {
160         executor.execute(task);
161     }
162 
163     /**
164      * @return the number of selectors in use
165      */
166     public int getSelectorCount()
167     {
168         return _selectors.length;
169     }
170 
171     private ManagedSelector chooseSelector()
172     {
173         // The ++ increment here is not atomic, but it does not matter,
174         // so long as the value changes sometimes, then connections will
175         // be distributed over the available selectors.
176         long s = _selectorIndex++;
177         int index = (int)(s % getSelectorCount());
178         return _selectors[index];
179     }
180 
181     /**
182      * <p>Registers a channel to perform a non-blocking connect.</p>
183      * <p>The channel must be set in non-blocking mode, {@link SocketChannel#connect(SocketAddress)}
184      * must be called prior to calling this method, and the connect operation must not be completed
185      * (the return value of {@link SocketChannel#connect(SocketAddress)} must be false).</p>
186      *
187      * @param channel    the channel to register
188      * @param attachment the attachment object
189      * @see #accept(SocketChannel, Object)
190      */
191     public void connect(SocketChannel channel, Object attachment)
192     {
193         ManagedSelector set = chooseSelector();
194         set.submit(set.new Connect(channel, attachment));
195     }
196 
197     /**
198      * @see #accept(SocketChannel, Object)
199      */
200     public void accept(SocketChannel channel)
201     {
202         accept(channel, null);
203     }
204 
205     /**
206      * <p>Registers a channel to perform non-blocking read/write operations.</p>
207      * <p>This method is called just after a channel has been accepted by {@link ServerSocketChannel#accept()},
208      * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}, or
209      * just after a non-blocking connect via {@link SocketChannel#connect(SocketAddress)} that completed
210      * successfully.</p>
211      *
212      * @param channel the channel to register
213      * @param attachment the attachment object
214      */
215     public void accept(SocketChannel channel, Object attachment)
216     {
217         final ManagedSelector selector = chooseSelector();
218         selector.submit(selector.new Accept(channel, attachment));
219     }
220     
221     /**
222      * <p>Registers a server channel for accept operations.
223      * When a {@link SocketChannel} is accepted from the given {@link ServerSocketChannel}
224      * then the {@link #accepted(SocketChannel)} method is called, which must be
225      * overridden by a derivation of this class to handle the accepted channel
226      * 
227      * @param server the server channel to register
228      */
229     public void acceptor(ServerSocketChannel server)
230     {
231         final ManagedSelector selector = chooseSelector();
232         selector.submit(selector.new Acceptor(server));
233     }
234     
235     /**
236      * Callback method when a channel is accepted from the {@link ServerSocketChannel}
237      * passed to {@link #acceptor(ServerSocketChannel)}.
238      * The default impl throws an {@link UnsupportedOperationException}, so it must
239      * be overridden by subclasses if a server channel is provided.
240      *
241      * @param channel the
242      * @throws IOException
243      */
244     protected void accepted(SocketChannel channel) throws IOException
245     {
246         throw new UnsupportedOperationException();
247     }
248 
249     @Override
250     protected void doStart() throws Exception
251     {
252         super.doStart();
253         for (int i = 0; i < _selectors.length; i++)
254         {
255             ManagedSelector selector = newSelector(i);
256             _selectors[i] = selector;
257             selector.start();
258             execute(new NonBlockingThread(selector));
259         }
260     }
261 
262     /**
263      * <p>Factory method for {@link ManagedSelector}.</p>
264      *
265      * @param id an identifier for the {@link ManagedSelector to create}
266      * @return a new {@link ManagedSelector}
267      */
268     protected ManagedSelector newSelector(int id)
269     {
270         return new ManagedSelector(id);
271     }
272 
273     @Override
274     protected void doStop() throws Exception
275     {
276         for (ManagedSelector selector : _selectors)
277             selector.stop();
278         super.doStop();
279     }
280 
281     /**
282      * <p>Callback method invoked when an endpoint is opened.</p>
283      *
284      * @param endpoint the endpoint being opened
285      */
286     protected void endPointOpened(EndPoint endpoint)
287     {
288         endpoint.onOpen();
289     }
290 
291     /**
292      * <p>Callback method invoked when an endpoint is closed.</p>
293      *
294      * @param endpoint the endpoint being closed
295      */
296     protected void endPointClosed(EndPoint endpoint)
297     {
298         endpoint.onClose();
299     }
300 
301     /**
302      * <p>Callback method invoked when a connection is opened.</p>
303      *
304      * @param connection the connection just opened
305      */
306     public void connectionOpened(Connection connection)
307     {
308         try
309         {
310             connection.onOpen();
311         }
312         catch (Throwable x)
313         {
314             if (isRunning())
315                 LOG.warn("Exception while notifying connection " + connection, x);
316             else
317                 LOG.debug("Exception while notifying connection {}",connection, x);
318         }
319     }
320 
321     /**
322      * <p>Callback method invoked when a connection is closed.</p>
323      *
324      * @param connection the connection just closed
325      */
326     public void connectionClosed(Connection connection)
327     {
328         try
329         {
330             connection.onClose();
331         }
332         catch (Throwable x)
333         {
334             LOG.debug("Exception while notifying connection " + connection, x);
335         }
336     }
337 
338     protected boolean finishConnect(SocketChannel channel) throws IOException
339     {
340         return channel.finishConnect();
341     }
342 
343     /**
344      * <p>Callback method invoked when a non-blocking connect cannot be completed.</p>
345      * <p>By default it just logs with level warning.</p>
346      *
347      * @param channel the channel that attempted the connect
348      * @param ex the exception that caused the connect to fail
349      * @param attachment the attachment object associated at registration
350      */
351     protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
352     {
353         LOG.warn(String.format("%s - %s", channel, attachment), ex);
354     }
355 
356     /**
357      * <p>Factory method to create {@link EndPoint}.</p>
358      * <p>This method is invoked as a result of the registration of a channel via {@link #connect(SocketChannel, Object)}
359      * or {@link #accept(SocketChannel)}.</p>
360      *
361      * @param channel   the channel associated to the endpoint
362      * @param selector the selector the channel is registered to
363      * @param selectionKey      the selection key
364      * @return a new endpoint
365      * @throws IOException if the endPoint cannot be created
366      * @see #newConnection(SocketChannel, EndPoint, Object)
367      */
368     protected abstract EndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException;
369 
370     /**
371      * <p>Factory method to create {@link Connection}.</p>
372      *
373      * @param channel    the channel associated to the connection
374      * @param endpoint   the endpoint
375      * @param attachment the attachment
376      * @return a new connection
377      * @throws IOException
378      * @see #newEndPoint(SocketChannel, ManagedSelector, SelectionKey)
379      */
380     public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException;
381 
382     @Override
383     public String dump()
384     {
385         return ContainerLifeCycle.dump(this);
386     }
387 
388     @Override
389     public void dump(Appendable out, String indent) throws IOException
390     {
391         ContainerLifeCycle.dumpObject(out, this);
392         ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
393     }
394 
395     private enum State
396     {
397         CHANGES, MORE_CHANGES, SELECT, WAKEUP, PROCESS
398     }
399 
400     /**
401      * <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
402      * <p>{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events
403      * happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
404      * with the channel.</p>
405      */
406     public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
407     {
408         private final AtomicReference<State> _state= new AtomicReference<>(State.PROCESS);
409         private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>();
410         private final int _id;
411         private Selector _selector;
412         private volatile Thread _thread;
413 
414         public ManagedSelector(int id)
415         {
416             _id = id;
417             setStopTimeout(5000);
418         }
419 
420         @Override
421         protected void doStart() throws Exception
422         {
423             super.doStart();
424             _selector = Selector.open();
425             _state.set(State.PROCESS);
426         }
427 
428         @Override
429         protected void doStop() throws Exception
430         {
431             if (LOG.isDebugEnabled())
432                 LOG.debug("Stopping {}", this);
433             Stop stop = new Stop();
434             submit(stop);
435             stop.await(getStopTimeout());
436             if (LOG.isDebugEnabled())
437                 LOG.debug("Stopped {}", this);
438         }
439 
440         /**
441          * Submit a task to update a selector key.  If the System property {@link SelectorManager#SUBMIT_KEY_UPDATES}
442          * is set true (default is false), the task is passed to {@link #submit(Runnable)}.   Otherwise it is run immediately and the selector 
443          * woken up if need be.   
444          * @param update the update to a key
445          */
446         public void updateKey(Runnable update)
447         {
448             if (__submitKeyUpdates)
449             {
450                 submit(update);
451             }
452             else
453             {
454                 runChange(update);
455                 if (_state.compareAndSet(State.SELECT, State.WAKEUP))
456                    wakeup();
457             }
458         }
459         
460         /**
461          * <p>Submits a change to be executed in the selector thread.</p>
462          * <p>Changes may be submitted from any thread, and the selector thread woken up
463          * (if necessary) to execute the change.</p>
464          *
465          * @param change the change to submit
466          */
467         public void submit(Runnable change)
468         {
469             // This method may be called from the selector thread, and therefore
470             // we could directly run the change without queueing, but this may
471             // lead to stack overflows on a busy server, so we always offer the
472             // change to the queue and process the state.
473 
474             _changes.offer(change);
475             if (LOG.isDebugEnabled())
476                 LOG.debug("Queued change {}", change);
477 
478             out: while (true)
479             {
480                 switch (_state.get())
481                 {
482                     case SELECT:
483                         // Avoid multiple wakeup() calls if we the CAS fails
484                         if (!_state.compareAndSet(State.SELECT, State.WAKEUP))
485                             continue;
486                         wakeup();
487                         break out;
488                     case CHANGES:
489                         // Tell the selector thread that we have more changes.
490                         // If we fail to CAS, we possibly need to wakeup(), so loop.
491                         if (_state.compareAndSet(State.CHANGES, State.MORE_CHANGES))
492                             break out;
493                         continue;
494                     case WAKEUP:
495                         // Do nothing, we have already a wakeup scheduled
496                         break out;
497                     case MORE_CHANGES:
498                         // Do nothing, we already notified the selector thread of more changes
499                         break out;
500                     case PROCESS:
501                         // Do nothing, the changes will be run after the processing
502                         break out;
503                     default:
504                         throw new IllegalStateException();
505                 }
506             }
507         }
508 
509         private void runChanges()
510         {
511             Runnable change;
512             while ((change = _changes.poll()) != null)
513                 runChange(change);
514         }
515 
516         protected void runChange(Runnable change)
517         {
518             try
519             {
520                 if (LOG.isDebugEnabled())
521                     LOG.debug("Running change {}", change);
522                 change.run();
523             }
524             catch (Throwable x)
525             {
526                 LOG.debug("Could not run change " + change, x);
527             }
528         }
529 
530         @Override
531         public void run()
532         {
533             _thread = Thread.currentThread();
534             String name = _thread.getName();
535             int priority = _thread.getPriority();
536             try
537             {
538                 if (_priorityDelta != 0)
539                     _thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, priority + _priorityDelta)));
540 
541                 _thread.setName(String.format("%s-selector-%s@%h/%d", name, SelectorManager.this.getClass().getSimpleName(), SelectorManager.this.hashCode(), _id));
542                 if (LOG.isDebugEnabled())
543                     LOG.debug("Starting {} on {}", _thread, this);
544                 while (isRunning())
545                     select();
546                 while(isStopping())
547                     runChanges();
548             }
549             finally
550             {
551                 if (LOG.isDebugEnabled())
552                     LOG.debug("Stopped {} on {}", _thread, this);
553                 _thread.setName(name);
554                 if (_priorityDelta != 0)
555                     _thread.setPriority(priority);
556             }
557         }
558 
559         /**
560          * <p>Process changes and waits on {@link Selector#select()}.</p>
561          *
562          * @see #submit(Runnable)
563          */
564         public void select()
565         {
566             boolean debug = LOG.isDebugEnabled();
567             try
568             {
569                 _state.set(State.CHANGES);
570 
571                 // Run the changes, and only exit if we ran all changes
572                 out: while(true)
573                 {
574                     switch (_state.get())
575                     {
576                         case CHANGES:
577                             runChanges();
578                             if (_state.compareAndSet(State.CHANGES, State.SELECT))
579                                 break out;
580                             continue;
581                         case MORE_CHANGES:
582                             runChanges();
583                             _state.set(State.CHANGES);
584                             continue;
585                         default:
586                             throw new IllegalStateException();    
587                     }
588                 }
589                 // Must check first for SELECT and *then* for WAKEUP
590                 // because we read the state twice in the assert, and
591                 // it could change from SELECT to WAKEUP in between.
592                 assert _state.get() == State.SELECT || _state.get() == State.WAKEUP;
593 
594                 if (debug)
595                     LOG.debug("Selector loop waiting on select");
596                 int selected = _selector.select();
597                 if (debug)
598                     LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
599 
600                 _state.set(State.PROCESS);
601 
602                 Set<SelectionKey> selectedKeys = _selector.selectedKeys();
603                 for (SelectionKey key : selectedKeys)
604                 {
605                     if (key.isValid())
606                     {
607                         processKey(key);
608                     }
609                     else
610                     {
611                         if (debug)
612                             LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
613                         Object attachment = key.attachment();
614                         if (attachment instanceof EndPoint)
615                             ((EndPoint)attachment).close();
616                     }
617                 }
618                 selectedKeys.clear();
619             }
620             catch (Throwable x)
621             {
622                 if (isRunning())
623                     LOG.warn(x);
624                 else
625                     LOG.ignore(x);
626             }
627         }
628 
629         private void processKey(SelectionKey key)
630         {
631             Object attachment = key.attachment();
632             try
633             {
634                 if (attachment instanceof SelectableEndPoint)
635                 {
636                     ((SelectableEndPoint)attachment).onSelected();
637                 }
638                 else if (key.isConnectable())
639                 {
640                     processConnect(key, (Connect)attachment);
641                 }
642                 else if (key.isAcceptable())
643                 {
644                     processAccept(key);
645                 }
646                 else
647                 {
648                     throw new IllegalStateException();
649                 }
650             }
651             catch (CancelledKeyException x)
652             {
653                 LOG.debug("Ignoring cancelled key for channel {}", key.channel());
654                 if (attachment instanceof EndPoint)
655                     closeNoExceptions((EndPoint)attachment);
656             }
657             catch (Throwable x)
658             {
659                 LOG.warn("Could not process key for channel " + key.channel(), x);
660                 if (attachment instanceof EndPoint)
661                     closeNoExceptions((EndPoint)attachment);
662             }
663         }
664 
665         private void processConnect(SelectionKey key, Connect connect)
666         {
667             SocketChannel channel = (SocketChannel)key.channel();
668             try
669             {
670                 key.attach(connect.attachment);
671                 boolean connected = finishConnect(channel);
672                 if (connected)
673                 {
674                     connect.timeout.cancel();
675                     key.interestOps(0);
676                     EndPoint endpoint = createEndPoint(channel, key);
677                     key.attach(endpoint);
678                 }
679                 else
680                 {
681                     throw new ConnectException();
682                 }
683             }
684             catch (Throwable x)
685             {
686                 connect.failed(x);
687             }
688         }
689         
690         private void processAccept(SelectionKey key)
691         {
692             ServerSocketChannel server = (ServerSocketChannel)key.channel();
693             SocketChannel channel = null;
694             try
695             {
696                 while ((channel = server.accept()) != null)
697                 {
698                     accepted(channel);
699                 }
700             }
701             catch (Throwable x)
702             {
703                 closeNoExceptions(channel);
704                 LOG.warn("Accept failed for channel " + channel, x);
705             }
706         }
707 
708         private void closeNoExceptions(Closeable closeable)
709         {
710             try
711             {
712                 if (closeable != null)
713                     closeable.close();
714             }
715             catch (Throwable x)
716             {
717                 LOG.ignore(x);
718             }
719         }
720 
721         public void wakeup()
722         {
723             _selector.wakeup();
724         }
725 
726         public boolean isSelectorThread()
727         {
728             return Thread.currentThread() == _thread;
729         }
730 
731         private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
732         {
733             EndPoint endPoint = newEndPoint(channel, this, selectionKey);
734             endPointOpened(endPoint);
735             Connection connection = newConnection(channel, endPoint, selectionKey.attachment());
736             endPoint.setConnection(connection);
737             connectionOpened(connection);
738             if (LOG.isDebugEnabled())
739                 LOG.debug("Created {}", endPoint);
740             return endPoint;
741         }
742 
743         public void destroyEndPoint(EndPoint endPoint)
744         {
745             if (LOG.isDebugEnabled())
746                 LOG.debug("Destroyed {}", endPoint);
747             Connection connection = endPoint.getConnection();
748             if (connection != null)
749                 connectionClosed(connection);
750             endPointClosed(endPoint);
751         }
752 
753         @Override
754         public String dump()
755         {
756             return ContainerLifeCycle.dump(this);
757         }
758 
759         @Override
760         public void dump(Appendable out, String indent) throws IOException
761         {
762             out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append("\n");
763 
764             Thread selecting = _thread;
765 
766             Object where = "not selecting";
767             StackTraceElement[] trace = selecting == null ? null : selecting.getStackTrace();
768             if (trace != null)
769             {
770                 for (StackTraceElement t : trace)
771                     if (t.getClassName().startsWith("org.eclipse.jetty."))
772                     {
773                         where = t;
774                         break;
775                     }
776             }
777 
778             Selector selector = _selector;
779             if (selector != null && selector.isOpen())
780             {
781                 final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2);
782                 dump.add(where);
783 
784                 DumpKeys dumpKeys = new DumpKeys(dump);
785                 submit(dumpKeys);
786                 dumpKeys.await(5, TimeUnit.SECONDS);
787 
788                 ContainerLifeCycle.dump(out, indent, dump);
789             }
790         }
791 
792         public void dumpKeysState(List<Object> dumps)
793         {
794             Selector selector = _selector;
795             Set<SelectionKey> keys = selector.keys();
796             dumps.add(selector + " keys=" + keys.size());
797             for (SelectionKey key : keys)
798             {
799                 if (key.isValid())
800                     dumps.add(key.attachment() + " iOps=" + key.interestOps() + " rOps=" + key.readyOps());
801                 else
802                     dumps.add(key.attachment() + " iOps=-1 rOps=-1");
803             }
804         }
805 
806         @Override
807         public String toString()
808         {
809             Selector selector = _selector;
810             return String.format("%s keys=%d selected=%d",
811                     super.toString(),
812                     selector != null && selector.isOpen() ? selector.keys().size() : -1,
813                     selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
814         }
815 
816         private class DumpKeys implements Runnable
817         {
818             private final CountDownLatch latch = new CountDownLatch(1);
819             private final List<Object> _dumps;
820 
821             private DumpKeys(List<Object> dumps)
822             {
823                 this._dumps = dumps;
824             }
825 
826             @Override
827             public void run()
828             {
829                 dumpKeysState(_dumps);
830                 latch.countDown();
831             }
832 
833             public boolean await(long timeout, TimeUnit unit)
834             {
835                 try
836                 {
837                     return latch.await(timeout, unit);
838                 }
839                 catch (InterruptedException x)
840                 {
841                     return false;
842                 }
843             }
844         }
845 
846         private class Acceptor implements Runnable
847         {
848             private final ServerSocketChannel _channel;
849 
850             public Acceptor(ServerSocketChannel channel)
851             {
852                 this._channel = channel;
853             }
854 
855             @Override
856             public void run()
857             {
858                 try
859                 {
860                     SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null);
861                     if (LOG.isDebugEnabled())
862                         LOG.debug("{} acceptor={}", this, key);
863                 }
864                 catch (Throwable x)
865                 {
866                     closeNoExceptions(_channel);
867                     LOG.warn(x);
868                 }
869             }
870         }
871 
872         private class Accept implements Runnable
873         {
874             private final SocketChannel channel;
875             private final Object attachment;
876 
877             private Accept(SocketChannel channel, Object attachment)
878             {
879                 this.channel = channel;
880                 this.attachment = attachment;
881             }
882 
883             @Override
884             public void run()
885             {
886                 try
887                 {
888                     SelectionKey key = channel.register(_selector, 0, attachment);
889                     EndPoint endpoint = createEndPoint(channel, key);
890                     key.attach(endpoint);
891                 }
892                 catch (Throwable x)
893                 {
894                     closeNoExceptions(channel);
895                     LOG.debug(x);
896                 }
897             }
898         }
899 
900         private class Connect implements Runnable
901         {
902             private final AtomicBoolean failed = new AtomicBoolean();
903             private final SocketChannel channel;
904             private final Object attachment;
905             private final Scheduler.Task timeout;
906 
907             private Connect(SocketChannel channel, Object attachment)
908             {
909                 this.channel = channel;
910                 this.attachment = attachment;
911                 this.timeout = scheduler.schedule(new ConnectTimeout(this), getConnectTimeout(), TimeUnit.MILLISECONDS);
912             }
913 
914             @Override
915             public void run()
916             {
917                 try
918                 {
919                     channel.register(_selector, SelectionKey.OP_CONNECT, this);
920                 }
921                 catch (Throwable x)
922                 {
923                     failed(x);
924                 }
925             }
926 
927             private void failed(Throwable failure)
928             {
929                 if (failed.compareAndSet(false, true))
930                 {
931                     timeout.cancel();
932                     closeNoExceptions(channel);
933                     connectionFailed(channel, failure, attachment);
934                 }
935             }
936         }
937 
938         private class ConnectTimeout implements Runnable
939         {
940             private final Connect connect;
941 
942             private ConnectTimeout(Connect connect)
943             {
944                 this.connect = connect;
945             }
946 
947             @Override
948             public void run()
949             {
950                 SocketChannel channel = connect.channel;
951                 if (channel.isConnectionPending())
952                 {
953                     if (LOG.isDebugEnabled())
954                         LOG.debug("Channel {} timed out while connecting, closing it", channel);
955                     connect.failed(new SocketTimeoutException());
956                 }
957             }
958         }
959 
960         private class Stop implements Runnable
961         {
962             private final CountDownLatch latch = new CountDownLatch(1);
963 
964             @Override
965             public void run()
966             {
967                 try
968                 {
969                     for (SelectionKey key : _selector.keys())
970                     {
971                         Object attachment = key.attachment();
972                         if (attachment instanceof EndPoint)
973                         {
974                             EndPointCloser closer = new EndPointCloser((EndPoint)attachment);
975                             execute(closer);
976                             // We are closing the SelectorManager, so we want to block the
977                             // selector thread here until we have closed all EndPoints.
978                             // This is different than calling close() directly, because close()
979                             // can wait forever, while here we are limited by the stop timeout.
980                             closer.await(getStopTimeout());
981                         }
982                     }
983 
984                     closeNoExceptions(_selector);
985                 }
986                 finally
987                 {
988                     latch.countDown();
989                 }
990             }
991 
992             public boolean await(long timeout)
993             {
994                 try
995                 {
996                     return latch.await(timeout, TimeUnit.MILLISECONDS);
997                 }
998                 catch (InterruptedException x)
999                 {
1000                     return false;
1001                 }
1002             }
1003         }
1004 
1005         private class EndPointCloser implements Runnable
1006         {
1007             private final CountDownLatch latch = new CountDownLatch(1);
1008             private final EndPoint endPoint;
1009 
1010             private EndPointCloser(EndPoint endPoint)
1011             {
1012                 this.endPoint = endPoint;
1013             }
1014 
1015             @Override
1016             public void run()
1017             {
1018                 try
1019                 {
1020                     closeNoExceptions(endPoint.getConnection());
1021                 }
1022                 finally
1023                 {
1024                     latch.countDown();
1025                 }
1026             }
1027 
1028             private boolean await(long timeout)
1029             {
1030                 try
1031                 {
1032                     return latch.await(timeout, TimeUnit.MILLISECONDS);
1033                 }
1034                 catch (InterruptedException x)
1035                 {
1036                     return false;
1037                 }
1038             }
1039         }
1040     }
1041 
1042     /**
1043      * A {@link SelectableEndPoint} is an {@link EndPoint} that wishes to be notified of
1044      * non-blocking events by the {@link ManagedSelector}.
1045      */
1046     public interface SelectableEndPoint extends EndPoint
1047     {
1048         /**
1049          * <p>Callback method invoked when a read or write event has been detected by the {@link ManagedSelector}
1050          * for this endpoint.</p>
1051          */
1052         void onSelected();
1053     }
1054 }