1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.net.ConnectException;
24 import java.net.Socket;
25 import java.net.SocketAddress;
26 import java.net.SocketTimeoutException;
27 import java.nio.channels.CancelledKeyException;
28 import java.nio.channels.SelectionKey;
29 import java.nio.channels.Selector;
30 import java.nio.channels.ServerSocketChannel;
31 import java.nio.channels.SocketChannel;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.Queue;
35 import java.util.Set;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicReference;
41
42 import org.eclipse.jetty.util.ConcurrentArrayQueue;
43 import org.eclipse.jetty.util.TypeUtil;
44 import org.eclipse.jetty.util.component.AbstractLifeCycle;
45 import org.eclipse.jetty.util.component.ContainerLifeCycle;
46 import org.eclipse.jetty.util.component.Dumpable;
47 import org.eclipse.jetty.util.log.Log;
48 import org.eclipse.jetty.util.log.Logger;
49 import org.eclipse.jetty.util.thread.NonBlockingThread;
50 import org.eclipse.jetty.util.thread.Scheduler;
51
52
53
54
55
56
57
58 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
59 {
60 public static final String SUBMIT_KEY_UPDATES = "org.eclipse.jetty.io.SelectorManager.submitKeyUpdates";
61 public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
62 protected static final Logger LOG = Log.getLogger(SelectorManager.class);
63 private final static boolean __submitKeyUpdates = Boolean.valueOf(System.getProperty(SUBMIT_KEY_UPDATES, "false"));
64
65 private final Executor executor;
66 private final Scheduler scheduler;
67 private final ManagedSelector[] _selectors;
68 private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
69 private long _selectorIndex;
70
/**
 * Creates a manager with a default number of selectors, computed as
 * half the number of available processors, rounded up.
 *
 * @param executor the executor used to run tasks
 * @param scheduler the scheduler used for connect timeouts
 */
protected SelectorManager(Executor executor, Scheduler scheduler)
{
    this(executor, scheduler, (1 + Runtime.getRuntime().availableProcessors()) / 2);
}
75
/**
 * Creates a manager with an explicit number of selectors.
 *
 * @param executor the executor used to run tasks
 * @param scheduler the scheduler used for connect timeouts
 * @param selectors the number of selectors to manage, must be at least 1
 * @throws IllegalArgumentException if {@code selectors} is zero or negative
 */
protected SelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
    if (selectors < 1)
        throw new IllegalArgumentException("No selectors");
    _selectors = new ManagedSelector[selectors];
    this.executor = executor;
    this.scheduler = scheduler;
}
84
85 public Executor getExecutor()
86 {
87 return executor;
88 }
89
90 public Scheduler getScheduler()
91 {
92 return scheduler;
93 }
94
95
96
97
98
99
100 public long getConnectTimeout()
101 {
102 return _connectTimeout;
103 }
104
105
106
107
108
109
110 public void setConnectTimeout(long milliseconds)
111 {
112 _connectTimeout = milliseconds;
113 }
114
115
116
117
118
119
120 protected void execute(Runnable task)
121 {
122 executor.execute(task);
123 }
124
125
126
127
128 public int getSelectorCount()
129 {
130 return _selectors.length;
131 }
132
133 private ManagedSelector chooseSelector()
134 {
135
136
137
138 long s = _selectorIndex++;
139 int index = (int)(s % getSelectorCount());
140 return _selectors[index];
141 }
142
143
144
145
146
147
148
149
150
151 public void connect(SocketChannel channel, Object attachment)
152 {
153 ManagedSelector set = chooseSelector();
154 set.submit(set.new Connect(channel, attachment));
155 }
156
157
158
159
160
161
162
163
164 public void accept(final SocketChannel channel)
165 {
166 final ManagedSelector selector = chooseSelector();
167 selector.submit(selector.new Accept(channel));
168 }
169
170
171
172
173
174
175
176
177
178 public void acceptor(final ServerSocketChannel server)
179 {
180 final ManagedSelector selector = chooseSelector();
181 selector.submit(selector.new Acceptor(server));
182 }
183
184
185
186
187
188
189
190
191
192
193 protected void accepted(SocketChannel channel) throws IOException
194 {
195 throw new UnsupportedOperationException();
196 }
197
198 @Override
199 protected void doStart() throws Exception
200 {
201 super.doStart();
202 for (int i = 0; i < _selectors.length; i++)
203 {
204 ManagedSelector selector = newSelector(i);
205 _selectors[i] = selector;
206 selector.start();
207 execute(new NonBlockingThread(selector));
208 }
209 }
210
211
212
213
214
215
216
217 protected ManagedSelector newSelector(int id)
218 {
219 return new ManagedSelector(id);
220 }
221
222 @Override
223 protected void doStop() throws Exception
224 {
225 for (ManagedSelector selector : _selectors)
226 selector.stop();
227 super.doStop();
228 }
229
230
231
232
233
234
235 protected void endPointOpened(EndPoint endpoint)
236 {
237 endpoint.onOpen();
238 }
239
240
241
242
243
244
245 protected void endPointClosed(EndPoint endpoint)
246 {
247 endpoint.onClose();
248 }
249
250
251
252
253
254
255 public void connectionOpened(Connection connection)
256 {
257 try
258 {
259 connection.onOpen();
260 }
261 catch (Throwable x)
262 {
263 if (isRunning())
264 LOG.warn("Exception while notifying connection " + connection, x);
265 else
266 LOG.debug("Exception while notifying connection {}",connection, x);
267 }
268 }
269
270
271
272
273
274
275 public void connectionClosed(Connection connection)
276 {
277 try
278 {
279 connection.onClose();
280 }
281 catch (Throwable x)
282 {
283 LOG.debug("Exception while notifying connection " + connection, x);
284 }
285 }
286
287 protected boolean finishConnect(SocketChannel channel) throws IOException
288 {
289 return channel.finishConnect();
290 }
291
292
293
294
295
296
297
298
299
300 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
301 {
302 LOG.warn(String.format("%s - %s", channel, attachment), ex);
303 }
304
305
306
307
308
309
310
311
312
313
314
315
316
317 protected abstract EndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException;
318
319
320
321
322
323
324
325
326
327
328
329 public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException;
330
331 @Override
332 public String dump()
333 {
334 return ContainerLifeCycle.dump(this);
335 }
336
337 @Override
338 public void dump(Appendable out, String indent) throws IOException
339 {
340 ContainerLifeCycle.dumpObject(out, this);
341 ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
342 }
343
344 private enum State
345 {
346 CHANGES, MORE_CHANGES, SELECT, WAKEUP, PROCESS
347 }
348
349
350
351
352
353
354
355 public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
356 {
357 private final AtomicReference<State> _state= new AtomicReference<>(State.PROCESS);
358 private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>();
359 private final int _id;
360 private Selector _selector;
361 private volatile Thread _thread;
362
/**
 * Creates a selector with the given id and a stop timeout of 5000.
 *
 * @param id the index of this selector within its manager
 */
public ManagedSelector(int id)
{
    this._id = id;
    setStopTimeout(5000);
}
368
369 @Override
370 protected void doStart() throws Exception
371 {
372 super.doStart();
373 _selector = Selector.open();
374 _state.set(State.PROCESS);
375 }
376
377 @Override
378 protected void doStop() throws Exception
379 {
380 LOG.debug("Stopping {}", this);
381 Stop stop = new Stop();
382 submit(stop);
383 stop.await(getStopTimeout());
384 LOG.debug("Stopped {}", this);
385 }
386
387
388
389
390
391
392
393 public void updateKey(Runnable update)
394 {
395 if (__submitKeyUpdates)
396 {
397 submit(update);
398 }
399 else
400 {
401 runChange(update);
402 if (_state.compareAndSet(State.SELECT, State.WAKEUP))
403 wakeup();
404 }
405 }
406
407
408
409
410
411
412
413
414 public void submit(Runnable change)
415 {
416
417
418
419
420
421 _changes.offer(change);
422 LOG.debug("Queued change {}", change);
423
424 out: while (true)
425 {
426 switch (_state.get())
427 {
428 case SELECT:
429
430 if (!_state.compareAndSet(State.SELECT, State.WAKEUP))
431 continue;
432 wakeup();
433 break out;
434 case CHANGES:
435
436
437 if (_state.compareAndSet(State.CHANGES, State.MORE_CHANGES))
438 break out;
439 continue;
440 case WAKEUP:
441
442 break out;
443 case MORE_CHANGES:
444
445 break out;
446 case PROCESS:
447
448 break out;
449 default:
450 throw new IllegalStateException();
451 }
452 }
453 }
454
455 private void runChanges()
456 {
457 Runnable change;
458 while ((change = _changes.poll()) != null)
459 runChange(change);
460 }
461
462 protected void runChange(Runnable change)
463 {
464 try
465 {
466 LOG.debug("Running change {}", change);
467 change.run();
468 }
469 catch (Throwable x)
470 {
471 LOG.debug("Could not run change " + change, x);
472 }
473 }
474
475 @Override
476 public void run()
477 {
478 _thread = Thread.currentThread();
479 String name = _thread.getName();
480 try
481 {
482 _thread.setName(name + "-selector-" + SelectorManager.this.getClass().getSimpleName()+"@"+Integer.toHexString(SelectorManager.this.hashCode())+"/"+_id);
483 LOG.debug("Starting {} on {}", _thread, this);
484 while (isRunning())
485 select();
486 runChanges();
487 }
488 finally
489 {
490 LOG.debug("Stopped {} on {}", _thread, this);
491 _thread.setName(name);
492 }
493 }
494
495
496
497
498
499
500 public void select()
501 {
502 boolean debug = LOG.isDebugEnabled();
503 try
504 {
505 _state.set(State.CHANGES);
506
507
508 out: while(true)
509 {
510 switch (_state.get())
511 {
512 case CHANGES:
513 runChanges();
514 if (_state.compareAndSet(State.CHANGES, State.SELECT))
515 break out;
516 continue;
517 case MORE_CHANGES:
518 runChanges();
519 _state.set(State.CHANGES);
520 continue;
521 default:
522 throw new IllegalStateException();
523 }
524 }
525
526
527
528 assert _state.get() == State.SELECT || _state.get() == State.WAKEUP;
529
530 if (debug)
531 LOG.debug("Selector loop waiting on select");
532 int selected = _selector.select();
533 if (debug)
534 LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
535
536 _state.set(State.PROCESS);
537
538 Set<SelectionKey> selectedKeys = _selector.selectedKeys();
539 for (SelectionKey key : selectedKeys)
540 {
541 if (key.isValid())
542 {
543 processKey(key);
544 }
545 else
546 {
547 if (debug)
548 LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
549 Object attachment = key.attachment();
550 if (attachment instanceof EndPoint)
551 ((EndPoint)attachment).close();
552 }
553 }
554 selectedKeys.clear();
555 }
556 catch (Throwable x)
557 {
558 if (isRunning())
559 LOG.warn(x);
560 else
561 LOG.ignore(x);
562 }
563 }
564
565 private void processKey(SelectionKey key)
566 {
567 Object attachment = key.attachment();
568 try
569 {
570 if (attachment instanceof SelectableEndPoint)
571 {
572 ((SelectableEndPoint)attachment).onSelected();
573 }
574 else if (key.isConnectable())
575 {
576 processConnect(key, (Connect)attachment);
577 }
578 else if (key.isAcceptable())
579 {
580 processAccept(key);
581 }
582 else
583 {
584 throw new IllegalStateException();
585 }
586 }
587 catch (CancelledKeyException x)
588 {
589 LOG.debug("Ignoring cancelled key for channel {}", key.channel());
590 if (attachment instanceof EndPoint)
591 closeNoExceptions((EndPoint)attachment);
592 }
593 catch (Throwable x)
594 {
595 LOG.warn("Could not process key for channel " + key.channel(), x);
596 if (attachment instanceof EndPoint)
597 closeNoExceptions((EndPoint)attachment);
598 }
599 }
600
601 private void processConnect(SelectionKey key, Connect connect)
602 {
603 SocketChannel channel = (SocketChannel)key.channel();
604 try
605 {
606 key.attach(connect.attachment);
607 boolean connected = finishConnect(channel);
608 if (connected)
609 {
610 connect.timeout.cancel();
611 key.interestOps(0);
612 EndPoint endpoint = createEndPoint(channel, key);
613 key.attach(endpoint);
614 }
615 else
616 {
617 throw new ConnectException();
618 }
619 }
620 catch (Throwable x)
621 {
622 connect.failed(x);
623 }
624 }
625
626 private void processAccept(SelectionKey key)
627 {
628 ServerSocketChannel server = (ServerSocketChannel)key.channel();
629 SocketChannel channel = null;
630 try
631 {
632 while ((channel = server.accept()) != null)
633 {
634 accepted(channel);
635 }
636 }
637 catch (Throwable x)
638 {
639 closeNoExceptions(channel);
640 LOG.warn("Accept failed for channel " + channel, x);
641 }
642 }
643
644 private void closeNoExceptions(Closeable closeable)
645 {
646 try
647 {
648 if (closeable != null)
649 closeable.close();
650 }
651 catch (Throwable x)
652 {
653 LOG.ignore(x);
654 }
655 }
656
657 public void wakeup()
658 {
659 _selector.wakeup();
660 }
661
662 public boolean isSelectorThread()
663 {
664 return Thread.currentThread() == _thread;
665 }
666
667 private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
668 {
669 EndPoint endPoint = newEndPoint(channel, this, selectionKey);
670 endPointOpened(endPoint);
671 Connection connection = newConnection(channel, endPoint, selectionKey.attachment());
672 endPoint.setConnection(connection);
673 connectionOpened(connection);
674 LOG.debug("Created {}", endPoint);
675 return endPoint;
676 }
677
678 public void destroyEndPoint(EndPoint endPoint)
679 {
680 LOG.debug("Destroyed {}", endPoint);
681 Connection connection = endPoint.getConnection();
682 if (connection != null)
683 connectionClosed(connection);
684 endPointClosed(endPoint);
685 }
686
687 @Override
688 public String dump()
689 {
690 return ContainerLifeCycle.dump(this);
691 }
692
693 @Override
694 public void dump(Appendable out, String indent) throws IOException
695 {
696 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append("\n");
697
698 Thread selecting = _thread;
699
700 Object where = "not selecting";
701 StackTraceElement[] trace = selecting == null ? null : selecting.getStackTrace();
702 if (trace != null)
703 {
704 for (StackTraceElement t : trace)
705 if (t.getClassName().startsWith("org.eclipse.jetty."))
706 {
707 where = t;
708 break;
709 }
710 }
711
712 Selector selector = _selector;
713 if (selector != null && selector.isOpen())
714 {
715 final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2);
716 dump.add(where);
717
718 DumpKeys dumpKeys = new DumpKeys(dump);
719 submit(dumpKeys);
720 dumpKeys.await(5, TimeUnit.SECONDS);
721
722 ContainerLifeCycle.dump(out, indent, dump);
723 }
724 }
725
726 public void dumpKeysState(List<Object> dumps)
727 {
728 Selector selector = _selector;
729 Set<SelectionKey> keys = selector.keys();
730 dumps.add(selector + " keys=" + keys.size());
731 for (SelectionKey key : keys)
732 {
733 if (key.isValid())
734 dumps.add(key.attachment() + " iOps=" + key.interestOps() + " rOps=" + key.readyOps());
735 else
736 dumps.add(key.attachment() + " iOps=-1 rOps=-1");
737 }
738 }
739
740 @Override
741 public String toString()
742 {
743 Selector selector = _selector;
744 return String.format("%s keys=%d selected=%d",
745 super.toString(),
746 selector != null && selector.isOpen() ? selector.keys().size() : -1,
747 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
748 }
749
750 private class DumpKeys implements Runnable
751 {
752 private final CountDownLatch latch = new CountDownLatch(1);
753 private final List<Object> _dumps;
754
755 private DumpKeys(List<Object> dumps)
756 {
757 this._dumps = dumps;
758 }
759
760 @Override
761 public void run()
762 {
763 dumpKeysState(_dumps);
764 latch.countDown();
765 }
766
767 public boolean await(long timeout, TimeUnit unit)
768 {
769 try
770 {
771 return latch.await(timeout, unit);
772 }
773 catch (InterruptedException x)
774 {
775 return false;
776 }
777 }
778 }
779
780 private class Acceptor implements Runnable
781 {
782 private final ServerSocketChannel _channel;
783
784 public Acceptor(ServerSocketChannel channel)
785 {
786 this._channel = channel;
787 }
788
789 @Override
790 public void run()
791 {
792 try
793 {
794 SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null);
795 LOG.debug("{} acceptor={}", this, key);
796 }
797 catch (Throwable x)
798 {
799 closeNoExceptions(_channel);
800 LOG.warn(x);
801 }
802 }
803 }
804
805 private class Accept implements Runnable
806 {
807 private final SocketChannel _channel;
808
809 public Accept(SocketChannel channel)
810 {
811 this._channel = channel;
812 }
813
814 @Override
815 public void run()
816 {
817 try
818 {
819 SelectionKey key = _channel.register(_selector, 0, null);
820 EndPoint endpoint = createEndPoint(_channel, key);
821 key.attach(endpoint);
822 }
823 catch (Throwable x)
824 {
825 closeNoExceptions(_channel);
826 LOG.debug(x);
827 }
828 }
829 }
830
831 private class Connect implements Runnable
832 {
833 private final AtomicBoolean failed = new AtomicBoolean();
834 private final SocketChannel channel;
835 private final Object attachment;
836 private final Scheduler.Task timeout;
837
838 public Connect(SocketChannel channel, Object attachment)
839 {
840 this.channel = channel;
841 this.attachment = attachment;
842 this.timeout = scheduler.schedule(new ConnectTimeout(this), getConnectTimeout(), TimeUnit.MILLISECONDS);
843 }
844
845 @Override
846 public void run()
847 {
848 try
849 {
850 channel.register(_selector, SelectionKey.OP_CONNECT, this);
851 }
852 catch (Throwable x)
853 {
854 failed(x);
855 }
856 }
857
858 protected void failed(Throwable failure)
859 {
860 if (failed.compareAndSet(false, true))
861 {
862 timeout.cancel();
863 closeNoExceptions(channel);
864 connectionFailed(channel, failure, attachment);
865 }
866 }
867 }
868
869 private class ConnectTimeout implements Runnable
870 {
871 private final Connect connect;
872
873 private ConnectTimeout(Connect connect)
874 {
875 this.connect = connect;
876 }
877
878 @Override
879 public void run()
880 {
881 SocketChannel channel = connect.channel;
882 if (channel.isConnectionPending())
883 {
884 LOG.debug("Channel {} timed out while connecting, closing it", channel);
885 connect.failed(new SocketTimeoutException());
886 }
887 }
888 }
889
890 private class Stop implements Runnable
891 {
892 private final CountDownLatch latch = new CountDownLatch(1);
893
894 @Override
895 public void run()
896 {
897 try
898 {
899 for (SelectionKey key : _selector.keys())
900 {
901 Object attachment = key.attachment();
902 if (attachment instanceof EndPoint)
903 {
904 EndPointCloser closer = new EndPointCloser((EndPoint)attachment);
905 execute(closer);
906
907
908
909
910 closer.await(getStopTimeout());
911 }
912 }
913
914 closeNoExceptions(_selector);
915 }
916 finally
917 {
918 latch.countDown();
919 }
920 }
921
922 public boolean await(long timeout)
923 {
924 try
925 {
926 return latch.await(timeout, TimeUnit.MILLISECONDS);
927 }
928 catch (InterruptedException x)
929 {
930 return false;
931 }
932 }
933 }
934
935 private class EndPointCloser implements Runnable
936 {
937 private final CountDownLatch latch = new CountDownLatch(1);
938 private final EndPoint endPoint;
939
940 private EndPointCloser(EndPoint endPoint)
941 {
942 this.endPoint = endPoint;
943 }
944
945 @Override
946 public void run()
947 {
948 try
949 {
950 closeNoExceptions(endPoint.getConnection());
951 }
952 finally
953 {
954 latch.countDown();
955 }
956 }
957
958 private boolean await(long timeout)
959 {
960 try
961 {
962 return latch.await(timeout, TimeUnit.MILLISECONDS);
963 }
964 catch (InterruptedException x)
965 {
966 return false;
967 }
968 }
969 }
970 }
971
972
973
974
975
976 public interface SelectableEndPoint extends EndPoint
977 {
978
979
980
981
982 void onSelected();
983 }
984 }