View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.io;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.net.ConnectException;
24  import java.net.SocketTimeoutException;
25  import java.nio.channels.CancelledKeyException;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.nio.channels.ServerSocketChannel;
29  import java.nio.channels.SocketChannel;
30  import java.util.ArrayDeque;
31  import java.util.ArrayList;
32  import java.util.Collections;
33  import java.util.Iterator;
34  import java.util.List;
35  import java.util.Queue;
36  import java.util.Set;
37  import java.util.concurrent.CountDownLatch;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  
41  import org.eclipse.jetty.util.component.AbstractLifeCycle;
42  import org.eclipse.jetty.util.component.ContainerLifeCycle;
43  import org.eclipse.jetty.util.component.Dumpable;
44  import org.eclipse.jetty.util.log.Log;
45  import org.eclipse.jetty.util.log.Logger;
46  import org.eclipse.jetty.util.thread.ExecutionStrategy;
47  import org.eclipse.jetty.util.thread.Locker;
48  import org.eclipse.jetty.util.thread.Scheduler;
49  
50  /**
51   * <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
52   * <p>{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events
53   * happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
54   * with the channel.</p>
55   */
56  public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
57  {
58      private static final Logger LOG = Log.getLogger(ManagedSelector.class);
59  
60      private final Locker _locker = new Locker();
61      private boolean _selecting = false;
62      private final Queue<Runnable> _actions = new ArrayDeque<>();
63      private final SelectorManager _selectorManager;
64      private final int _id;
65      private final ExecutionStrategy _strategy;
66      private Selector _selector;
67  
68      public ManagedSelector(SelectorManager selectorManager, int id)
69      {
70          _selectorManager = selectorManager;
71          _id = id;
72          _strategy = ExecutionStrategy.Factory.instanceFor(new SelectorProducer(), selectorManager.getExecutor());
73          setStopTimeout(5000);
74      }
75  
76      @Override
77      protected void doStart() throws Exception
78      {
79          super.doStart();
80          _selector = newSelector();
81      }
82  
83      protected Selector newSelector() throws IOException
84      {
85          return Selector.open();
86      }
87  
88      public int size()
89      {
90          Selector s = _selector;
91          if (s == null)
92              return 0;
93          return s.keys().size();
94      }
95  
96      @Override
97      protected void doStop() throws Exception
98      {
99          if (LOG.isDebugEnabled())
100             LOG.debug("Stopping {}", this);
101         CloseEndPoints close_endps = new CloseEndPoints();
102         submit(close_endps);
103         close_endps.await(getStopTimeout());
104         super.doStop();
105         CloseSelector close_selector = new CloseSelector();
106         submit(close_selector);
107         close_selector.await(getStopTimeout());
108 
109         if (LOG.isDebugEnabled())
110             LOG.debug("Stopped {}", this);
111     }
112 
113     public void submit(Runnable change)
114     {
115         if (LOG.isDebugEnabled())
116             LOG.debug("Queued change {} on {}", change, this);
117 
118         Selector selector = null;
119         try (Locker.Lock lock = _locker.lock())
120         {
121             _actions.offer(change);
122             if (_selecting)
123             {
124                 selector = _selector;
125                 // To avoid the extra select wakeup.
126                 _selecting = false;
127             }
128         }
129         if (selector != null)
130             selector.wakeup();
131     }
132 
133     @Override
134     public void run()
135     {
136         _strategy.execute();
137     }
138 
139     /**
140      * A {@link SelectableEndPoint} is an {@link EndPoint} that wish to be
141      * notified of non-blocking events by the {@link ManagedSelector}.
142      */
143     public interface SelectableEndPoint extends EndPoint
144     {
145         /**
146          * Callback method invoked when a read or write events has been
147          * detected by the {@link ManagedSelector} for this endpoint.
148          *
149          * @return a job that may block or null
150          */
151         Runnable onSelected();
152 
153         /**
154          * Callback method invoked when all the keys selected by the
155          * {@link ManagedSelector} for this endpoint have been processed.
156          */
157         void updateKey();
158     }
159 
160     private class SelectorProducer implements ExecutionStrategy.Producer
161     {
162         private Set<SelectionKey> _keys = Collections.emptySet();
163         private Iterator<SelectionKey> _cursor = Collections.emptyIterator();
164 
165         @Override
166         public Runnable produce()
167         {
168             while (true)
169             {
170                 Runnable task = processSelected();
171                 if (task != null)
172                     return task;
173 
174                 Runnable action = runActions();
175                 if (action != null)
176                     return action;
177 
178                 update();
179 
180                 if (!select())
181                     return null;
182             }
183         }
184 
185         private Runnable runActions()
186         {
187             while (true)
188             {
189                 Runnable action;
190                 try (Locker.Lock lock = _locker.lock())
191                 {
192                     action = _actions.poll();
193                     if (action == null)
194                     {
195                         // No more actions, so we need to select
196                         _selecting = true;
197                         return null;
198                     }
199                 }
200 
201                 if (action instanceof Product)
202                     return action;
203 
204                 // Running the change may queue another action.
205                 runChange(action);
206             }
207         }
208 
209         private void runChange(Runnable change)
210         {
211             try
212             {
213                 if (LOG.isDebugEnabled())
214                     LOG.debug("Running change {}", change);
215                 change.run();
216             }
217             catch (Throwable x)
218             {
219                 LOG.debug("Could not run change " + change, x);
220             }
221         }
222 
223         private boolean select()
224         {
225             try
226             {
227                 Selector selector = _selector;
228                 if (selector != null && selector.isOpen())
229                 {
230                     if (LOG.isDebugEnabled())
231                         LOG.debug("Selector loop waiting on select");
232                     int selected = selector.select();
233                     if (LOG.isDebugEnabled())
234                         LOG.debug("Selector loop woken up from select, {}/{} selected", selected, selector.keys().size());
235 
236                     try (Locker.Lock lock = _locker.lock())
237                     {
238                         // finished selecting
239                         _selecting = false;
240                     }
241 
242                     _keys = selector.selectedKeys();
243                     _cursor = _keys.iterator();
244 
245                     return true;
246                 }
247             }
248             catch (Throwable x)
249             {
250                 closeNoExceptions(_selector);
251                 if (isRunning())
252                     LOG.warn(x);
253                 else
254                     LOG.debug(x);
255             }
256             return false;
257         }
258 
259         private Runnable processSelected()
260         {
261             while (_cursor.hasNext())
262             {
263                 SelectionKey key = _cursor.next();
264                 if (key.isValid())
265                 {
266                     Object attachment = key.attachment();
267                     try
268                     {
269                         if (attachment instanceof SelectableEndPoint)
270                         {
271                             // Try to produce a task
272                             Runnable task = ((SelectableEndPoint)attachment).onSelected();
273                             if (task != null)
274                                 return task;
275                         }
276                         else if (key.isConnectable())
277                         {
278                             Runnable task = processConnect(key, (Connect)attachment);
279                             if (task != null)
280                                 return task;
281                         }
282                         else if (key.isAcceptable())
283                         {
284                             processAccept(key);
285                         }
286                         else
287                         {
288                             throw new IllegalStateException("key=" + key + ", att=" + attachment + ", iOps=" + key.interestOps() + ", rOps=" + key.readyOps());
289                         }
290                     }
291                     catch (CancelledKeyException x)
292                     {
293                         LOG.debug("Ignoring cancelled key for channel {}", key.channel());
294                         if (attachment instanceof EndPoint)
295                             closeNoExceptions((EndPoint)attachment);
296                     }
297                     catch (Throwable x)
298                     {
299                         LOG.warn("Could not process key for channel " + key.channel(), x);
300                         if (attachment instanceof EndPoint)
301                             closeNoExceptions((EndPoint)attachment);
302                     }
303                 }
304                 else
305                 {
306                     if (LOG.isDebugEnabled())
307                         LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
308                     Object attachment = key.attachment();
309                     if (attachment instanceof EndPoint)
310                         closeNoExceptions((EndPoint)attachment);
311                 }
312             }
313             return null;
314         }
315 
316         private void update()
317         {
318             for (SelectionKey key : _keys)
319                 updateKey(key);
320             _keys.clear();
321         }
322 
323         private void updateKey(SelectionKey key)
324         {
325             Object attachment = key.attachment();
326             if (attachment instanceof SelectableEndPoint)
327                 ((SelectableEndPoint)attachment).updateKey();
328         }
329     }
330 
331     private interface Product extends Runnable
332     {
333     }
334 
335     private Runnable processConnect(SelectionKey key, final Connect connect)
336     {
337         SocketChannel channel = (SocketChannel)key.channel();
338         try
339         {
340             key.attach(connect.attachment);
341             boolean connected = _selectorManager.finishConnect(channel);
342             if (LOG.isDebugEnabled())
343                 LOG.debug("Connected {} {}", connected, channel);
344             if (connected)
345             {
346                 if (connect.timeout.cancel())
347                 {
348                     key.interestOps(0);
349                     return new CreateEndPoint(channel, key)
350                     {
351                         @Override
352                         protected void failed(Throwable failure)
353                         {
354                             super.failed(failure);
355                             connect.failed(failure);
356                         }
357                     };
358                 }
359                 else
360                 {
361                     throw new SocketTimeoutException("Concurrent Connect Timeout");
362                 }
363             }
364             else
365             {
366                 throw new ConnectException();
367             }
368         }
369         catch (Throwable x)
370         {
371             connect.failed(x);
372             return null;
373         }
374     }
375 
376     private void processAccept(SelectionKey key)
377     {
378         ServerSocketChannel server = (ServerSocketChannel)key.channel();
379         SocketChannel channel = null;
380         try
381         {
382             while ((channel = server.accept()) != null)
383             {
384                 _selectorManager.accepted(channel);
385             }
386         }
387         catch (Throwable x)
388         {
389             closeNoExceptions(channel);
390             LOG.warn("Accept failed for channel " + channel, x);
391         }
392     }
393 
394     private void closeNoExceptions(Closeable closeable)
395     {
396         try
397         {
398             if (closeable != null)
399                 closeable.close();
400         }
401         catch (Throwable x)
402         {
403             LOG.ignore(x);
404         }
405     }
406 
407     private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
408     {
409         EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey);
410         _selectorManager.endPointOpened(endPoint);
411         Connection connection = _selectorManager.newConnection(channel, endPoint, selectionKey.attachment());
412         endPoint.setConnection(connection);
413         selectionKey.attach(endPoint);
414         _selectorManager.connectionOpened(connection);
415         if (LOG.isDebugEnabled())
416             LOG.debug("Created {}", endPoint);
417         return endPoint;
418     }
419 
420     public void destroyEndPoint(final EndPoint endPoint)
421     {
422         final Connection connection = endPoint.getConnection();
423         submit(new Product()
424         {
425             @Override
426             public void run()
427             {
428                 if (LOG.isDebugEnabled())
429                     LOG.debug("Destroyed {}", endPoint);
430                 if (connection != null)
431                     _selectorManager.connectionClosed(connection);
432                 _selectorManager.endPointClosed(endPoint);
433             }
434         });
435     }
436 
437     @Override
438     public String dump()
439     {
440         return ContainerLifeCycle.dump(this);
441     }
442 
443     @Override
444     public void dump(Appendable out, String indent) throws IOException
445     {
446         out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append(System.lineSeparator());
447 
448         Selector selector = _selector;
449         if (selector != null && selector.isOpen())
450         {
451             final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2);
452 
453             DumpKeys dumpKeys = new DumpKeys(dump);
454             submit(dumpKeys);
455             dumpKeys.await(5, TimeUnit.SECONDS);
456 
457             ContainerLifeCycle.dump(out, indent, dump);
458         }
459     }
460 
461     @Override
462     public String toString()
463     {
464         Selector selector = _selector;
465         return String.format("%s id=%s keys=%d selected=%d",
466                 super.toString(),
467                 _id,
468                 selector != null && selector.isOpen() ? selector.keys().size() : -1,
469                 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
470     }
471 
472     private class DumpKeys implements Runnable
473     {
474         private final CountDownLatch latch = new CountDownLatch(1);
475         private final List<Object> _dumps;
476 
477         private DumpKeys(List<Object> dumps)
478         {
479             this._dumps = dumps;
480         }
481 
482         @Override
483         public void run()
484         {
485             Selector selector = _selector;
486             if (selector != null && selector.isOpen())
487             {
488                 Set<SelectionKey> keys = selector.keys();
489                 _dumps.add(selector + " keys=" + keys.size());
490                 for (SelectionKey key : keys)
491                 {
492                     try
493                     {
494                         _dumps.add(String.format("SelectionKey@%x{i=%d}->%s", key.hashCode(), key.interestOps(), key.attachment()));
495                     }
496                     catch (Throwable x)
497                     {
498                         LOG.ignore(x);
499                     }
500                 }
501             }
502             latch.countDown();
503         }
504 
505         public boolean await(long timeout, TimeUnit unit)
506         {
507             try
508             {
509                 return latch.await(timeout, unit);
510             }
511             catch (InterruptedException x)
512             {
513                 return false;
514             }
515         }
516     }
517 
518     class Acceptor implements Runnable
519     {
520         private final ServerSocketChannel _channel;
521 
522         public Acceptor(ServerSocketChannel channel)
523         {
524             this._channel = channel;
525         }
526 
527         @Override
528         public void run()
529         {
530             try
531             {
532                 SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null);
533                 if (LOG.isDebugEnabled())
534                     LOG.debug("{} acceptor={}", this, key);
535             }
536             catch (Throwable x)
537             {
538                 closeNoExceptions(_channel);
539                 LOG.warn(x);
540             }
541         }
542     }
543 
544     class Accept implements Runnable
545     {
546         private final SocketChannel channel;
547         private final Object attachment;
548 
549         Accept(SocketChannel channel, Object attachment)
550         {
551             this.channel = channel;
552             this.attachment = attachment;
553         }
554 
555         @Override
556         public void run()
557         {
558             try
559             {
560                 final SelectionKey key = channel.register(_selector, 0, attachment);
561                 submit(new CreateEndPoint(channel, key));
562             }
563             catch (Throwable x)
564             {
565                 closeNoExceptions(channel);
566                 LOG.debug(x);
567             }
568         }
569     }
570 
571     private class CreateEndPoint implements Product
572     {
573         private final SocketChannel channel;
574         private final SelectionKey key;
575 
576         public CreateEndPoint(SocketChannel channel, SelectionKey key)
577         {
578             this.channel = channel;
579             this.key = key;
580         }
581 
582         @Override
583         public void run()
584         {
585             try
586             {
587                 createEndPoint(channel, key);
588             }
589             catch (Throwable x)
590             {
591                 LOG.debug(x);
592                 failed(x);
593             }
594         }
595 
596         protected void failed(Throwable failure)
597         {
598             closeNoExceptions(channel);
599             LOG.debug(failure);
600         }
601     }
602 
603     class Connect implements Runnable
604     {
605         private final AtomicBoolean failed = new AtomicBoolean();
606         private final SocketChannel channel;
607         private final Object attachment;
608         private final Scheduler.Task timeout;
609 
610         Connect(SocketChannel channel, Object attachment)
611         {
612             this.channel = channel;
613             this.attachment = attachment;
614             this.timeout = ManagedSelector.this._selectorManager.getScheduler().schedule(new ConnectTimeout(this), ManagedSelector.this._selectorManager.getConnectTimeout(), TimeUnit.MILLISECONDS);
615         }
616 
617         @Override
618         public void run()
619         {
620             try
621             {
622                 channel.register(_selector, SelectionKey.OP_CONNECT, this);
623             }
624             catch (Throwable x)
625             {
626                 failed(x);
627             }
628         }
629 
630         private void failed(Throwable failure)
631         {
632             if (failed.compareAndSet(false, true))
633             {
634                 timeout.cancel();
635                 closeNoExceptions(channel);
636                 ManagedSelector.this._selectorManager.connectionFailed(channel, failure, attachment);
637             }
638         }
639     }
640 
641     private class ConnectTimeout implements Runnable
642     {
643         private final Connect connect;
644 
645         private ConnectTimeout(Connect connect)
646         {
647             this.connect = connect;
648         }
649 
650         @Override
651         public void run()
652         {
653             SocketChannel channel = connect.channel;
654             if (channel.isConnectionPending())
655             {
656                 if (LOG.isDebugEnabled())
657                     LOG.debug("Channel {} timed out while connecting, closing it", channel);
658                 connect.failed(new SocketTimeoutException("Connect Timeout"));
659             }
660         }
661     }
662 
663     private class CloseEndPoints implements Runnable
664     {
665         private final CountDownLatch _latch = new CountDownLatch(1);
666         private CountDownLatch _allClosed;
667 
668         @Override
669         public void run()
670         {
671             List<EndPoint> end_points = new ArrayList<>();
672             for (SelectionKey key : _selector.keys())
673             {
674                 if (key.isValid())
675                 {
676                     Object attachment = key.attachment();
677                     if (attachment instanceof EndPoint)
678                         end_points.add((EndPoint)attachment);
679                 }
680             }
681 
682             int size = end_points.size();
683             if (LOG.isDebugEnabled())
684                 LOG.debug("Closing {} endPoints on {}", size, ManagedSelector.this);
685 
686             _allClosed = new CountDownLatch(size);
687             _latch.countDown();
688 
689             for (EndPoint endp : end_points)
690                 submit(new EndPointCloser(endp, _allClosed));
691 
692             if (LOG.isDebugEnabled())
693                 LOG.debug("Closed {} endPoints on {}", size, ManagedSelector.this);
694         }
695 
696         public boolean await(long timeout)
697         {
698             try
699             {
700                 return _latch.await(timeout, TimeUnit.MILLISECONDS) &&
701                         _allClosed.await(timeout, TimeUnit.MILLISECONDS);
702             }
703             catch (InterruptedException x)
704             {
705                 return false;
706             }
707         }
708     }
709 
710     private class EndPointCloser implements Product
711     {
712         private final EndPoint _endPoint;
713         private final CountDownLatch _latch;
714 
715         private EndPointCloser(EndPoint endPoint, CountDownLatch latch)
716         {
717             _endPoint = endPoint;
718             _latch = latch;
719         }
720 
721         @Override
722         public void run()
723         {
724             closeNoExceptions(_endPoint.getConnection());
725             _latch.countDown();
726         }
727     }
728 
729     private class CloseSelector implements Runnable
730     {
731         private CountDownLatch _latch = new CountDownLatch(1);
732 
733         @Override
734         public void run()
735         {
736             Selector selector = _selector;
737             _selector = null;
738             closeNoExceptions(selector);
739             _latch.countDown();
740         }
741 
742         public boolean await(long timeout)
743         {
744             try
745             {
746                 return _latch.await(timeout, TimeUnit.MILLISECONDS);
747             }
748             catch (InterruptedException x)
749             {
750                 return false;
751             }
752         }
753     }
754 }