1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.net.ConnectException;
24 import java.net.Socket;
25 import java.net.SocketAddress;
26 import java.net.SocketTimeoutException;
27 import java.nio.channels.CancelledKeyException;
28 import java.nio.channels.SelectionKey;
29 import java.nio.channels.Selector;
30 import java.nio.channels.ServerSocketChannel;
31 import java.nio.channels.SocketChannel;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.Queue;
35 import java.util.Set;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicReference;
41
42 import org.eclipse.jetty.util.ConcurrentArrayQueue;
43 import org.eclipse.jetty.util.TypeUtil;
44 import org.eclipse.jetty.util.annotation.ManagedAttribute;
45 import org.eclipse.jetty.util.component.AbstractLifeCycle;
46 import org.eclipse.jetty.util.component.ContainerLifeCycle;
47 import org.eclipse.jetty.util.component.Dumpable;
48 import org.eclipse.jetty.util.log.Log;
49 import org.eclipse.jetty.util.log.Logger;
50 import org.eclipse.jetty.util.thread.NonBlockingThread;
51 import org.eclipse.jetty.util.thread.Scheduler;
52
53
54
55
56
57
58
59 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
60 {
61 public static final String SUBMIT_KEY_UPDATES = "org.eclipse.jetty.io.SelectorManager.submitKeyUpdates";
62 public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
63 protected static final Logger LOG = Log.getLogger(SelectorManager.class);
64 private final static boolean __submitKeyUpdates = Boolean.valueOf(System.getProperty(SUBMIT_KEY_UPDATES, "false"));
65
66 private final Executor executor;
67 private final Scheduler scheduler;
68 private final ManagedSelector[] _selectors;
69 private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
70 private long _selectorIndex;
71 private int _priorityDelta;
72
73 protected SelectorManager(Executor executor, Scheduler scheduler)
74 {
75 this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2);
76 }
77
78 protected SelectorManager(Executor executor, Scheduler scheduler, int selectors)
79 {
80 if (selectors<=0)
81 throw new IllegalArgumentException("No selectors");
82 this.executor = executor;
83 this.scheduler = scheduler;
84 _selectors = new ManagedSelector[selectors];
85 }
86
87 public Executor getExecutor()
88 {
89 return executor;
90 }
91
92 public Scheduler getScheduler()
93 {
94 return scheduler;
95 }
96
97
98
99
100
101
102 public long getConnectTimeout()
103 {
104 return _connectTimeout;
105 }
106
107
108
109
110
111
112 public void setConnectTimeout(long milliseconds)
113 {
114 _connectTimeout = milliseconds;
115 }
116
117
118 @ManagedAttribute("The priority delta to apply to selector threads")
119 public int getSelectorPriorityDelta()
120 {
121 return _priorityDelta;
122 }
123
124
125
126
127
128
129
130
131
132
133
134
135 public void setSelectorPriorityDelta(int selectorPriorityDelta)
136 {
137 int oldDelta = _priorityDelta;
138 _priorityDelta = selectorPriorityDelta;
139 if (oldDelta != selectorPriorityDelta && isStarted())
140 {
141 for (ManagedSelector selector : _selectors)
142 {
143 Thread thread = selector._thread;
144 if (thread != null)
145 {
146 int deltaDiff = selectorPriorityDelta - oldDelta;
147 thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, thread.getPriority() - deltaDiff)));
148 }
149 }
150 }
151 }
152
153
154
155
156
157
158 protected void execute(Runnable task)
159 {
160 executor.execute(task);
161 }
162
163
164
165
166 public int getSelectorCount()
167 {
168 return _selectors.length;
169 }
170
171 private ManagedSelector chooseSelector()
172 {
173
174
175
176 long s = _selectorIndex++;
177 int index = (int)(s % getSelectorCount());
178 return _selectors[index];
179 }
180
181
182
183
184
185
186
187
188
189
190
191 public void connect(SocketChannel channel, Object attachment)
192 {
193 ManagedSelector set = chooseSelector();
194 set.submit(set.new Connect(channel, attachment));
195 }
196
197
198
199
200 public void accept(SocketChannel channel)
201 {
202 accept(channel, null);
203 }
204
205
206
207
208
209
210
211
212
213
214
215 public void accept(SocketChannel channel, Object attachment)
216 {
217 final ManagedSelector selector = chooseSelector();
218 selector.submit(selector.new Accept(channel, attachment));
219 }
220
221
222
223
224
225
226
227
228
229 public void acceptor(ServerSocketChannel server)
230 {
231 final ManagedSelector selector = chooseSelector();
232 selector.submit(selector.new Acceptor(server));
233 }
234
235
236
237
238
239
240
241
242
243
244 protected void accepted(SocketChannel channel) throws IOException
245 {
246 throw new UnsupportedOperationException();
247 }
248
249 @Override
250 protected void doStart() throws Exception
251 {
252 super.doStart();
253 for (int i = 0; i < _selectors.length; i++)
254 {
255 ManagedSelector selector = newSelector(i);
256 _selectors[i] = selector;
257 selector.start();
258 execute(new NonBlockingThread(selector));
259 }
260 }
261
262
263
264
265
266
267
268 protected ManagedSelector newSelector(int id)
269 {
270 return new ManagedSelector(id);
271 }
272
273 @Override
274 protected void doStop() throws Exception
275 {
276 for (ManagedSelector selector : _selectors)
277 selector.stop();
278 super.doStop();
279 }
280
281
282
283
284
285
286 protected void endPointOpened(EndPoint endpoint)
287 {
288 endpoint.onOpen();
289 }
290
291
292
293
294
295
296 protected void endPointClosed(EndPoint endpoint)
297 {
298 endpoint.onClose();
299 }
300
301
302
303
304
305
306 public void connectionOpened(Connection connection)
307 {
308 try
309 {
310 connection.onOpen();
311 }
312 catch (Throwable x)
313 {
314 if (isRunning())
315 LOG.warn("Exception while notifying connection " + connection, x);
316 else
317 LOG.debug("Exception while notifying connection {}",connection, x);
318 }
319 }
320
321
322
323
324
325
326 public void connectionClosed(Connection connection)
327 {
328 try
329 {
330 connection.onClose();
331 }
332 catch (Throwable x)
333 {
334 LOG.debug("Exception while notifying connection " + connection, x);
335 }
336 }
337
338 protected boolean finishConnect(SocketChannel channel) throws IOException
339 {
340 return channel.finishConnect();
341 }
342
343
344
345
346
347
348
349
350
351 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
352 {
353 LOG.warn(String.format("%s - %s", channel, attachment), ex);
354 }
355
356
357
358
359
360
361
362
363
364
365
366
367
368 protected abstract EndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException;
369
370
371
372
373
374
375
376
377
378
379
380 public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException;
381
382 @Override
383 public String dump()
384 {
385 return ContainerLifeCycle.dump(this);
386 }
387
388 @Override
389 public void dump(Appendable out, String indent) throws IOException
390 {
391 ContainerLifeCycle.dumpObject(out, this);
392 ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
393 }
394
395 private enum State
396 {
397 CHANGES, MORE_CHANGES, SELECT, WAKEUP, PROCESS
398 }
399
400
401
402
403
404
405
406 public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
407 {
408 private final AtomicReference<State> _state= new AtomicReference<>(State.PROCESS);
409 private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>();
410 private final int _id;
411 private Selector _selector;
412 private volatile Thread _thread;
413
414 public ManagedSelector(int id)
415 {
416 _id = id;
417 setStopTimeout(5000);
418 }
419
420 @Override
421 protected void doStart() throws Exception
422 {
423 super.doStart();
424 _selector = Selector.open();
425 _state.set(State.PROCESS);
426 }
427
428 @Override
429 protected void doStop() throws Exception
430 {
431 if (LOG.isDebugEnabled())
432 LOG.debug("Stopping {}", this);
433 Stop stop = new Stop();
434 submit(stop);
435 stop.await(getStopTimeout());
436 if (LOG.isDebugEnabled())
437 LOG.debug("Stopped {}", this);
438 }
439
440
441
442
443
444
445
446 public void updateKey(Runnable update)
447 {
448 if (__submitKeyUpdates)
449 {
450 submit(update);
451 }
452 else
453 {
454 runChange(update);
455 if (_state.compareAndSet(State.SELECT, State.WAKEUP))
456 wakeup();
457 }
458 }
459
460
461
462
463
464
465
466
467 public void submit(Runnable change)
468 {
469
470
471
472
473
474 _changes.offer(change);
475 if (LOG.isDebugEnabled())
476 LOG.debug("Queued change {}", change);
477
478 out: while (true)
479 {
480 switch (_state.get())
481 {
482 case SELECT:
483
484 if (!_state.compareAndSet(State.SELECT, State.WAKEUP))
485 continue;
486 wakeup();
487 break out;
488 case CHANGES:
489
490
491 if (_state.compareAndSet(State.CHANGES, State.MORE_CHANGES))
492 break out;
493 continue;
494 case WAKEUP:
495
496 break out;
497 case MORE_CHANGES:
498
499 break out;
500 case PROCESS:
501
502 break out;
503 default:
504 throw new IllegalStateException();
505 }
506 }
507 }
508
509 private void runChanges()
510 {
511 Runnable change;
512 while ((change = _changes.poll()) != null)
513 runChange(change);
514 }
515
516 protected void runChange(Runnable change)
517 {
518 try
519 {
520 if (LOG.isDebugEnabled())
521 LOG.debug("Running change {}", change);
522 change.run();
523 }
524 catch (Throwable x)
525 {
526 LOG.debug("Could not run change " + change, x);
527 }
528 }
529
530 @Override
531 public void run()
532 {
533 _thread = Thread.currentThread();
534 String name = _thread.getName();
535 int priority = _thread.getPriority();
536 try
537 {
538 if (_priorityDelta != 0)
539 _thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, priority + _priorityDelta)));
540
541 _thread.setName(String.format("%s-selector-%s@%h/%d", name, SelectorManager.this.getClass().getSimpleName(), SelectorManager.this.hashCode(), _id));
542 if (LOG.isDebugEnabled())
543 LOG.debug("Starting {} on {}", _thread, this);
544 while (isRunning())
545 select();
546 while(isStopping())
547 runChanges();
548 }
549 finally
550 {
551 if (LOG.isDebugEnabled())
552 LOG.debug("Stopped {} on {}", _thread, this);
553 _thread.setName(name);
554 if (_priorityDelta != 0)
555 _thread.setPriority(priority);
556 }
557 }
558
559
560
561
562
563
564 public void select()
565 {
566 boolean debug = LOG.isDebugEnabled();
567 try
568 {
569 _state.set(State.CHANGES);
570
571
572 out: while(true)
573 {
574 switch (_state.get())
575 {
576 case CHANGES:
577 runChanges();
578 if (_state.compareAndSet(State.CHANGES, State.SELECT))
579 break out;
580 continue;
581 case MORE_CHANGES:
582 runChanges();
583 _state.set(State.CHANGES);
584 continue;
585 default:
586 throw new IllegalStateException();
587 }
588 }
589
590
591
592 assert _state.get() == State.SELECT || _state.get() == State.WAKEUP;
593
594 if (debug)
595 LOG.debug("Selector loop waiting on select");
596 int selected = _selector.select();
597 if (debug)
598 LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
599
600 _state.set(State.PROCESS);
601
602 Set<SelectionKey> selectedKeys = _selector.selectedKeys();
603 for (SelectionKey key : selectedKeys)
604 {
605 if (key.isValid())
606 {
607 processKey(key);
608 }
609 else
610 {
611 if (debug)
612 LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
613 Object attachment = key.attachment();
614 if (attachment instanceof EndPoint)
615 ((EndPoint)attachment).close();
616 }
617 }
618 selectedKeys.clear();
619 }
620 catch (Throwable x)
621 {
622 if (isRunning())
623 LOG.warn(x);
624 else
625 LOG.ignore(x);
626 }
627 }
628
629 private void processKey(SelectionKey key)
630 {
631 Object attachment = key.attachment();
632 try
633 {
634 if (attachment instanceof SelectableEndPoint)
635 {
636 ((SelectableEndPoint)attachment).onSelected();
637 }
638 else if (key.isConnectable())
639 {
640 processConnect(key, (Connect)attachment);
641 }
642 else if (key.isAcceptable())
643 {
644 processAccept(key);
645 }
646 else
647 {
648 throw new IllegalStateException();
649 }
650 }
651 catch (CancelledKeyException x)
652 {
653 LOG.debug("Ignoring cancelled key for channel {}", key.channel());
654 if (attachment instanceof EndPoint)
655 closeNoExceptions((EndPoint)attachment);
656 }
657 catch (Throwable x)
658 {
659 LOG.warn("Could not process key for channel " + key.channel(), x);
660 if (attachment instanceof EndPoint)
661 closeNoExceptions((EndPoint)attachment);
662 }
663 }
664
665 private void processConnect(SelectionKey key, Connect connect)
666 {
667 SocketChannel channel = (SocketChannel)key.channel();
668 try
669 {
670 key.attach(connect.attachment);
671 boolean connected = finishConnect(channel);
672 if (connected)
673 {
674 connect.timeout.cancel();
675 key.interestOps(0);
676 EndPoint endpoint = createEndPoint(channel, key);
677 key.attach(endpoint);
678 }
679 else
680 {
681 throw new ConnectException();
682 }
683 }
684 catch (Throwable x)
685 {
686 connect.failed(x);
687 }
688 }
689
690 private void processAccept(SelectionKey key)
691 {
692 ServerSocketChannel server = (ServerSocketChannel)key.channel();
693 SocketChannel channel = null;
694 try
695 {
696 while ((channel = server.accept()) != null)
697 {
698 accepted(channel);
699 }
700 }
701 catch (Throwable x)
702 {
703 closeNoExceptions(channel);
704 LOG.warn("Accept failed for channel " + channel, x);
705 }
706 }
707
708 private void closeNoExceptions(Closeable closeable)
709 {
710 try
711 {
712 if (closeable != null)
713 closeable.close();
714 }
715 catch (Throwable x)
716 {
717 LOG.ignore(x);
718 }
719 }
720
721 public void wakeup()
722 {
723 _selector.wakeup();
724 }
725
726 public boolean isSelectorThread()
727 {
728 return Thread.currentThread() == _thread;
729 }
730
731 private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
732 {
733 EndPoint endPoint = newEndPoint(channel, this, selectionKey);
734 endPointOpened(endPoint);
735 Connection connection = newConnection(channel, endPoint, selectionKey.attachment());
736 endPoint.setConnection(connection);
737 connectionOpened(connection);
738 if (LOG.isDebugEnabled())
739 LOG.debug("Created {}", endPoint);
740 return endPoint;
741 }
742
743 public void destroyEndPoint(EndPoint endPoint)
744 {
745 if (LOG.isDebugEnabled())
746 LOG.debug("Destroyed {}", endPoint);
747 Connection connection = endPoint.getConnection();
748 if (connection != null)
749 connectionClosed(connection);
750 endPointClosed(endPoint);
751 }
752
753 @Override
754 public String dump()
755 {
756 return ContainerLifeCycle.dump(this);
757 }
758
759 @Override
760 public void dump(Appendable out, String indent) throws IOException
761 {
762 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append("\n");
763
764 Thread selecting = _thread;
765
766 Object where = "not selecting";
767 StackTraceElement[] trace = selecting == null ? null : selecting.getStackTrace();
768 if (trace != null)
769 {
770 for (StackTraceElement t : trace)
771 if (t.getClassName().startsWith("org.eclipse.jetty."))
772 {
773 where = t;
774 break;
775 }
776 }
777
778 Selector selector = _selector;
779 if (selector != null && selector.isOpen())
780 {
781 final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2);
782 dump.add(where);
783
784 DumpKeys dumpKeys = new DumpKeys(dump);
785 submit(dumpKeys);
786 dumpKeys.await(5, TimeUnit.SECONDS);
787
788 ContainerLifeCycle.dump(out, indent, dump);
789 }
790 }
791
792 public void dumpKeysState(List<Object> dumps)
793 {
794 Selector selector = _selector;
795 Set<SelectionKey> keys = selector.keys();
796 dumps.add(selector + " keys=" + keys.size());
797 for (SelectionKey key : keys)
798 {
799 if (key.isValid())
800 dumps.add(key.attachment() + " iOps=" + key.interestOps() + " rOps=" + key.readyOps());
801 else
802 dumps.add(key.attachment() + " iOps=-1 rOps=-1");
803 }
804 }
805
806 @Override
807 public String toString()
808 {
809 Selector selector = _selector;
810 return String.format("%s keys=%d selected=%d",
811 super.toString(),
812 selector != null && selector.isOpen() ? selector.keys().size() : -1,
813 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
814 }
815
816 private class DumpKeys implements Runnable
817 {
818 private final CountDownLatch latch = new CountDownLatch(1);
819 private final List<Object> _dumps;
820
821 private DumpKeys(List<Object> dumps)
822 {
823 this._dumps = dumps;
824 }
825
826 @Override
827 public void run()
828 {
829 dumpKeysState(_dumps);
830 latch.countDown();
831 }
832
833 public boolean await(long timeout, TimeUnit unit)
834 {
835 try
836 {
837 return latch.await(timeout, unit);
838 }
839 catch (InterruptedException x)
840 {
841 return false;
842 }
843 }
844 }
845
846 private class Acceptor implements Runnable
847 {
848 private final ServerSocketChannel _channel;
849
850 public Acceptor(ServerSocketChannel channel)
851 {
852 this._channel = channel;
853 }
854
855 @Override
856 public void run()
857 {
858 try
859 {
860 SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null);
861 if (LOG.isDebugEnabled())
862 LOG.debug("{} acceptor={}", this, key);
863 }
864 catch (Throwable x)
865 {
866 closeNoExceptions(_channel);
867 LOG.warn(x);
868 }
869 }
870 }
871
872 private class Accept implements Runnable
873 {
874 private final SocketChannel channel;
875 private final Object attachment;
876
877 private Accept(SocketChannel channel, Object attachment)
878 {
879 this.channel = channel;
880 this.attachment = attachment;
881 }
882
883 @Override
884 public void run()
885 {
886 try
887 {
888 SelectionKey key = channel.register(_selector, 0, attachment);
889 EndPoint endpoint = createEndPoint(channel, key);
890 key.attach(endpoint);
891 }
892 catch (Throwable x)
893 {
894 closeNoExceptions(channel);
895 LOG.debug(x);
896 }
897 }
898 }
899
900 private class Connect implements Runnable
901 {
902 private final AtomicBoolean failed = new AtomicBoolean();
903 private final SocketChannel channel;
904 private final Object attachment;
905 private final Scheduler.Task timeout;
906
907 private Connect(SocketChannel channel, Object attachment)
908 {
909 this.channel = channel;
910 this.attachment = attachment;
911 this.timeout = scheduler.schedule(new ConnectTimeout(this), getConnectTimeout(), TimeUnit.MILLISECONDS);
912 }
913
914 @Override
915 public void run()
916 {
917 try
918 {
919 channel.register(_selector, SelectionKey.OP_CONNECT, this);
920 }
921 catch (Throwable x)
922 {
923 failed(x);
924 }
925 }
926
927 private void failed(Throwable failure)
928 {
929 if (failed.compareAndSet(false, true))
930 {
931 timeout.cancel();
932 closeNoExceptions(channel);
933 connectionFailed(channel, failure, attachment);
934 }
935 }
936 }
937
938 private class ConnectTimeout implements Runnable
939 {
940 private final Connect connect;
941
942 private ConnectTimeout(Connect connect)
943 {
944 this.connect = connect;
945 }
946
947 @Override
948 public void run()
949 {
950 SocketChannel channel = connect.channel;
951 if (channel.isConnectionPending())
952 {
953 if (LOG.isDebugEnabled())
954 LOG.debug("Channel {} timed out while connecting, closing it", channel);
955 connect.failed(new SocketTimeoutException());
956 }
957 }
958 }
959
960 private class Stop implements Runnable
961 {
962 private final CountDownLatch latch = new CountDownLatch(1);
963
964 @Override
965 public void run()
966 {
967 try
968 {
969 for (SelectionKey key : _selector.keys())
970 {
971 Object attachment = key.attachment();
972 if (attachment instanceof EndPoint)
973 {
974 EndPointCloser closer = new EndPointCloser((EndPoint)attachment);
975 execute(closer);
976
977
978
979
980 closer.await(getStopTimeout());
981 }
982 }
983
984 closeNoExceptions(_selector);
985 }
986 finally
987 {
988 latch.countDown();
989 }
990 }
991
992 public boolean await(long timeout)
993 {
994 try
995 {
996 return latch.await(timeout, TimeUnit.MILLISECONDS);
997 }
998 catch (InterruptedException x)
999 {
1000 return false;
1001 }
1002 }
1003 }
1004
1005 private class EndPointCloser implements Runnable
1006 {
1007 private final CountDownLatch latch = new CountDownLatch(1);
1008 private final EndPoint endPoint;
1009
1010 private EndPointCloser(EndPoint endPoint)
1011 {
1012 this.endPoint = endPoint;
1013 }
1014
1015 @Override
1016 public void run()
1017 {
1018 try
1019 {
1020 closeNoExceptions(endPoint.getConnection());
1021 }
1022 finally
1023 {
1024 latch.countDown();
1025 }
1026 }
1027
1028 private boolean await(long timeout)
1029 {
1030 try
1031 {
1032 return latch.await(timeout, TimeUnit.MILLISECONDS);
1033 }
1034 catch (InterruptedException x)
1035 {
1036 return false;
1037 }
1038 }
1039 }
1040 }
1041
1042
1043
1044
1045
1046 public interface SelectableEndPoint extends EndPoint
1047 {
1048
1049
1050
1051
1052 void onSelected();
1053 }
1054 }