View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.net.ConnectException;
24  import java.net.Socket;
25  import java.net.SocketAddress;
26  import java.net.SocketTimeoutException;
27  import java.nio.channels.CancelledKeyException;
28  import java.nio.channels.SelectionKey;
29  import java.nio.channels.Selector;
30  import java.nio.channels.ServerSocketChannel;
31  import java.nio.channels.SocketChannel;
32  import java.util.ArrayList;
33  import java.util.List;
34  import java.util.Queue;
35  import java.util.Set;
36  import java.util.concurrent.CountDownLatch;
37  import java.util.concurrent.Executor;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  
41  import org.eclipse.jetty.util.ConcurrentArrayQueue;
42  import org.eclipse.jetty.util.TypeUtil;
43  import org.eclipse.jetty.util.component.AbstractLifeCycle;
44  import org.eclipse.jetty.util.component.ContainerLifeCycle;
45  import org.eclipse.jetty.util.component.Dumpable;
46  import org.eclipse.jetty.util.log.Log;
47  import org.eclipse.jetty.util.log.Logger;
48  import org.eclipse.jetty.util.thread.Scheduler;
49  
50  /**
51   * <p>{@link SelectorManager} manages a number of {@link ManagedSelector}s that
52   * simplify the non-blocking primitives provided by the JVM via the {@code java.nio} package.</p>
53   * <p>{@link SelectorManager} subclasses implement methods to return protocol-specific
54   * {@link EndPoint}s and {@link Connection}s.</p>
55   */
56  public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
57  {
    // Logger shared by this class and by its inner ManagedSelector instances.
    protected static final Logger LOG = Log.getLogger(SelectorManager.class);
    /**
     * The default connect timeout, in milliseconds
     */
    public static final int DEFAULT_CONNECT_TIMEOUT = 15000;

    // Executor that execute(Runnable) delegates to; it also runs the selector loops (see doStart()).
    private final Executor executor;
    // Scheduler used to schedule the connect timeout tasks.
    private final Scheduler scheduler;
    // One slot per managed selector; the slots are filled in by doStart().
    private final ManagedSelector[] _selectors;
    // Connect timeout in milliseconds, see getConnectTimeout() and setConnectTimeout(long).
    private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
    // Round-robin counter read and incremented by chooseSelector(); deliberately not atomic.
    private long _selectorIndex;
69  
70      protected SelectorManager(Executor executor, Scheduler scheduler)
71      {
72          this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2);
73      }
74  
75      protected SelectorManager(Executor executor, Scheduler scheduler, int selectors)
76      {
77          this.executor = executor;
78          this.scheduler = scheduler;
79          _selectors = new ManagedSelector[selectors];
80      }
81  
82      public Executor getExecutor()
83      {
84          return executor;
85      }
86  
87      public Scheduler getScheduler()
88      {
89          return scheduler;
90      }
91  
92      /**
93       * Get the connect timeout
94       *
95       * @return the connect timeout (in milliseconds)
96       */
97      public long getConnectTimeout()
98      {
99          return _connectTimeout;
100     }
101 
102     /**
103      * Set the connect timeout (in milliseconds)
104      *
105      * @param milliseconds the number of milliseconds for the timeout
106      */
107     public void setConnectTimeout(long milliseconds)
108     {
109         _connectTimeout = milliseconds;
110     }
111 
112     /**
113      * Executes the given task in a different thread.
114      *
115      * @param task the task to execute
116      */
117     protected void execute(Runnable task)
118     {
119         executor.execute(task);
120     }
121 
122     /**
123      * @return the number of selectors in use
124      */
125     public int getSelectorCount()
126     {
127         return _selectors.length;
128     }
129 
130     private ManagedSelector chooseSelector()
131     {
132         // The ++ increment here is not atomic, but it does not matter,
133         // so long as the value changes sometimes, then connections will
134         // be distributed over the available selectors.
135         long s = _selectorIndex++;
136         int index = (int)(s % getSelectorCount());
137         return _selectors[index];
138     }
139 
140     /**
141      * <p>Registers a channel to perform a non-blocking connect.</p>
142      * <p>The channel must be set in non-blocking mode, and {@link SocketChannel#connect(SocketAddress)}
143      * must be called prior to calling this method.</p>
144      *
145      * @param channel    the channel to register
146      * @param attachment the attachment object
147      */
148     public void connect(SocketChannel channel, Object attachment)
149     {
150         ManagedSelector set = chooseSelector();
151         set.submit(set.new Connect(channel, attachment));
152     }
153 
154     /**
155      * <p>Registers a channel to perform non-blocking read/write operations.</p>
156      * <p>This method is called just after a channel has been accepted by {@link ServerSocketChannel#accept()},
157      * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}.</p>
158      *
159      * @param channel the channel to register
160      */
161     public void accept(final SocketChannel channel)
162     {
163         final ManagedSelector selector = chooseSelector();
164         selector.submit(selector.new Accept(channel));
165     }
166 
167     @Override
168     protected void doStart() throws Exception
169     {
170         super.doStart();
171         for (int i = 0; i < _selectors.length; i++)
172         {
173             ManagedSelector selector = newSelector(i);
174             _selectors[i] = selector;
175             selector.start();
176             execute(selector);
177         }
178     }
179 
180     /**
181      * <p>Factory method for {@link ManagedSelector}.</p>
182      *
183      * @param id an identifier for the {@link ManagedSelector to create}
184      * @return a new {@link ManagedSelector}
185      */
186     protected ManagedSelector newSelector(int id)
187     {
188         return new ManagedSelector(id);
189     }
190 
191     @Override
192     protected void doStop() throws Exception
193     {
194         for (ManagedSelector selector : _selectors)
195             selector.stop();
196         super.doStop();
197     }
198 
199     /**
200      * <p>Callback method invoked when an endpoint is opened.</p>
201      *
202      * @param endpoint the endpoint being opened
203      */
204     protected void endPointOpened(EndPoint endpoint)
205     {
206         endpoint.onOpen();
207     }
208 
209     /**
210      * <p>Callback method invoked when an endpoint is closed.</p>
211      *
212      * @param endpoint the endpoint being closed
213      */
214     protected void endPointClosed(EndPoint endpoint)
215     {
216         endpoint.onClose();
217     }
218 
219     /**
220      * <p>Callback method invoked when a connection is opened.</p>
221      *
222      * @param connection the connection just opened
223      */
224     public void connectionOpened(Connection connection)
225     {
226         try
227         {
228             connection.onOpen();
229         }
230         catch (Throwable x)
231         {
232             if (isRunning())
233                 LOG.warn("Exception while notifying connection " + connection, x);
234             else
235                 LOG.debug("Exception while notifying connection {}",connection, x);
236         }
237     }
238 
239     /**
240      * <p>Callback method invoked when a connection is closed.</p>
241      *
242      * @param connection the connection just closed
243      */
244     public void connectionClosed(Connection connection)
245     {
246         try
247         {
248             connection.onClose();
249         }
250         catch (Throwable x)
251         {
252             LOG.debug("Exception while notifying connection " + connection, x);
253         }
254     }
255 
256     protected boolean finishConnect(SocketChannel channel) throws IOException
257     {
258         return channel.finishConnect();
259     }
260 
261     /**
262      * <p>Callback method invoked when a non-blocking connect cannot be completed.</p>
263      * <p>By default it just logs with level warning.</p>
264      *
265      * @param channel the channel that attempted the connect
266      * @param ex the exception that caused the connect to fail
267      * @param attachment the attachment object associated at registration
268      */
269     protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
270     {
271         LOG.warn(String.format("%s - %s", channel, attachment), ex);
272     }
273 
    /**
     * <p>Factory method to create {@link EndPoint}.</p>
     * <p>This method is invoked as a result of the registration of a channel via {@link #connect(SocketChannel, Object)}
     * or {@link #accept(SocketChannel)}.</p>
     *
     * @param channel      the channel associated to the endpoint
     * @param selector     the selector the channel is registered to
     * @param selectionKey the selection key
     * @return a new endpoint
     * @throws IOException if the endPoint cannot be created
     * @see #newConnection(SocketChannel, EndPoint, Object)
     */
    protected abstract EndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException;
287 
    /**
     * <p>Factory method to create {@link Connection}.</p>
     * <p>It is invoked right after the endpoint for the channel has been created and opened.</p>
     *
     * @param channel    the channel associated to the connection
     * @param endpoint   the endpoint
     * @param attachment the attachment
     * @return a new connection
     * @throws IOException if the connection cannot be created
     * @see #newEndPoint(SocketChannel, ManagedSelector, SelectionKey)
     */
    public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException;
299 
300     @Override
301     public String dump()
302     {
303         return ContainerLifeCycle.dump(this);
304     }
305 
306     @Override
307     public void dump(Appendable out, String indent) throws IOException
308     {
309         ContainerLifeCycle.dumpObject(out, this);
310         ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
311     }
312 
313     /**
314      * <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
315      * <p>{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events
316      * happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
317      * with the channel.</p>
318      */
319     public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
320     {
321         private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>();
322 
323         private final int _id;
324         private Selector _selector;
325         private volatile Thread _thread;
326         private boolean _needsWakeup = true;
327         private boolean _runningChanges = false;
328 
329         public ManagedSelector(int id)
330         {
331             _id = id;
332             setStopTimeout(5000);
333         }
334 
335         @Override
336         protected void doStart() throws Exception
337         {
338             super.doStart();
339             _selector = Selector.open();
340         }
341 
342         @Override
343         protected void doStop() throws Exception
344         {
345             LOG.debug("Stopping {}", this);
346             Stop stop = new Stop();
347             submit(stop);
348             stop.await(getStopTimeout());
349             LOG.debug("Stopped {}", this);
350         }
351 
352         /**
353          * <p>Submits a change to be executed in the selector thread.</p>
354          * <p>Changes may be submitted from any thread, and the selector thread woken up
355          * (if necessary) to execute the change.</p>
356          *
357          * @param change the change to submit
358          */
359         public void submit(Runnable change)
360         {
361             // if we have been called by the selector thread we can directly run the change
362             if (_thread==Thread.currentThread())
363             {
364                 // If we are already iterating over the changes, just add this change to the list.
365                 // No race here because it is this thread that is iterating over the changes.
366                 if (_runningChanges)
367                     _changes.offer(change);
368                 else
369                 {
370                     // Otherwise we run the queued changes
371                     runChanges();
372                     // and then directly run the passed change
373                     runChange(change);
374                 }
375             }
376             else
377             {
378                 // otherwise we have to queue the change and wakeup the selector
379                 _changes.offer(change);
380                 LOG.debug("Queued change {}", change);
381                 boolean wakeup = _needsWakeup;
382                 if (wakeup)
383                     wakeup();
384             }
385         }
386 
387         private void runChanges()
388         {
389             try
390             {
391                 if (_runningChanges)
392                     throw new IllegalStateException();
393                 _runningChanges=true;
394 
395                 Runnable change;
396                 while ((change = _changes.poll()) != null)
397                     runChange(change);
398             }
399             finally
400             {
401                 _runningChanges=false;
402             }
403         }
404 
405         protected void runChange(Runnable change)
406         {
407             try
408             {
409                 LOG.debug("Running change {}", change);
410                 change.run();
411             }
412             catch (Throwable x)
413             {
414                 LOG.debug("Could not run change " + change, x);
415             }
416         }
417 
418         @Override
419         public void run()
420         {
421             _thread = Thread.currentThread();
422             String name = _thread.getName();
423             try
424             {
425                 _thread.setName(name + "-selector-" + _id);
426                 LOG.debug("Starting {} on {}", _thread, this);
427                 while (isRunning())
428                     select();
429                 processChanges();
430             }
431             finally
432             {
433                 LOG.debug("Stopped {} on {}", _thread, this);
434                 _thread.setName(name);
435             }
436         }
437 
438         /**
439          * <p>Process changes and waits on {@link Selector#select()}.</p>
440          *
441          * @see #submit(Runnable)
442          */
443         public void select()
444         {
445             boolean debug = LOG.isDebugEnabled();
446             try
447             {
448                 processChanges();
449 
450                 if (debug)
451                     LOG.debug("Selector loop waiting on select");
452                 int selected = _selector.select();
453                 if (debug)
454                     LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
455 
456                 _needsWakeup = false;
457 
458                 Set<SelectionKey> selectedKeys = _selector.selectedKeys();
459                 for (SelectionKey key : selectedKeys)
460                 {
461                     if (key.isValid())
462                     {
463                         processKey(key);
464                     }
465                     else
466                     {
467                         if (debug)
468                             LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
469                         Object attachment = key.attachment();
470                         if (attachment instanceof EndPoint)
471                             ((EndPoint)attachment).close();
472                     }
473                 }
474                 selectedKeys.clear();
475             }
476             catch (Throwable x)
477             {
478                 if (isRunning())
479                     LOG.warn(x);
480                 else
481                     LOG.ignore(x);
482             }
483         }
484 
        /**
         * Runs the queued changes twice, raising the wakeup flag in between.
         * The order of the three statements matters, see the comments below.
         */
        private void processChanges()
        {
            // First pass: run whatever has been queued so far.
            runChanges();

            // If tasks are submitted between these 2 statements, they will not
            // wakeup the selector, therefore below we run again the tasks

            // From now on submit() will call wakeup() for changes queued by other threads.
            _needsWakeup = true;

            // Run again the tasks to avoid the race condition where a task is
            // submitted but will not wake up the selector
            runChanges();
        }
498 
499         private void processKey(SelectionKey key)
500         {
501             Object attachment = key.attachment();
502             try
503             {
504                 if (attachment instanceof SelectableEndPoint)
505                 {
506                     ((SelectableEndPoint)attachment).onSelected();
507                 }
508                 else if (key.isConnectable())
509                 {
510                     processConnect(key, (Connect)attachment);
511                 }
512                 else
513                 {
514                     throw new IllegalStateException();
515                 }
516             }
517             catch (CancelledKeyException x)
518             {
519                 LOG.debug("Ignoring cancelled key for channel {}", key.channel());
520                 if (attachment instanceof EndPoint)
521                     ((EndPoint)attachment).close();
522             }
523             catch (Throwable x)
524             {
525                 LOG.warn("Could not process key for channel " + key.channel(), x);
526                 if (attachment instanceof EndPoint)
527                     ((EndPoint)attachment).close();
528             }
529         }
530 
531         private void processConnect(SelectionKey key, Connect connect)
532         {
533             SocketChannel channel = (SocketChannel)key.channel();
534             try
535             {
536                 key.attach(connect.attachment);
537                 boolean connected = finishConnect(channel);
538                 if (connected)
539                 {
540                     connect.timeout.cancel();
541                     key.interestOps(0);
542                     EndPoint endpoint = createEndPoint(channel, key);
543                     key.attach(endpoint);
544                 }
545                 else
546                 {
547                     throw new ConnectException();
548                 }
549             }
550             catch (Throwable x)
551             {
552                 connect.failed(x);
553             }
554         }
555 
556         private void closeNoExceptions(Closeable closeable)
557         {
558             try
559             {
560                 closeable.close();
561             }
562             catch (Throwable x)
563             {
564                 LOG.ignore(x);
565             }
566         }
567 
568         public void wakeup()
569         {
570             _selector.wakeup();
571         }
572 
573         public boolean isSelectorThread()
574         {
575             return Thread.currentThread() == _thread;
576         }
577 
578         private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
579         {
580             EndPoint endPoint = newEndPoint(channel, this, selectionKey);
581             endPointOpened(endPoint);
582             Connection connection = newConnection(channel, endPoint, selectionKey.attachment());
583             endPoint.setConnection(connection);
584             connectionOpened(connection);
585             LOG.debug("Created {}", endPoint);
586             return endPoint;
587         }
588 
589         public void destroyEndPoint(EndPoint endPoint)
590         {
591             LOG.debug("Destroyed {}", endPoint);
592             Connection connection = endPoint.getConnection();
593             if (connection != null)
594                 connectionClosed(connection);
595             endPointClosed(endPoint);
596         }
597 
598         @Override
599         public String dump()
600         {
601             return ContainerLifeCycle.dump(this);
602         }
603 
604         @Override
605         public void dump(Appendable out, String indent) throws IOException
606         {
607             out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append("\n");
608 
609             Thread selecting = _thread;
610 
611             Object where = "not selecting";
612             StackTraceElement[] trace = selecting == null ? null : selecting.getStackTrace();
613             if (trace != null)
614             {
615                 for (StackTraceElement t : trace)
616                     if (t.getClassName().startsWith("org.eclipse.jetty."))
617                     {
618                         where = t;
619                         break;
620                     }
621             }
622 
623             Selector selector = _selector;
624             if (selector != null && selector.isOpen())
625             {
626                 final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2);
627                 dump.add(where);
628 
629                 DumpKeys dumpKeys = new DumpKeys(dump);
630                 submit(dumpKeys);
631                 dumpKeys.await(5, TimeUnit.SECONDS);
632 
633                 ContainerLifeCycle.dump(out, indent, dump);
634             }
635         }
636 
637         public void dumpKeysState(List<Object> dumps)
638         {
639             Selector selector = _selector;
640             Set<SelectionKey> keys = selector.keys();
641             dumps.add(selector + " keys=" + keys.size());
642             for (SelectionKey key : keys)
643             {
644                 if (key.isValid())
645                     dumps.add(key.attachment() + " iOps=" + key.interestOps() + " rOps=" + key.readyOps());
646                 else
647                     dumps.add(key.attachment() + " iOps=-1 rOps=-1");
648             }
649         }
650 
651         @Override
652         public String toString()
653         {
654             Selector selector = _selector;
655             return String.format("%s keys=%d selected=%d",
656                     super.toString(),
657                     selector != null && selector.isOpen() ? selector.keys().size() : -1,
658                     selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
659         }
660 
661         private class DumpKeys implements Runnable
662         {
663             private final CountDownLatch latch = new CountDownLatch(1);
664             private final List<Object> _dumps;
665 
666             private DumpKeys(List<Object> dumps)
667             {
668                 this._dumps = dumps;
669             }
670 
671             @Override
672             public void run()
673             {
674                 dumpKeysState(_dumps);
675                 latch.countDown();
676             }
677 
678             public boolean await(long timeout, TimeUnit unit)
679             {
680                 try
681                 {
682                     return latch.await(timeout, unit);
683                 }
684                 catch (InterruptedException x)
685                 {
686                     return false;
687                 }
688             }
689         }
690 
691         private class Accept implements Runnable
692         {
693             private final SocketChannel _channel;
694 
695             public Accept(SocketChannel channel)
696             {
697                 this._channel = channel;
698             }
699 
700             @Override
701             public void run()
702             {
703                 try
704                 {
705                     SelectionKey key = _channel.register(_selector, 0, null);
706                     EndPoint endpoint = createEndPoint(_channel, key);
707                     key.attach(endpoint);
708                 }
709                 catch (Throwable x)
710                 {
711                     closeNoExceptions(_channel);
712                     LOG.debug(x);
713                 }
714             }
715         }
716 
717         private class Connect implements Runnable
718         {
719             private final AtomicBoolean failed = new AtomicBoolean();
720             private final SocketChannel channel;
721             private final Object attachment;
722             private final Scheduler.Task timeout;
723 
724             public Connect(SocketChannel channel, Object attachment)
725             {
726                 this.channel = channel;
727                 this.attachment = attachment;
728                 this.timeout = scheduler.schedule(new ConnectTimeout(this), getConnectTimeout(), TimeUnit.MILLISECONDS);
729             }
730 
731             @Override
732             public void run()
733             {
734                 try
735                 {
736                     channel.register(_selector, SelectionKey.OP_CONNECT, this);
737                 }
738                 catch (Throwable x)
739                 {
740                     failed(x);
741                 }
742             }
743 
744             protected void failed(Throwable failure)
745             {
746                 if (failed.compareAndSet(false, true))
747                 {
748                     timeout.cancel();
749                     closeNoExceptions(channel);
750                     connectionFailed(channel, failure, attachment);
751                 }
752             }
753         }
754 
755         private class ConnectTimeout implements Runnable
756         {
757             private final Connect connect;
758 
759             private ConnectTimeout(Connect connect)
760             {
761                 this.connect = connect;
762             }
763 
764             @Override
765             public void run()
766             {
767                 SocketChannel channel = connect.channel;
768                 if (channel.isConnectionPending())
769                 {
770                     LOG.debug("Channel {} timed out while connecting, closing it", channel);
771                     connect.failed(new SocketTimeoutException());
772                 }
773             }
774         }
775 
776         private class Stop implements Runnable
777         {
778             private final CountDownLatch latch = new CountDownLatch(1);
779 
780             @Override
781             public void run()
782             {
783                 try
784                 {
785                     for (SelectionKey key : _selector.keys())
786                     {
787                         Object attachment = key.attachment();
788                         if (attachment instanceof EndPoint)
789                         {
790                             EndPointCloser closer = new EndPointCloser((EndPoint)attachment);
791                             execute(closer);
792                             // We are closing the SelectorManager, so we want to block the
793                             // selector thread here until we have closed all EndPoints.
794                             // This is different than calling close() directly, because close()
795                             // can wait forever, while here we are limited by the stop timeout.
796                             closer.await(getStopTimeout());
797                         }
798                     }
799 
800                     closeNoExceptions(_selector);
801                 }
802                 finally
803                 {
804                     latch.countDown();
805                 }
806             }
807 
808             public boolean await(long timeout)
809             {
810                 try
811                 {
812                     return latch.await(timeout, TimeUnit.MILLISECONDS);
813                 }
814                 catch (InterruptedException x)
815                 {
816                     return false;
817                 }
818             }
819         }
820 
821         private class EndPointCloser implements Runnable
822         {
823             private final CountDownLatch latch = new CountDownLatch(1);
824             private final EndPoint endPoint;
825 
826             private EndPointCloser(EndPoint endPoint)
827             {
828                 this.endPoint = endPoint;
829             }
830 
831             @Override
832             public void run()
833             {
834                 try
835                 {
836                     closeNoExceptions(endPoint.getConnection());
837                 }
838                 finally
839                 {
840                     latch.countDown();
841                 }
842             }
843 
844             private boolean await(long timeout)
845             {
846                 try
847                 {
848                     return latch.await(timeout, TimeUnit.MILLISECONDS);
849                 }
850                 catch (InterruptedException x)
851                 {
852                     return false;
853                 }
854             }
855         }
856     }
857 
    /**
     * A {@link SelectableEndPoint} is an {@link EndPoint} that wishes to be notified of
     * non-blocking events by the {@link ManagedSelector}.
     */
    public interface SelectableEndPoint extends EndPoint
    {
        /**
         * <p>Callback method invoked when a read or write event has been detected by the {@link ManagedSelector}
         * for this endpoint.</p>
         */
        void onSelected();
    }
870 }