1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.net.ConnectException;
24 import java.net.Socket;
25 import java.net.SocketAddress;
26 import java.net.SocketTimeoutException;
27 import java.nio.channels.CancelledKeyException;
28 import java.nio.channels.ClosedChannelException;
29 import java.nio.channels.SelectionKey;
30 import java.nio.channels.Selector;
31 import java.nio.channels.ServerSocketChannel;
32 import java.nio.channels.SocketChannel;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.Queue;
36 import java.util.Set;
37 import java.util.concurrent.ConcurrentLinkedQueue;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.Executor;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicBoolean;
42
43 import org.eclipse.jetty.util.TypeUtil;
44 import org.eclipse.jetty.util.component.AbstractLifeCycle;
45 import org.eclipse.jetty.util.component.ContainerLifeCycle;
46 import org.eclipse.jetty.util.component.Dumpable;
47 import org.eclipse.jetty.util.log.Log;
48 import org.eclipse.jetty.util.log.Logger;
49 import org.eclipse.jetty.util.thread.Scheduler;
50
51
52
53
54
55
56
57 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
58 {
59 protected static final Logger LOG = Log.getLogger(SelectorManager.class);
60
61
62
63 public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
64
65 private final Executor executor;
66 private final Scheduler scheduler;
67 private final ManagedSelector[] _selectors;
68 private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
69 private long _selectorIndex;
70
71 protected SelectorManager(Executor executor, Scheduler scheduler)
72 {
73 this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2);
74 }
75
76 protected SelectorManager(Executor executor, Scheduler scheduler, int selectors)
77 {
78 this.executor = executor;
79 this.scheduler = scheduler;
80 _selectors = new ManagedSelector[selectors];
81 }
82
83 public Executor getExecutor()
84 {
85 return executor;
86 }
87
88 public Scheduler getScheduler()
89 {
90 return scheduler;
91 }
92
93
94
95
96
97
98 public long getConnectTimeout()
99 {
100 return _connectTimeout;
101 }
102
103
104
105
106
107
108 public void setConnectTimeout(long milliseconds)
109 {
110 _connectTimeout = milliseconds;
111 }
112
113
114
115
116
117
118 protected void execute(Runnable task)
119 {
120 executor.execute(task);
121 }
122
123
124
125
126 public int getSelectorCount()
127 {
128 return _selectors.length;
129 }
130
131 private ManagedSelector chooseSelector()
132 {
133
134
135
136 long s = _selectorIndex++;
137 int index = (int)(s % getSelectorCount());
138 return _selectors[index];
139 }
140
141
142
143
144
145
146
147
148
149 public void connect(SocketChannel channel, Object attachment)
150 {
151 ManagedSelector set = chooseSelector();
152 set.submit(set.new Connect(channel, attachment));
153 }
154
155
156
157
158
159
160
161
162 public void accept(final SocketChannel channel)
163 {
164 final ManagedSelector selector = chooseSelector();
165 selector.submit(selector.new Accept(channel));
166 }
167
168 @Override
169 protected void doStart() throws Exception
170 {
171 super.doStart();
172 for (int i = 0; i < _selectors.length; i++)
173 {
174 ManagedSelector selector = newSelector(i);
175 _selectors[i] = selector;
176 selector.start();
177 execute(selector);
178 }
179 }
180
181
182
183
184
185
186
187 protected ManagedSelector newSelector(int id)
188 {
189 return new ManagedSelector(id);
190 }
191
192 @Override
193 protected void doStop() throws Exception
194 {
195 for (ManagedSelector selector : _selectors)
196 selector.stop();
197 super.doStop();
198 }
199
200
201
202
203
204
205 protected void endPointOpened(EndPoint endpoint)
206 {
207 endpoint.onOpen();
208 }
209
210
211
212
213
214
215 protected void endPointClosed(EndPoint endpoint)
216 {
217 endpoint.onClose();
218 }
219
220
221
222
223
224
225 public void connectionOpened(Connection connection)
226 {
227 try
228 {
229 connection.onOpen();
230 }
231 catch (Exception x)
232 {
233 LOG.info("Exception while notifying connection " + connection, x);
234 }
235 }
236
237
238
239
240
241
242 public void connectionClosed(Connection connection)
243 {
244 try
245 {
246 connection.onClose();
247 }
248 catch (Exception x)
249 {
250 LOG.info("Exception while notifying connection " + connection, x);
251 }
252 }
253
254 protected boolean finishConnect(SocketChannel channel) throws IOException
255 {
256 return channel.finishConnect();
257 }
258
259
260
261
262
263
264
265
266
267 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
268 {
269 LOG.warn(String.format("%s - %s", channel, attachment), ex);
270 }
271
272
273
274
275
276
277
278
279
280
281
282
283
284 protected abstract EndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException;
285
286
287
288
289
290
291
292
293
294
295
296 public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException;
297
298 @Override
299 public String dump()
300 {
301 return ContainerLifeCycle.dump(this);
302 }
303
304 @Override
305 public void dump(Appendable out, String indent) throws IOException
306 {
307 ContainerLifeCycle.dumpObject(out, this);
308 ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
309 }
310
311
312
313
314
315
316
317 public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
318 {
319 private final Queue<Runnable> _changes = new ConcurrentLinkedQueue<>();
320 private final int _id;
321 private Selector _selector;
322 private volatile Thread _thread;
323 private boolean _needsWakeup = true;
324 private boolean _runningChanges = false;
325
326 public ManagedSelector(int id)
327 {
328 _id = id;
329 setStopTimeout(5000);
330 }
331
332 @Override
333 protected void doStart() throws Exception
334 {
335 super.doStart();
336 _selector = Selector.open();
337 }
338
339 @Override
340 protected void doStop() throws Exception
341 {
342 LOG.debug("Stopping {}", this);
343 Stop stop = new Stop();
344 submit(stop);
345 stop.await(getStopTimeout());
346 LOG.debug("Stopped {}", this);
347 }
348
349
350
351
352
353
354
355
356 public void submit(Runnable change)
357 {
358
359 if (_thread==Thread.currentThread())
360 {
361
362
363 if (_runningChanges)
364 _changes.offer(change);
365 else
366 {
367
368 runChanges();
369
370 runChange(change);
371 }
372 }
373 else
374 {
375
376 _changes.offer(change);
377 LOG.debug("Queued change {}", change);
378 boolean wakeup = _needsWakeup;
379 if (wakeup)
380 wakeup();
381 }
382 }
383
384 private void runChanges()
385 {
386 try
387 {
388 if (_runningChanges)
389 throw new IllegalStateException();
390 _runningChanges=true;
391
392 Runnable change;
393 while ((change = _changes.poll()) != null)
394 runChange(change);
395 }
396 finally
397 {
398 _runningChanges=false;
399 }
400 }
401
402 protected void runChange(Runnable change)
403 {
404 LOG.debug("Running change {}", change);
405 change.run();
406 }
407
408 @Override
409 public void run()
410 {
411 _thread = Thread.currentThread();
412 String name = _thread.getName();
413 try
414 {
415 _thread.setName(name + "-selector-" + _id);
416 LOG.debug("Starting {} on {}", _thread, this);
417 while (isRunning())
418 select();
419 processChanges();
420 }
421 finally
422 {
423 LOG.debug("Stopped {} on {}", _thread, this);
424 _thread.setName(name);
425 }
426 }
427
428
429
430
431
432
433 public void select()
434 {
435 boolean debug = LOG.isDebugEnabled();
436 try
437 {
438 processChanges();
439
440 if (debug)
441 LOG.debug("Selector loop waiting on select");
442 int selected = _selector.select();
443 if (debug)
444 LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
445
446 _needsWakeup = false;
447
448 Set<SelectionKey> selectedKeys = _selector.selectedKeys();
449 for (SelectionKey key : selectedKeys)
450 {
451 if (key.isValid())
452 {
453 processKey(key);
454 }
455 else
456 {
457 if (debug)
458 LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
459 Object attachment = key.attachment();
460 if (attachment instanceof EndPoint)
461 ((EndPoint)attachment).close();
462 }
463 }
464 selectedKeys.clear();
465 }
466 catch (Exception x)
467 {
468 if (isRunning())
469 LOG.warn(x);
470 else
471 LOG.ignore(x);
472 }
473 }
474
475 private void processChanges()
476 {
477 runChanges();
478
479
480
481
482 _needsWakeup = true;
483
484
485
486 runChanges();
487 }
488
489 private void processKey(SelectionKey key)
490 {
491 Object attachment = key.attachment();
492 try
493 {
494 if (attachment instanceof SelectableEndPoint)
495 {
496 ((SelectableEndPoint)attachment).onSelected();
497 }
498 else if (key.isConnectable())
499 {
500 processConnect(key, (Connect)attachment);
501 }
502 else
503 {
504 throw new IllegalStateException();
505 }
506 }
507 catch (CancelledKeyException x)
508 {
509 LOG.debug("Ignoring cancelled key for channel {}", key.channel());
510 if (attachment instanceof EndPoint)
511 ((EndPoint)attachment).close();
512 }
513 catch (Exception x)
514 {
515 LOG.warn("Could not process key for channel " + key.channel(), x);
516 if (attachment instanceof EndPoint)
517 ((EndPoint)attachment).close();
518 }
519 }
520
521 private void processConnect(SelectionKey key, Connect connect)
522 {
523 key.attach(connect.attachment);
524 SocketChannel channel = (SocketChannel)key.channel();
525 try
526 {
527 boolean connected = finishConnect(channel);
528 if (connected)
529 {
530 connect.timeout.cancel();
531 key.interestOps(0);
532 EndPoint endpoint = createEndPoint(channel, key);
533 key.attach(endpoint);
534 }
535 else
536 {
537 throw new ConnectException();
538 }
539 }
540 catch (Exception x)
541 {
542 connect.failed(x);
543 closeNoExceptions(channel);
544 }
545 }
546
547 private void closeNoExceptions(Closeable closeable)
548 {
549 try
550 {
551 closeable.close();
552 }
553 catch (IOException x)
554 {
555 LOG.ignore(x);
556 }
557 }
558
559 public void wakeup()
560 {
561 _selector.wakeup();
562 }
563
564 public boolean isSelectorThread()
565 {
566 return Thread.currentThread() == _thread;
567 }
568
569 private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
570 {
571 EndPoint endPoint = newEndPoint(channel, this, selectionKey);
572 endPointOpened(endPoint);
573 Connection connection = newConnection(channel, endPoint, selectionKey.attachment());
574 endPoint.setConnection(connection);
575 connectionOpened(connection);
576 LOG.debug("Created {}", endPoint);
577 return endPoint;
578 }
579
580 public void destroyEndPoint(EndPoint endPoint)
581 {
582 LOG.debug("Destroyed {}", endPoint);
583 Connection connection = endPoint.getConnection();
584 if (connection != null)
585 connectionClosed(connection);
586 endPointClosed(endPoint);
587 }
588
589 @Override
590 public String dump()
591 {
592 return ContainerLifeCycle.dump(this);
593 }
594
595 @Override
596 public void dump(Appendable out, String indent) throws IOException
597 {
598 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append("\n");
599
600 Thread selecting = _thread;
601
602 Object where = "not selecting";
603 StackTraceElement[] trace = selecting == null ? null : selecting.getStackTrace();
604 if (trace != null)
605 {
606 for (StackTraceElement t : trace)
607 if (t.getClassName().startsWith("org.eclipse.jetty."))
608 {
609 where = t;
610 break;
611 }
612 }
613
614 Selector selector = _selector;
615 if (selector != null && selector.isOpen())
616 {
617 final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2);
618 dump.add(where);
619
620 DumpKeys dumpKeys = new DumpKeys(dump);
621 submit(dumpKeys);
622 dumpKeys.await(5, TimeUnit.SECONDS);
623
624 ContainerLifeCycle.dump(out, indent, dump);
625 }
626 }
627
628 public void dumpKeysState(List<Object> dumps)
629 {
630 Selector selector = _selector;
631 Set<SelectionKey> keys = selector.keys();
632 dumps.add(selector + " keys=" + keys.size());
633 for (SelectionKey key : keys)
634 {
635 if (key.isValid())
636 dumps.add(key.attachment() + " iOps=" + key.interestOps() + " rOps=" + key.readyOps());
637 else
638 dumps.add(key.attachment() + " iOps=-1 rOps=-1");
639 }
640 }
641
642 @Override
643 public String toString()
644 {
645 Selector selector = _selector;
646 return String.format("%s keys=%d selected=%d",
647 super.toString(),
648 selector != null && selector.isOpen() ? selector.keys().size() : -1,
649 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
650 }
651
652 private class DumpKeys implements Runnable
653 {
654 private final CountDownLatch latch = new CountDownLatch(1);
655 private final List<Object> _dumps;
656
657 private DumpKeys(List<Object> dumps)
658 {
659 this._dumps = dumps;
660 }
661
662 @Override
663 public void run()
664 {
665 dumpKeysState(_dumps);
666 latch.countDown();
667 }
668
669 public boolean await(long timeout, TimeUnit unit)
670 {
671 try
672 {
673 return latch.await(timeout, unit);
674 }
675 catch (InterruptedException x)
676 {
677 return false;
678 }
679 }
680 }
681
682 private class Accept implements Runnable
683 {
684 private final SocketChannel _channel;
685
686 public Accept(SocketChannel channel)
687 {
688 this._channel = channel;
689 }
690
691 @Override
692 public void run()
693 {
694 try
695 {
696 SelectionKey key = _channel.register(_selector, 0, null);
697 EndPoint endpoint = createEndPoint(_channel, key);
698 key.attach(endpoint);
699 }
700 catch (IOException x)
701 {
702 LOG.debug(x);
703 }
704 }
705 }
706
707 private class Connect implements Runnable
708 {
709 private final AtomicBoolean failed = new AtomicBoolean();
710 private final SocketChannel channel;
711 private final Object attachment;
712 private final Scheduler.Task timeout;
713
714 public Connect(SocketChannel channel, Object attachment)
715 {
716 this.channel = channel;
717 this.attachment = attachment;
718 this.timeout = scheduler.schedule(new ConnectTimeout(this), getConnectTimeout(), TimeUnit.MILLISECONDS);
719 }
720
721 @Override
722 public void run()
723 {
724 try
725 {
726 channel.register(_selector, SelectionKey.OP_CONNECT, this);
727 }
728 catch (ClosedChannelException x)
729 {
730 LOG.debug(x);
731 }
732 }
733
734 protected void failed(Throwable failure)
735 {
736 if (failed.compareAndSet(false, true))
737 connectionFailed(channel, failure, attachment);
738 }
739 }
740
741 private class ConnectTimeout implements Runnable
742 {
743 private final Connect connect;
744
745 private ConnectTimeout(Connect connect)
746 {
747 this.connect = connect;
748 }
749
750 @Override
751 public void run()
752 {
753 SocketChannel channel = connect.channel;
754 if (channel.isConnectionPending())
755 {
756 LOG.debug("Channel {} timed out while connecting, closing it", channel);
757 try
758 {
759
760 channel.close();
761 }
762 catch (IOException x)
763 {
764 LOG.ignore(x);
765 }
766 finally
767 {
768 connect.failed(new SocketTimeoutException());
769 }
770 }
771 }
772 }
773
774 private class Stop implements Runnable
775 {
776 private final CountDownLatch latch = new CountDownLatch(1);
777
778 @Override
779 public void run()
780 {
781 try
782 {
783 for (SelectionKey key : _selector.keys())
784 {
785 Object attachment = key.attachment();
786 if (attachment instanceof EndPoint)
787 {
788 EndPointCloser closer = new EndPointCloser((EndPoint)attachment);
789 execute(closer);
790
791
792
793
794 closer.await(getStopTimeout());
795 }
796 }
797
798 closeNoExceptions(_selector);
799 }
800 finally
801 {
802 latch.countDown();
803 }
804 }
805
806 public boolean await(long timeout)
807 {
808 try
809 {
810 return latch.await(timeout, TimeUnit.MILLISECONDS);
811 }
812 catch (InterruptedException x)
813 {
814 return false;
815 }
816 }
817 }
818
819 private class EndPointCloser implements Runnable
820 {
821 private final CountDownLatch latch = new CountDownLatch(1);
822 private final EndPoint endPoint;
823
824 private EndPointCloser(EndPoint endPoint)
825 {
826 this.endPoint = endPoint;
827 }
828
829 @Override
830 public void run()
831 {
832 try
833 {
834 endPoint.getConnection().close();
835 }
836 finally
837 {
838 latch.countDown();
839 }
840 }
841
842 private boolean await(long timeout)
843 {
844 try
845 {
846 return latch.await(timeout, TimeUnit.MILLISECONDS);
847 }
848 catch (InterruptedException x)
849 {
850 return false;
851 }
852 }
853 }
854 }
855
856
857
858
859
860 public interface SelectableEndPoint extends EndPoint
861 {
862
863
864
865
866 void onSelected();
867 }
868 }