1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.net.ConnectException;
24 import java.net.Socket;
25 import java.net.SocketAddress;
26 import java.net.SocketTimeoutException;
27 import java.nio.channels.CancelledKeyException;
28 import java.nio.channels.SelectionKey;
29 import java.nio.channels.Selector;
30 import java.nio.channels.ServerSocketChannel;
31 import java.nio.channels.SocketChannel;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.Queue;
35 import java.util.Set;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicReference;
41
42 import org.eclipse.jetty.util.ConcurrentArrayQueue;
43 import org.eclipse.jetty.util.TypeUtil;
44 import org.eclipse.jetty.util.component.AbstractLifeCycle;
45 import org.eclipse.jetty.util.component.ContainerLifeCycle;
46 import org.eclipse.jetty.util.component.Dumpable;
47 import org.eclipse.jetty.util.log.Log;
48 import org.eclipse.jetty.util.log.Logger;
49 import org.eclipse.jetty.util.thread.Scheduler;
50
51
52
53
54
55
56
57 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
58 {
/** Name of the system property that is read once, at class initialization, to set {@code __submitKeyUpdates}. */
public static final String SUBMIT_KEY_UPDATES = "org.eclipse.jetty.io.SelectorManager.submitKeyUpdates";
/** Initial value of the connect timeout; the timeout is scheduled in milliseconds. */
public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
protected static final Logger LOG = Log.getLogger(SelectorManager.class);
// When true, ManagedSelector.updateKey() queues the update through submit() instead of
// running it directly on the calling thread. Defaults to false when the property is absent.
private final static boolean __submitKeyUpdates = Boolean.valueOf(System.getProperty(SUBMIT_KEY_UPDATES, "false"));

// Executor behind execute(Runnable); doStart() uses it to run each selector loop.
private final Executor executor;
// Scheduler on which connect timeouts are scheduled.
private final Scheduler scheduler;
// One slot per selector: sized in the constructor, populated in doStart().
private final ManagedSelector[] _selectors;
// Connect timeout, in milliseconds.
private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
// Counter used by chooseSelector(); it is incremented without any synchronization.
private long _selectorIndex;
69
/**
 * Creates a manager whose selector count is derived from the number of
 * available processors: {@code (availableProcessors + 1) / 2}.
 *
 * @param executor the executor returned by {@link #getExecutor()}
 * @param scheduler the scheduler returned by {@link #getScheduler()}
 */
protected SelectorManager(Executor executor, Scheduler scheduler)
{
    this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2);
}

/**
 * Creates a manager with an explicit number of selectors. The selector
 * array is only sized here; its slots are filled when the manager starts.
 *
 * @param executor the executor returned by {@link #getExecutor()}
 * @param scheduler the scheduler returned by {@link #getScheduler()}
 * @param selectors the number of selectors to manage, must be positive
 * @throws IllegalArgumentException if {@code selectors} is zero or negative
 */
protected SelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
    if (selectors < 1)
    {
        throw new IllegalArgumentException("No selectors");
    }
    this._selectors = new ManagedSelector[selectors];
    this.scheduler = scheduler;
    this.executor = executor;
}
83
84 public Executor getExecutor()
85 {
86 return executor;
87 }
88
89 public Scheduler getScheduler()
90 {
91 return scheduler;
92 }
93
94
95
96
97
98
99 public long getConnectTimeout()
100 {
101 return _connectTimeout;
102 }
103
104
105
106
107
108
109 public void setConnectTimeout(long milliseconds)
110 {
111 _connectTimeout = milliseconds;
112 }
113
114
115
116
117
118
119 protected void execute(Runnable task)
120 {
121 executor.execute(task);
122 }
123
124
125
126
127 public int getSelectorCount()
128 {
129 return _selectors.length;
130 }
131
132 private ManagedSelector chooseSelector()
133 {
134
135
136
137 long s = _selectorIndex++;
138 int index = (int)(s % getSelectorCount());
139 return _selectors[index];
140 }
141
142
143
144
145
146
147
148
149
150 public void connect(SocketChannel channel, Object attachment)
151 {
152 ManagedSelector set = chooseSelector();
153 set.submit(set.new Connect(channel, attachment));
154 }
155
156
157
158
159
160
161
162
163 public void accept(final SocketChannel channel)
164 {
165 final ManagedSelector selector = chooseSelector();
166 selector.submit(selector.new Accept(channel));
167 }
168
169
170
171
172
173
174
175
176
177 public void acceptor(final ServerSocketChannel server)
178 {
179 final ManagedSelector selector = chooseSelector();
180 selector.submit(selector.new Acceptor(server));
181 }
182
183
184
185
186
187
188
189
190
191
192 protected void accepted(SocketChannel channel) throws IOException
193 {
194 throw new UnsupportedOperationException();
195 }
196
197 @Override
198 protected void doStart() throws Exception
199 {
200 super.doStart();
201 for (int i = 0; i < _selectors.length; i++)
202 {
203 ManagedSelector selector = newSelector(i);
204 _selectors[i] = selector;
205 selector.start();
206 execute(selector);
207 }
208 }
209
210
211
212
213
214
215
216 protected ManagedSelector newSelector(int id)
217 {
218 return new ManagedSelector(id);
219 }
220
221 @Override
222 protected void doStop() throws Exception
223 {
224 for (ManagedSelector selector : _selectors)
225 selector.stop();
226 super.doStop();
227 }
228
229
230
231
232
233
234 protected void endPointOpened(EndPoint endpoint)
235 {
236 endpoint.onOpen();
237 }
238
239
240
241
242
243
244 protected void endPointClosed(EndPoint endpoint)
245 {
246 endpoint.onClose();
247 }
248
249
250
251
252
253
254 public void connectionOpened(Connection connection)
255 {
256 try
257 {
258 connection.onOpen();
259 }
260 catch (Throwable x)
261 {
262 if (isRunning())
263 LOG.warn("Exception while notifying connection " + connection, x);
264 else
265 LOG.debug("Exception while notifying connection {}",connection, x);
266 }
267 }
268
269
270
271
272
273
274 public void connectionClosed(Connection connection)
275 {
276 try
277 {
278 connection.onClose();
279 }
280 catch (Throwable x)
281 {
282 LOG.debug("Exception while notifying connection " + connection, x);
283 }
284 }
285
286 protected boolean finishConnect(SocketChannel channel) throws IOException
287 {
288 return channel.finishConnect();
289 }
290
291
292
293
294
295
296
297
298
299 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
300 {
301 LOG.warn(String.format("%s - %s", channel, attachment), ex);
302 }
303
304
305
306
307
308
309
310
311
312
313
314
315
/**
 * Factory for end points; called whenever a channel has been connected or
 * accepted and needs an {@link EndPoint}.
 *
 * @param channel the channel the end point wraps
 * @param selector the selector the channel is registered with
 * @param selectionKey the channel's selection key for that selector
 * @return the new end point
 * @throws IOException if the end point cannot be created
 */
protected abstract EndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException;
317
318
319
320
321
322
323
324
325
326
327
/**
 * Factory for connections; called right after the end point for a channel
 * has been created and opened.
 *
 * @param channel the channel the connection is for
 * @param endpoint the end point created for the channel
 * @param attachment the object attached to the selection key at that time
 *        (the attachment passed to {@code connect} for connected channels)
 * @return the new connection
 * @throws IOException if the connection cannot be created
 */
public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException;
329
330 @Override
331 public String dump()
332 {
333 return ContainerLifeCycle.dump(this);
334 }
335
336 @Override
337 public void dump(Appendable out, String indent) throws IOException
338 {
339 ContainerLifeCycle.dumpObject(out, this);
340 ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
341 }
342
343 private enum State
344 {
345 CHANGES, MORE_CHANGES, SELECT, WAKEUP, PROCESS
346 }
347
348
349
350
351
352
353
354 public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
355 {
// Current phase of the selector loop; coordinates submit()/updateKey() with select().
private final AtomicReference<State> _state= new AtomicReference<>(State.PROCESS);
// Changes queued by submit() and drained by runChanges().
private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>();
// Identifier passed to the constructor; used in the thread name and in dumps.
private final int _id;
// Opened in doStart(); closed by the Stop task.
private Selector _selector;
// The thread executing run(); null until run() has been entered.
private volatile Thread _thread;
361
/**
 * Creates a selector with the given identifier and a stop timeout of
 * 5000 milliseconds.
 *
 * @param id the identifier of this selector
 */
public ManagedSelector(int id)
{
    this._id = id;
    setStopTimeout(5000);
}
367
368 @Override
369 protected void doStart() throws Exception
370 {
371 super.doStart();
372 _selector = Selector.open();
373 _state.set(State.PROCESS);
374 }
375
376 @Override
377 protected void doStop() throws Exception
378 {
379 LOG.debug("Stopping {}", this);
380 Stop stop = new Stop();
381 submit(stop);
382 stop.await(getStopTimeout());
383 LOG.debug("Stopped {}", this);
384 }
385
386
387
388
389
390
391
392 public void updateKey(Runnable update)
393 {
394 if (__submitKeyUpdates)
395 {
396 submit(update);
397 }
398 else
399 {
400 runChange(update);
401 if (_state.compareAndSet(State.SELECT, State.WAKEUP))
402 wakeup();
403 }
404 }
405
406
407
408
409
410
411
412
/**
 * Queues a change to be run by the selector thread and, depending on the
 * current loop state, makes sure the selector thread will notice it.
 *
 * @param change the change to queue
 */
public void submit(Runnable change)
{
    // The change is queued first, so that whichever branch below is taken,
    // the selector thread finds it the next time it drains the queue.
    _changes.offer(change);
    LOG.debug("Queued change {}", change);

    out: while (true)
    {
        switch (_state.get())
        {
            case SELECT:
                // The selector thread is (about to be) blocked in select():
                // claim the wakeup; if the CAS is lost, re-read the state.
                if (!_state.compareAndSet(State.SELECT, State.WAKEUP))
                    continue;
                wakeup();
                break out;
            case CHANGES:
                // The queue is being drained: flag that more changes arrived so
                // that select() drains again before moving to SELECT. If the
                // CAS is lost, the state moved on; re-read it.
                if (_state.compareAndSet(State.CHANGES, State.MORE_CHANGES))
                    break out;
                continue;
            case WAKEUP:
                // A wakeup has already been requested; nothing more to do.
                break out;
            case MORE_CHANGES:
                // Already flagged; select() will drain the queue again.
                break out;
            case PROCESS:
                // select() moves to CHANGES and drains the queue at the start
                // of its next invocation, so no wakeup is issued here.
                break out;
            default:
                throw new IllegalStateException();
        }
    }
}
453
454 private void runChanges()
455 {
456 Runnable change;
457 while ((change = _changes.poll()) != null)
458 runChange(change);
459 }
460
461 protected void runChange(Runnable change)
462 {
463 try
464 {
465 LOG.debug("Running change {}", change);
466 change.run();
467 }
468 catch (Throwable x)
469 {
470 LOG.debug("Could not run change " + change, x);
471 }
472 }
473
474 @Override
475 public void run()
476 {
477 _thread = Thread.currentThread();
478 String name = _thread.getName();
479 try
480 {
481 _thread.setName(name + "-selector-" + SelectorManager.this.getClass().getSimpleName()+"@"+Integer.toHexString(SelectorManager.this.hashCode())+"/"+_id);
482 LOG.debug("Starting {} on {}", _thread, this);
483 while (isRunning())
484 select();
485 runChanges();
486 }
487 finally
488 {
489 LOG.debug("Stopped {} on {}", _thread, this);
490 _thread.setName(name);
491 }
492 }
493
494
495
496
497
498
/**
 * One pass of the selector loop: drains the queued changes, blocks in
 * {@code Selector.select()}, then processes the selected keys.
 * Anything thrown during the pass is caught and logged, at warn level
 * while this selector is running and ignored otherwise.
 */
public void select()
{
    boolean debug = LOG.isDebugEnabled();
    try
    {
        _state.set(State.CHANGES);

        // Drain the change queue until no submitter has flagged MORE_CHANGES
        // in the meantime; only then move to SELECT.
        out: while(true)
        {
            switch (_state.get())
            {
                case CHANGES:
                    runChanges();
                    // Fails if submit() switched the state to MORE_CHANGES
                    // while the queue was being drained.
                    if (_state.compareAndSet(State.CHANGES, State.SELECT))
                        break out;
                    continue;
                case MORE_CHANGES:
                    runChanges();
                    _state.set(State.CHANGES);
                    continue;
                default:
                    throw new IllegalStateException();
            }
        }

        // From here on submit()/updateKey() may have already moved
        // SELECT to WAKEUP and called wakeup().
        assert _state.get() == State.SELECT || _state.get() == State.WAKEUP;

        if (debug)
            LOG.debug("Selector loop waiting on select");
        int selected = _selector.select();
        if (debug)
            LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());

        // While in PROCESS, submit() only queues changes and does not wake up.
        _state.set(State.PROCESS);

        Set<SelectionKey> selectedKeys = _selector.selectedKeys();
        for (SelectionKey key : selectedKeys)
        {
            if (key.isValid())
            {
                processKey(key);
            }
            else
            {
                if (debug)
                    LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
                // An invalid key cannot be processed; close its end point, if any.
                Object attachment = key.attachment();
                if (attachment instanceof EndPoint)
                    ((EndPoint)attachment).close();
            }
        }
        // The selected-key set is not cleared by the selector itself.
        selectedKeys.clear();
    }
    catch (Throwable x)
    {
        if (isRunning())
            LOG.warn(x);
        else
            LOG.ignore(x);
    }
}
563
564 private void processKey(SelectionKey key)
565 {
566 Object attachment = key.attachment();
567 try
568 {
569 if (attachment instanceof SelectableEndPoint)
570 {
571 ((SelectableEndPoint)attachment).onSelected();
572 }
573 else if (key.isConnectable())
574 {
575 processConnect(key, (Connect)attachment);
576 }
577 else if (key.isAcceptable())
578 {
579 processAccept(key);
580 }
581 else
582 {
583 throw new IllegalStateException();
584 }
585 }
586 catch (CancelledKeyException x)
587 {
588 LOG.debug("Ignoring cancelled key for channel {}", key.channel());
589 if (attachment instanceof EndPoint)
590 closeNoExceptions((EndPoint)attachment);
591 }
592 catch (Throwable x)
593 {
594 LOG.warn("Could not process key for channel " + key.channel(), x);
595 if (attachment instanceof EndPoint)
596 closeNoExceptions((EndPoint)attachment);
597 }
598 }
599
600 private void processConnect(SelectionKey key, Connect connect)
601 {
602 SocketChannel channel = (SocketChannel)key.channel();
603 try
604 {
605 key.attach(connect.attachment);
606 boolean connected = finishConnect(channel);
607 if (connected)
608 {
609 connect.timeout.cancel();
610 key.interestOps(0);
611 EndPoint endpoint = createEndPoint(channel, key);
612 key.attach(endpoint);
613 }
614 else
615 {
616 throw new ConnectException();
617 }
618 }
619 catch (Throwable x)
620 {
621 connect.failed(x);
622 }
623 }
624
625 private void processAccept(SelectionKey key)
626 {
627 ServerSocketChannel server = (ServerSocketChannel)key.channel();
628 SocketChannel channel = null;
629 try
630 {
631 while ((channel = server.accept()) != null)
632 {
633 accepted(channel);
634 }
635 }
636 catch (Throwable x)
637 {
638 closeNoExceptions(channel);
639 LOG.warn("Accept failed for channel " + channel, x);
640 }
641 }
642
643 private void closeNoExceptions(Closeable closeable)
644 {
645 try
646 {
647 if (closeable != null)
648 closeable.close();
649 }
650 catch (Throwable x)
651 {
652 LOG.ignore(x);
653 }
654 }
655
656 public void wakeup()
657 {
658 _selector.wakeup();
659 }
660
661 public boolean isSelectorThread()
662 {
663 return Thread.currentThread() == _thread;
664 }
665
666 private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
667 {
668 EndPoint endPoint = newEndPoint(channel, this, selectionKey);
669 endPointOpened(endPoint);
670 Connection connection = newConnection(channel, endPoint, selectionKey.attachment());
671 endPoint.setConnection(connection);
672 connectionOpened(connection);
673 LOG.debug("Created {}", endPoint);
674 return endPoint;
675 }
676
677 public void destroyEndPoint(EndPoint endPoint)
678 {
679 LOG.debug("Destroyed {}", endPoint);
680 Connection connection = endPoint.getConnection();
681 if (connection != null)
682 connectionClosed(connection);
683 endPointClosed(endPoint);
684 }
685
686 @Override
687 public String dump()
688 {
689 return ContainerLifeCycle.dump(this);
690 }
691
692 @Override
693 public void dump(Appendable out, String indent) throws IOException
694 {
695 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append("\n");
696
697 Thread selecting = _thread;
698
699 Object where = "not selecting";
700 StackTraceElement[] trace = selecting == null ? null : selecting.getStackTrace();
701 if (trace != null)
702 {
703 for (StackTraceElement t : trace)
704 if (t.getClassName().startsWith("org.eclipse.jetty."))
705 {
706 where = t;
707 break;
708 }
709 }
710
711 Selector selector = _selector;
712 if (selector != null && selector.isOpen())
713 {
714 final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2);
715 dump.add(where);
716
717 DumpKeys dumpKeys = new DumpKeys(dump);
718 submit(dumpKeys);
719 dumpKeys.await(5, TimeUnit.SECONDS);
720
721 ContainerLifeCycle.dump(out, indent, dump);
722 }
723 }
724
725 public void dumpKeysState(List<Object> dumps)
726 {
727 Selector selector = _selector;
728 Set<SelectionKey> keys = selector.keys();
729 dumps.add(selector + " keys=" + keys.size());
730 for (SelectionKey key : keys)
731 {
732 if (key.isValid())
733 dumps.add(key.attachment() + " iOps=" + key.interestOps() + " rOps=" + key.readyOps());
734 else
735 dumps.add(key.attachment() + " iOps=-1 rOps=-1");
736 }
737 }
738
739 @Override
740 public String toString()
741 {
742 Selector selector = _selector;
743 return String.format("%s keys=%d selected=%d",
744 super.toString(),
745 selector != null && selector.isOpen() ? selector.keys().size() : -1,
746 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
747 }
748
749 private class DumpKeys implements Runnable
750 {
751 private final CountDownLatch latch = new CountDownLatch(1);
752 private final List<Object> _dumps;
753
754 private DumpKeys(List<Object> dumps)
755 {
756 this._dumps = dumps;
757 }
758
759 @Override
760 public void run()
761 {
762 dumpKeysState(_dumps);
763 latch.countDown();
764 }
765
766 public boolean await(long timeout, TimeUnit unit)
767 {
768 try
769 {
770 return latch.await(timeout, unit);
771 }
772 catch (InterruptedException x)
773 {
774 return false;
775 }
776 }
777 }
778
779 private class Acceptor implements Runnable
780 {
781 private final ServerSocketChannel _channel;
782
783 public Acceptor(ServerSocketChannel channel)
784 {
785 this._channel = channel;
786 }
787
788 @Override
789 public void run()
790 {
791 try
792 {
793 SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null);
794 LOG.debug("{} acceptor={}", this, key);
795 }
796 catch (Throwable x)
797 {
798 closeNoExceptions(_channel);
799 LOG.warn(x);
800 }
801 }
802 }
803
804 private class Accept implements Runnable
805 {
806 private final SocketChannel _channel;
807
808 public Accept(SocketChannel channel)
809 {
810 this._channel = channel;
811 }
812
813 @Override
814 public void run()
815 {
816 try
817 {
818 SelectionKey key = _channel.register(_selector, 0, null);
819 EndPoint endpoint = createEndPoint(_channel, key);
820 key.attach(endpoint);
821 }
822 catch (Throwable x)
823 {
824 closeNoExceptions(_channel);
825 LOG.debug(x);
826 }
827 }
828 }
829
830 private class Connect implements Runnable
831 {
832 private final AtomicBoolean failed = new AtomicBoolean();
833 private final SocketChannel channel;
834 private final Object attachment;
835 private final Scheduler.Task timeout;
836
837 public Connect(SocketChannel channel, Object attachment)
838 {
839 this.channel = channel;
840 this.attachment = attachment;
841 this.timeout = scheduler.schedule(new ConnectTimeout(this), getConnectTimeout(), TimeUnit.MILLISECONDS);
842 }
843
844 @Override
845 public void run()
846 {
847 try
848 {
849 channel.register(_selector, SelectionKey.OP_CONNECT, this);
850 }
851 catch (Throwable x)
852 {
853 failed(x);
854 }
855 }
856
857 protected void failed(Throwable failure)
858 {
859 if (failed.compareAndSet(false, true))
860 {
861 timeout.cancel();
862 closeNoExceptions(channel);
863 connectionFailed(channel, failure, attachment);
864 }
865 }
866 }
867
868 private class ConnectTimeout implements Runnable
869 {
870 private final Connect connect;
871
872 private ConnectTimeout(Connect connect)
873 {
874 this.connect = connect;
875 }
876
877 @Override
878 public void run()
879 {
880 SocketChannel channel = connect.channel;
881 if (channel.isConnectionPending())
882 {
883 LOG.debug("Channel {} timed out while connecting, closing it", channel);
884 connect.failed(new SocketTimeoutException());
885 }
886 }
887 }
888
889 private class Stop implements Runnable
890 {
891 private final CountDownLatch latch = new CountDownLatch(1);
892
893 @Override
894 public void run()
895 {
896 try
897 {
898 for (SelectionKey key : _selector.keys())
899 {
900 Object attachment = key.attachment();
901 if (attachment instanceof EndPoint)
902 {
903 EndPointCloser closer = new EndPointCloser((EndPoint)attachment);
904 execute(closer);
905
906
907
908
909 closer.await(getStopTimeout());
910 }
911 }
912
913 closeNoExceptions(_selector);
914 }
915 finally
916 {
917 latch.countDown();
918 }
919 }
920
921 public boolean await(long timeout)
922 {
923 try
924 {
925 return latch.await(timeout, TimeUnit.MILLISECONDS);
926 }
927 catch (InterruptedException x)
928 {
929 return false;
930 }
931 }
932 }
933
934 private class EndPointCloser implements Runnable
935 {
936 private final CountDownLatch latch = new CountDownLatch(1);
937 private final EndPoint endPoint;
938
939 private EndPointCloser(EndPoint endPoint)
940 {
941 this.endPoint = endPoint;
942 }
943
944 @Override
945 public void run()
946 {
947 try
948 {
949 closeNoExceptions(endPoint.getConnection());
950 }
951 finally
952 {
953 latch.countDown();
954 }
955 }
956
957 private boolean await(long timeout)
958 {
959 try
960 {
961 return latch.await(timeout, TimeUnit.MILLISECONDS);
962 }
963 catch (InterruptedException x)
964 {
965 return false;
966 }
967 }
968 }
969 }
970
971
972
973
974
/**
 * An {@link EndPoint} that can be attached to a selection key and be
 * notified by the selector loop when that key is selected.
 */
public interface SelectableEndPoint extends EndPoint
{
    /**
     * Called by the selector loop, on the selector thread, when the key this
     * end point is attached to has been selected and is valid.
     */
    void onSelected();
}
983 }