View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.net.ConnectException;
24  import java.net.Socket;
25  import java.net.SocketAddress;
26  import java.net.SocketTimeoutException;
27  import java.nio.channels.CancelledKeyException;
28  import java.nio.channels.ClosedChannelException;
29  import java.nio.channels.SelectionKey;
30  import java.nio.channels.Selector;
31  import java.nio.channels.ServerSocketChannel;
32  import java.nio.channels.SocketChannel;
33  import java.util.ArrayList;
34  import java.util.List;
35  import java.util.Queue;
36  import java.util.Set;
37  import java.util.concurrent.ConcurrentLinkedQueue;
38  import java.util.concurrent.CountDownLatch;
39  import java.util.concurrent.Executor;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  
43  import org.eclipse.jetty.util.TypeUtil;
44  import org.eclipse.jetty.util.component.AbstractLifeCycle;
45  import org.eclipse.jetty.util.component.ContainerLifeCycle;
46  import org.eclipse.jetty.util.component.Dumpable;
47  import org.eclipse.jetty.util.log.Log;
48  import org.eclipse.jetty.util.log.Logger;
49  import org.eclipse.jetty.util.thread.Scheduler;
50  
51  /**
52   * <p>{@link SelectorManager} manages a number of {@link ManagedSelector}s that
53   * simplify the non-blocking primitives provided by the JVM via the {@code java.nio} package.</p>
54   * <p>{@link SelectorManager} subclasses implement methods to return protocol-specific
55   * {@link EndPoint}s and {@link Connection}s.</p>
56   */
57  public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
58  {
59      protected static final Logger LOG = Log.getLogger(SelectorManager.class);
60      /**
61       * The default connect timeout, in milliseconds
62       */
63      public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
64  
65      private final Executor executor;
66      private final Scheduler scheduler;
67      private final ManagedSelector[] _selectors;
68      private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
69      private long _selectorIndex;
70  
    /**
     * <p>Creates a manager whose number of selectors is half the number of
     * available processors, rounded up.</p>
     *
     * @param executor  the executor passed on to the three-argument constructor
     * @param scheduler the scheduler passed on to the three-argument constructor
     */
    protected SelectorManager(Executor executor, Scheduler scheduler)
    {
        this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2);
    }
75  
76      protected SelectorManager(Executor executor, Scheduler scheduler, int selectors)
77      {
78          this.executor = executor;
79          this.scheduler = scheduler;
80          _selectors = new ManagedSelector[selectors];
81      }
82  
83      public Executor getExecutor()
84      {
85          return executor;
86      }
87  
88      public Scheduler getScheduler()
89      {
90          return scheduler;
91      }
92  
93      /**
94       * Get the connect timeout
95       * 
96       * @return the connect timeout (in milliseconds)
97       */
98      public long getConnectTimeout()
99      {
100         return _connectTimeout;
101     }
102 
103     /**
104      * Set the connect timeout (in milliseconds)
105      * 
106      * @param milliseconds the number of milliseconds for the timeout
107      */
108     public void setConnectTimeout(long milliseconds)
109     {
110         _connectTimeout = milliseconds;
111     }
112 
113     /**
114      * Executes the given task in a different thread.
115      *
116      * @param task the task to execute
117      */
118     protected void execute(Runnable task)
119     {
120         executor.execute(task);
121     }
122 
123     /**
124      * @return the number of selectors in use
125      */
126     public int getSelectorCount()
127     {
128         return _selectors.length;
129     }
130 
131     private ManagedSelector chooseSelector()
132     {
133         // The ++ increment here is not atomic, but it does not matter,
134         // so long as the value changes sometimes, then connections will
135         // be distributed over the available selectors.
136         long s = _selectorIndex++;
137         int index = (int)(s % getSelectorCount());
138         return _selectors[index];
139     }
140 
141     /**
142      * <p>Registers a channel to perform a non-blocking connect.</p>
143      * <p>The channel must be set in non-blocking mode, and {@link SocketChannel#connect(SocketAddress)}
144      * must be called prior to calling this method.</p>
145      *
146      * @param channel    the channel to register
147      * @param attachment the attachment object
148      */
149     public void connect(SocketChannel channel, Object attachment)
150     {
151         ManagedSelector set = chooseSelector();
152         set.submit(set.new Connect(channel, attachment));
153     }
154 
155     /**
156      * <p>Registers a channel to perform non-blocking read/write operations.</p>
157      * <p>This method is called just after a channel has been accepted by {@link ServerSocketChannel#accept()},
158      * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}.</p>
159      *
160      * @param channel the channel to register
161      */
162     public void accept(final SocketChannel channel)
163     {
164         final ManagedSelector selector = chooseSelector();
165         selector.submit(selector.new Accept(channel));
166     }
167 
168     @Override
169     protected void doStart() throws Exception
170     {
171         super.doStart();
172         for (int i = 0; i < _selectors.length; i++)
173         {
174             ManagedSelector selector = newSelector(i);
175             _selectors[i] = selector;
176             selector.start();
177             execute(selector);
178         }
179     }
180 
    /**
     * <p>Factory method for {@link ManagedSelector}.</p>
     * <p>Subclasses may override it to return a specialized selector.</p>
     *
     * @param id an identifier for the {@link ManagedSelector} to create
     * @return a new {@link ManagedSelector}
     */
    protected ManagedSelector newSelector(int id)
    {
        return new ManagedSelector(id);
    }
191 
192     @Override
193     protected void doStop() throws Exception
194     {
195         for (ManagedSelector selector : _selectors)
196             selector.stop();
197         super.doStop();
198     }
199 
    /**
     * <p>Callback method invoked when an endpoint is opened.</p>
     * <p>This implementation forwards the event to {@link EndPoint#onOpen()}.</p>
     *
     * @param endpoint the endpoint being opened
     */
    protected void endPointOpened(EndPoint endpoint)
    {
        endpoint.onOpen();
    }
209 
    /**
     * <p>Callback method invoked when an endpoint is closed.</p>
     * <p>This implementation forwards the event to {@link EndPoint#onClose()}.</p>
     *
     * @param endpoint the endpoint being closed
     */
    protected void endPointClosed(EndPoint endpoint)
    {
        endpoint.onClose();
    }
219 
220     /**
221      * <p>Callback method invoked when a connection is opened.</p>
222      *
223      * @param connection the connection just opened
224      */
225     public void connectionOpened(Connection connection)
226     {
227         try
228         {
229             connection.onOpen();
230         }
231         catch (Exception x)
232         {
233             LOG.info("Exception while notifying connection " + connection, x);
234         }
235     }
236 
237     /**
238      * <p>Callback method invoked when a connection is closed.</p>
239      *
240      * @param connection the connection just closed
241      */
242     public void connectionClosed(Connection connection)
243     {
244         try
245         {
246             connection.onClose();
247         }
248         catch (Exception x)
249         {
250             LOG.info("Exception while notifying connection " + connection, x);
251         }
252     }
253 
    /**
     * <p>Completes a non-blocking connect by delegating to {@link SocketChannel#finishConnect()}.</p>
     *
     * @param channel the channel whose connect should be completed
     * @return the value returned by {@link SocketChannel#finishConnect()}
     * @throws IOException if {@link SocketChannel#finishConnect()} throws it
     */
    protected boolean finishConnect(SocketChannel channel) throws IOException
    {
        return channel.finishConnect();
    }
258 
259     /**
260      * <p>Callback method invoked when a non-blocking connect cannot be completed.</p>
261      * <p>By default it just logs with level warning.</p>
262      *
263      * @param channel the channel that attempted the connect
264      * @param ex the exception that caused the connect to fail
265      * @param attachment the attachment object associated at registration
266      */
267     protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
268     {
269         LOG.warn(String.format("%s - %s", channel, attachment), ex);
270     }
271 
    /**
     * <p>Factory method to create {@link EndPoint}.</p>
     * <p>This method is invoked as a result of the registration of a channel via {@link #connect(SocketChannel, Object)}
     * or {@link #accept(SocketChannel)}.</p>
     *
     * @param channel      the channel associated to the endpoint
     * @param selector     the selector the channel is registered to
     * @param selectionKey the selection key of the channel within that selector
     * @return a new endpoint
     * @throws IOException if the endpoint cannot be created
     * @see #newConnection(SocketChannel, EndPoint, Object)
     */
    protected abstract EndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException;
285 
    /**
     * <p>Factory method to create {@link Connection}.</p>
     * <p>This method is invoked right after the endpoint for the channel has been created.</p>
     *
     * @param channel    the channel associated to the connection
     * @param endpoint   the endpoint
     * @param attachment the attachment
     * @return a new connection
     * @throws IOException if the connection cannot be created
     * @see #newEndPoint(SocketChannel, ManagedSelector, SelectionKey)
     */
    public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException;
297 
    /**
     * @return the textual dump produced by {@link ContainerLifeCycle#dump(Dumpable)} for this manager
     */
    @Override
    public String dump()
    {
        return ContainerLifeCycle.dump(this);
    }
303 
    /**
     * Writes this manager followed by its selectors to the given output.
     *
     * @param out    the destination of the dump
     * @param indent the indentation passed on when dumping the selectors
     * @throws IOException if writing to {@code out} fails
     */
    @Override
    public void dump(Appendable out, String indent) throws IOException
    {
        ContainerLifeCycle.dumpObject(out, this);
        ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
    }
310 
311     /**
312      * <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
313      * <p>{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events
314      * happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
315      * with the channel.</p>
316      */
317     public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
318     {
319         private final Queue<Runnable> _changes = new ConcurrentLinkedQueue<>();
320         private final int _id;
321         private Selector _selector;
322         private volatile Thread _thread;
323         private boolean _needsWakeup = true;
324         private boolean _runningChanges = false;
325 
        /**
         * Creates a selector with the given identifier and a stop timeout of 5000.
         *
         * @param id the identifier of this selector; it is appended to the name of the
         *           thread running the select loop and reported by {@link #dump(Appendable, String)}
         */
        public ManagedSelector(int id)
        {
            _id = id;
            setStopTimeout(5000);
        }
331 
        /**
         * Opens the underlying {@link Selector}.
         *
         * @throws Exception if the superclass fails to start or the selector cannot be opened
         */
        @Override
        protected void doStart() throws Exception
        {
            super.doStart();
            _selector = Selector.open();
        }
338 
339         @Override
340         protected void doStop() throws Exception
341         {
342             LOG.debug("Stopping {}", this);
343             Stop stop = new Stop();
344             submit(stop);
345             stop.await(getStopTimeout());
346             LOG.debug("Stopped {}", this);
347         }
348 
349         /**
350          * <p>Submits a change to be executed in the selector thread.</p>
351          * <p>Changes may be submitted from any thread, and the selector thread woken up
352          * (if necessary) to execute the change.</p>
353          *
354          * @param change the change to submit
355          */
356         public void submit(Runnable change)
357         {
358             // if we have been called by the selector thread we can directly run the change
359             if (_thread==Thread.currentThread())
360             {
361                 // If we are already iterating over the changes, just add this change to the list.
362                 // No race here because it is this thread that is iterating over the changes.
363                 if (_runningChanges)
364                     _changes.offer(change);
365                 else
366                 {       
367                     // Otherwise we run the queued changes
368                     runChanges();
369                     // and then directly run the passed change
370                     runChange(change);
371                 }
372             }
373             else
374             {
375                 // otherwise we have to queue the change and wakeup the selector
376                 _changes.offer(change);
377                 LOG.debug("Queued change {}", change);
378                 boolean wakeup = _needsWakeup;
379                 if (wakeup)
380                     wakeup();
381             }
382         }
383 
384         private void runChanges()
385         {
386             try
387             {
388                 if (_runningChanges)
389                     throw new IllegalStateException();
390                 _runningChanges=true;
391 
392                 Runnable change;
393                 while ((change = _changes.poll()) != null)
394                     runChange(change);
395             }
396             finally
397             {
398                 _runningChanges=false;
399             }
400         }
401 
        /**
         * Runs a single change in the calling thread, logging it at debug level first.
         *
         * @param change the change to run
         */
        protected void runChange(Runnable change)
        {
            LOG.debug("Running change {}", change);
            change.run();
        }
407 
408         @Override
409         public void run()
410         {
411             _thread = Thread.currentThread();
412             String name = _thread.getName();
413             try
414             {
415                 _thread.setName(name + "-selector-" + _id);
416                 LOG.debug("Starting {} on {}", _thread, this);
417                 while (isRunning())
418                     select();
419                 processChanges();
420             }
421             finally
422             {
423                 LOG.debug("Stopped {} on {}", _thread, this);
424                 _thread.setName(name);
425             }
426         }
427 
428         /**
429          * <p>Process changes and waits on {@link Selector#select()}.</p>
430          *
431          * @see #submit(Runnable)
432          */
433         public void select()
434         {
435             boolean debug = LOG.isDebugEnabled();
436             try
437             {
438                 processChanges();
439 
440                 if (debug)
441                     LOG.debug("Selector loop waiting on select");
442                 int selected = _selector.select();
443                 if (debug)
444                     LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
445 
446                 _needsWakeup = false;
447 
448                 Set<SelectionKey> selectedKeys = _selector.selectedKeys();
449                 for (SelectionKey key : selectedKeys)
450                 {
451                     if (key.isValid())
452                     {
453                         processKey(key);
454                     }
455                     else
456                     {
457                         if (debug)
458                             LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
459                         Object attachment = key.attachment();
460                         if (attachment instanceof EndPoint)
461                             ((EndPoint)attachment).close();
462                     }
463                 }
464                 selectedKeys.clear();
465             }
466             catch (Exception x)
467             {
468                 if (isRunning())
469                     LOG.warn(x);
470                 else
471                     LOG.ignore(x);
472             }
473         }
474 
        /**
         * Runs the queued changes and flags that submitters must wake up the selector.
         * The order of the three statements matters, see the comments in the body.
         */
        private void processChanges()
        {
            // First pass: run whatever has been queued so far
            runChanges();

            // A change submitted after the pass above but before the flag is
            // raised below does not wake up the selector: its submitter still
            // sees the flag as lowered.

            _needsWakeup = true;

            // Second pass: picks up the changes that fell in that window, so that
            // no change is left in the queue without a wakeup to get it processed
            runChanges();
        }
488 
489         private void processKey(SelectionKey key)
490         {
491             Object attachment = key.attachment();
492             try
493             {
494                 if (attachment instanceof SelectableEndPoint)
495                 {
496                     ((SelectableEndPoint)attachment).onSelected();
497                 }
498                 else if (key.isConnectable())
499                 {
500                     processConnect(key, (Connect)attachment);
501                 }
502                 else
503                 {
504                     throw new IllegalStateException();
505                 }
506             }
507             catch (CancelledKeyException x)
508             {
509                 LOG.debug("Ignoring cancelled key for channel {}", key.channel());
510                 if (attachment instanceof EndPoint)
511                     ((EndPoint)attachment).close();
512             }
513             catch (Exception x)
514             {
515                 LOG.warn("Could not process key for channel " + key.channel(), x);
516                 if (attachment instanceof EndPoint)
517                     ((EndPoint)attachment).close();
518             }
519         }
520 
521         private void processConnect(SelectionKey key, Connect connect)
522         {
523             key.attach(connect.attachment);
524             SocketChannel channel = (SocketChannel)key.channel();
525             try
526             {
527                 boolean connected = finishConnect(channel);
528                 if (connected)
529                 {
530                     connect.timeout.cancel();
531                     key.interestOps(0);
532                     EndPoint endpoint = createEndPoint(channel, key);
533                     key.attach(endpoint);
534                 }
535                 else
536                 {
537                     throw new ConnectException();
538                 }
539             }
540             catch (Exception x)
541             {
542                 connect.failed(x);
543                 closeNoExceptions(channel);
544             }
545         }
546 
        /**
         * Closes the given resource; an {@link IOException} thrown by the close is
         * passed to the logger to be ignored instead of being propagated.
         *
         * @param closeable the resource to close; it is dereferenced without a null check
         */
        private void closeNoExceptions(Closeable closeable)
        {
            try
            {
                closeable.close();
            }
            catch (IOException x)
            {
                LOG.ignore(x);
            }
        }
558 
        /**
         * Wakes up the underlying selector by calling {@link Selector#wakeup()}.
         */
        public void wakeup()
        {
            _selector.wakeup();
        }
563 
564         public boolean isSelectorThread()
565         {
566             return Thread.currentThread() == _thread;
567         }
568 
569         private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
570         {
571             EndPoint endPoint = newEndPoint(channel, this, selectionKey);
572             endPointOpened(endPoint);
573             Connection connection = newConnection(channel, endPoint, selectionKey.attachment());
574             endPoint.setConnection(connection);
575             connectionOpened(connection);
576             LOG.debug("Created {}", endPoint);
577             return endPoint;
578         }
579 
580         public void destroyEndPoint(EndPoint endPoint)
581         {
582             LOG.debug("Destroyed {}", endPoint);
583             Connection connection = endPoint.getConnection();
584             if (connection != null)
585                 connectionClosed(connection);
586             endPointClosed(endPoint);
587         }
588 
        /**
         * @return the textual dump produced by {@link ContainerLifeCycle#dump(Dumpable)} for this selector
         */
        @Override
        public String dump()
        {
            return ContainerLifeCycle.dump(this);
        }
594 
595         @Override
596         public void dump(Appendable out, String indent) throws IOException
597         {
598             out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append("\n");
599 
600             Thread selecting = _thread;
601 
602             Object where = "not selecting";
603             StackTraceElement[] trace = selecting == null ? null : selecting.getStackTrace();
604             if (trace != null)
605             {
606                 for (StackTraceElement t : trace)
607                     if (t.getClassName().startsWith("org.eclipse.jetty."))
608                     {
609                         where = t;
610                         break;
611                     }
612             }
613 
614             Selector selector = _selector;
615             if (selector != null && selector.isOpen())
616             {
617                 final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2);
618                 dump.add(where);
619 
620                 DumpKeys dumpKeys = new DumpKeys(dump);
621                 submit(dumpKeys);
622                 dumpKeys.await(5, TimeUnit.SECONDS);
623 
624                 ContainerLifeCycle.dump(out, indent, dump);
625             }
626         }
627 
628         public void dumpKeysState(List<Object> dumps)
629         {
630             Selector selector = _selector;
631             Set<SelectionKey> keys = selector.keys();
632             dumps.add(selector + " keys=" + keys.size());
633             for (SelectionKey key : keys)
634             {
635                 if (key.isValid())
636                     dumps.add(key.attachment() + " iOps=" + key.interestOps() + " rOps=" + key.readyOps());
637                 else
638                     dumps.add(key.attachment() + " iOps=-1 rOps=-1");
639             }
640         }
641 
642         @Override
643         public String toString()
644         {
645             Selector selector = _selector;
646             return String.format("%s keys=%d selected=%d",
647                     super.toString(),
648                     selector != null && selector.isOpen() ? selector.keys().size() : -1,
649                     selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
650         }
651 
652         private class DumpKeys implements Runnable
653         {
654             private final CountDownLatch latch = new CountDownLatch(1);
655             private final List<Object> _dumps;
656 
657             private DumpKeys(List<Object> dumps)
658             {
659                 this._dumps = dumps;
660             }
661 
662             @Override
663             public void run()
664             {
665                 dumpKeysState(_dumps);
666                 latch.countDown();
667             }
668 
669             public boolean await(long timeout, TimeUnit unit)
670             {
671                 try
672                 {
673                     return latch.await(timeout, unit);
674                 }
675                 catch (InterruptedException x)
676                 {
677                     return false;
678                 }
679             }
680         }
681 
682         private class Accept implements Runnable
683         {
684             private final SocketChannel _channel;
685 
686             public Accept(SocketChannel channel)
687             {
688                 this._channel = channel;
689             }
690 
691             @Override
692             public void run()
693             {
694                 try
695                 {
696                     SelectionKey key = _channel.register(_selector, 0, null);
697                     EndPoint endpoint = createEndPoint(_channel, key);
698                     key.attach(endpoint);
699                 }
700                 catch (IOException x)
701                 {
702                     LOG.debug(x);
703                 }
704             }
705         }
706 
707         private class Connect implements Runnable
708         {
709             private final AtomicBoolean failed = new AtomicBoolean();
710             private final SocketChannel channel;
711             private final Object attachment;
712             private final Scheduler.Task timeout;
713 
714             public Connect(SocketChannel channel, Object attachment)
715             {
716                 this.channel = channel;
717                 this.attachment = attachment;
718                 this.timeout = scheduler.schedule(new ConnectTimeout(this), getConnectTimeout(), TimeUnit.MILLISECONDS);
719             }
720 
721             @Override
722             public void run()
723             {
724                 try
725                 {
726                     channel.register(_selector, SelectionKey.OP_CONNECT, this);
727                 }
728                 catch (ClosedChannelException x)
729                 {
730                     LOG.debug(x);
731                 }
732             }
733 
734             protected void failed(Throwable failure)
735             {
736                 if (failed.compareAndSet(false, true))
737                     connectionFailed(channel, failure, attachment);
738             }
739         }
740 
741         private class ConnectTimeout implements Runnable
742         {
743             private final Connect connect;
744 
745             private ConnectTimeout(Connect connect)
746             {
747                 this.connect = connect;
748             }
749 
750             @Override
751             public void run()
752             {
753                 SocketChannel channel = connect.channel;
754                 if (channel.isConnectionPending())
755                 {
756                     LOG.debug("Channel {} timed out while connecting, closing it", channel);
757                     try
758                     {
759                         // This will unregister the channel from the selector
760                         channel.close();
761                     }
762                     catch (IOException x)
763                     {
764                         LOG.ignore(x);
765                     }
766                     finally
767                     {
768                         connect.failed(new SocketTimeoutException());
769                     }
770                 }
771             }
772         }
773 
774         private class Stop implements Runnable
775         {
776             private final CountDownLatch latch = new CountDownLatch(1);
777 
778             @Override
779             public void run()
780             {
781                 try
782                 {
783                     for (SelectionKey key : _selector.keys())
784                     {
785                         Object attachment = key.attachment();
786                         if (attachment instanceof EndPoint)
787                         {
788                             EndPointCloser closer = new EndPointCloser((EndPoint)attachment);
789                             execute(closer);
790                             // We are closing the SelectorManager, so we want to block the
791                             // selector thread here until we have closed all EndPoints.
792                             // This is different than calling close() directly, because close()
793                             // can wait forever, while here we are limited by the stop timeout.
794                             closer.await(getStopTimeout());
795                         }
796                     }
797 
798                     closeNoExceptions(_selector);
799                 }
800                 finally
801                 {
802                     latch.countDown();
803                 }
804             }
805 
806             public boolean await(long timeout)
807             {
808                 try
809                 {
810                     return latch.await(timeout, TimeUnit.MILLISECONDS);
811                 }
812                 catch (InterruptedException x)
813                 {
814                     return false;
815                 }
816             }
817         }
818 
819         private class EndPointCloser implements Runnable
820         {
821             private final CountDownLatch latch = new CountDownLatch(1);
822             private final EndPoint endPoint;
823 
824             private EndPointCloser(EndPoint endPoint)
825             {
826                 this.endPoint = endPoint;
827             }
828 
829             @Override
830             public void run()
831             {
832                 try
833                 {
834                     endPoint.getConnection().close();
835                 }
836                 finally
837                 {
838                     latch.countDown();
839                 }
840             }
841 
842             private boolean await(long timeout)
843             {
844                 try
845                 {
846                     return latch.await(timeout, TimeUnit.MILLISECONDS);
847                 }
848                 catch (InterruptedException x)
849                 {
850                     return false;
851                 }
852             }
853         }
854     }
855 
    /**
     * A {@link SelectableEndPoint} is an {@link EndPoint} that wishes to be notified of
     * non-blocking events by the {@link ManagedSelector}.
     */
    public interface SelectableEndPoint extends EndPoint
    {
        /**
         * <p>Callback method invoked when a read or write event has been detected by the {@link ManagedSelector}
         * for this endpoint.</p>
         */
        void onSelected();
    }
868 }