1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.net.ConnectException;
24 import java.net.Socket;
25 import java.net.SocketAddress;
26 import java.net.SocketTimeoutException;
27 import java.nio.channels.CancelledKeyException;
28 import java.nio.channels.SelectionKey;
29 import java.nio.channels.Selector;
30 import java.nio.channels.ServerSocketChannel;
31 import java.nio.channels.SocketChannel;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.Queue;
35 import java.util.Set;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40
41 import org.eclipse.jetty.util.ConcurrentArrayQueue;
42 import org.eclipse.jetty.util.TypeUtil;
43 import org.eclipse.jetty.util.component.AbstractLifeCycle;
44 import org.eclipse.jetty.util.component.ContainerLifeCycle;
45 import org.eclipse.jetty.util.component.Dumpable;
46 import org.eclipse.jetty.util.log.Log;
47 import org.eclipse.jetty.util.log.Logger;
48 import org.eclipse.jetty.util.thread.Scheduler;
49
50
51
52
53
54
55
56 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
57 {
58 protected static final Logger LOG = Log.getLogger(SelectorManager.class);
59
60
61
62 public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
63
64 private final Executor executor;
65 private final Scheduler scheduler;
66 private final ManagedSelector[] _selectors;
67 private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
68 private long _selectorIndex;
69
70 protected SelectorManager(Executor executor, Scheduler scheduler)
71 {
72 this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2);
73 }
74
75 protected SelectorManager(Executor executor, Scheduler scheduler, int selectors)
76 {
77 this.executor = executor;
78 this.scheduler = scheduler;
79 _selectors = new ManagedSelector[selectors];
80 }
81
82 public Executor getExecutor()
83 {
84 return executor;
85 }
86
87 public Scheduler getScheduler()
88 {
89 return scheduler;
90 }
91
92
93
94
95
96
97 public long getConnectTimeout()
98 {
99 return _connectTimeout;
100 }
101
102
103
104
105
106
107 public void setConnectTimeout(long milliseconds)
108 {
109 _connectTimeout = milliseconds;
110 }
111
112
113
114
115
116
117 protected void execute(Runnable task)
118 {
119 executor.execute(task);
120 }
121
122
123
124
125 public int getSelectorCount()
126 {
127 return _selectors.length;
128 }
129
130 private ManagedSelector chooseSelector()
131 {
132
133
134
135 long s = _selectorIndex++;
136 int index = (int)(s % getSelectorCount());
137 return _selectors[index];
138 }
139
140
141
142
143
144
145
146
147
148 public void connect(SocketChannel channel, Object attachment)
149 {
150 ManagedSelector set = chooseSelector();
151 set.submit(set.new Connect(channel, attachment));
152 }
153
154
155
156
157
158
159
160
161 public void accept(final SocketChannel channel)
162 {
163 final ManagedSelector selector = chooseSelector();
164 selector.submit(selector.new Accept(channel));
165 }
166
167 @Override
168 protected void doStart() throws Exception
169 {
170 super.doStart();
171 for (int i = 0; i < _selectors.length; i++)
172 {
173 ManagedSelector selector = newSelector(i);
174 _selectors[i] = selector;
175 selector.start();
176 execute(selector);
177 }
178 }
179
180
181
182
183
184
185
186 protected ManagedSelector newSelector(int id)
187 {
188 return new ManagedSelector(id);
189 }
190
191 @Override
192 protected void doStop() throws Exception
193 {
194 for (ManagedSelector selector : _selectors)
195 selector.stop();
196 super.doStop();
197 }
198
199
200
201
202
203
204 protected void endPointOpened(EndPoint endpoint)
205 {
206 endpoint.onOpen();
207 }
208
209
210
211
212
213
214 protected void endPointClosed(EndPoint endpoint)
215 {
216 endpoint.onClose();
217 }
218
219
220
221
222
223
224 public void connectionOpened(Connection connection)
225 {
226 try
227 {
228 connection.onOpen();
229 }
230 catch (Throwable x)
231 {
232 if (isRunning())
233 LOG.warn("Exception while notifying connection " + connection, x);
234 else
235 LOG.debug("Exception while notifying connection {}",connection, x);
236 }
237 }
238
239
240
241
242
243
244 public void connectionClosed(Connection connection)
245 {
246 try
247 {
248 connection.onClose();
249 }
250 catch (Throwable x)
251 {
252 LOG.debug("Exception while notifying connection " + connection, x);
253 }
254 }
255
256 protected boolean finishConnect(SocketChannel channel) throws IOException
257 {
258 return channel.finishConnect();
259 }
260
261
262
263
264
265
266
267
268
269 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
270 {
271 LOG.warn(String.format("%s - %s", channel, attachment), ex);
272 }
273
274
275
276
277
278
279
280
281
282
283
284
285
286 protected abstract EndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException;
287
288
289
290
291
292
293
294
295
296
297
298 public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException;
299
300 @Override
301 public String dump()
302 {
303 return ContainerLifeCycle.dump(this);
304 }
305
306 @Override
307 public void dump(Appendable out, String indent) throws IOException
308 {
309 ContainerLifeCycle.dumpObject(out, this);
310 ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
311 }
312
313
314
315
316
317
318
319 public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
320 {
321 private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>();
322
323 private final int _id;
324 private Selector _selector;
325 private volatile Thread _thread;
326 private boolean _needsWakeup = true;
327 private boolean _runningChanges = false;
328
329 public ManagedSelector(int id)
330 {
331 _id = id;
332 setStopTimeout(5000);
333 }
334
335 @Override
336 protected void doStart() throws Exception
337 {
338 super.doStart();
339 _selector = Selector.open();
340 }
341
342 @Override
343 protected void doStop() throws Exception
344 {
345 LOG.debug("Stopping {}", this);
346 Stop stop = new Stop();
347 submit(stop);
348 stop.await(getStopTimeout());
349 LOG.debug("Stopped {}", this);
350 }
351
352
353
354
355
356
357
358
359 public void submit(Runnable change)
360 {
361
362 if (_thread==Thread.currentThread())
363 {
364
365
366 if (_runningChanges)
367 _changes.offer(change);
368 else
369 {
370
371 runChanges();
372
373 runChange(change);
374 }
375 }
376 else
377 {
378
379 _changes.offer(change);
380 LOG.debug("Queued change {}", change);
381 boolean wakeup = _needsWakeup;
382 if (wakeup)
383 wakeup();
384 }
385 }
386
387 private void runChanges()
388 {
389 try
390 {
391 if (_runningChanges)
392 throw new IllegalStateException();
393 _runningChanges=true;
394
395 Runnable change;
396 while ((change = _changes.poll()) != null)
397 runChange(change);
398 }
399 finally
400 {
401 _runningChanges=false;
402 }
403 }
404
405 protected void runChange(Runnable change)
406 {
407 try
408 {
409 LOG.debug("Running change {}", change);
410 change.run();
411 }
412 catch (Throwable x)
413 {
414 LOG.debug("Could not run change " + change, x);
415 }
416 }
417
418 @Override
419 public void run()
420 {
421 _thread = Thread.currentThread();
422 String name = _thread.getName();
423 try
424 {
425 _thread.setName(name + "-selector-" + _id);
426 LOG.debug("Starting {} on {}", _thread, this);
427 while (isRunning())
428 select();
429 processChanges();
430 }
431 finally
432 {
433 LOG.debug("Stopped {} on {}", _thread, this);
434 _thread.setName(name);
435 }
436 }
437
438
439
440
441
442
443 public void select()
444 {
445 boolean debug = LOG.isDebugEnabled();
446 try
447 {
448 processChanges();
449
450 if (debug)
451 LOG.debug("Selector loop waiting on select");
452 int selected = _selector.select();
453 if (debug)
454 LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
455
456 _needsWakeup = false;
457
458 Set<SelectionKey> selectedKeys = _selector.selectedKeys();
459 for (SelectionKey key : selectedKeys)
460 {
461 if (key.isValid())
462 {
463 processKey(key);
464 }
465 else
466 {
467 if (debug)
468 LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
469 Object attachment = key.attachment();
470 if (attachment instanceof EndPoint)
471 ((EndPoint)attachment).close();
472 }
473 }
474 selectedKeys.clear();
475 }
476 catch (Throwable x)
477 {
478 if (isRunning())
479 LOG.warn(x);
480 else
481 LOG.ignore(x);
482 }
483 }
484
485 private void processChanges()
486 {
487 runChanges();
488
489
490
491
492 _needsWakeup = true;
493
494
495
496 runChanges();
497 }
498
499 private void processKey(SelectionKey key)
500 {
501 Object attachment = key.attachment();
502 try
503 {
504 if (attachment instanceof SelectableEndPoint)
505 {
506 ((SelectableEndPoint)attachment).onSelected();
507 }
508 else if (key.isConnectable())
509 {
510 processConnect(key, (Connect)attachment);
511 }
512 else
513 {
514 throw new IllegalStateException();
515 }
516 }
517 catch (CancelledKeyException x)
518 {
519 LOG.debug("Ignoring cancelled key for channel {}", key.channel());
520 if (attachment instanceof EndPoint)
521 ((EndPoint)attachment).close();
522 }
523 catch (Throwable x)
524 {
525 LOG.warn("Could not process key for channel " + key.channel(), x);
526 if (attachment instanceof EndPoint)
527 ((EndPoint)attachment).close();
528 }
529 }
530
531 private void processConnect(SelectionKey key, Connect connect)
532 {
533 SocketChannel channel = (SocketChannel)key.channel();
534 try
535 {
536 key.attach(connect.attachment);
537 boolean connected = finishConnect(channel);
538 if (connected)
539 {
540 connect.timeout.cancel();
541 key.interestOps(0);
542 EndPoint endpoint = createEndPoint(channel, key);
543 key.attach(endpoint);
544 }
545 else
546 {
547 throw new ConnectException();
548 }
549 }
550 catch (Throwable x)
551 {
552 connect.failed(x);
553 }
554 }
555
556 private void closeNoExceptions(Closeable closeable)
557 {
558 try
559 {
560 closeable.close();
561 }
562 catch (Throwable x)
563 {
564 LOG.ignore(x);
565 }
566 }
567
568 public void wakeup()
569 {
570 _selector.wakeup();
571 }
572
573 public boolean isSelectorThread()
574 {
575 return Thread.currentThread() == _thread;
576 }
577
578 private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
579 {
580 EndPoint endPoint = newEndPoint(channel, this, selectionKey);
581 endPointOpened(endPoint);
582 Connection connection = newConnection(channel, endPoint, selectionKey.attachment());
583 endPoint.setConnection(connection);
584 connectionOpened(connection);
585 LOG.debug("Created {}", endPoint);
586 return endPoint;
587 }
588
589 public void destroyEndPoint(EndPoint endPoint)
590 {
591 LOG.debug("Destroyed {}", endPoint);
592 Connection connection = endPoint.getConnection();
593 if (connection != null)
594 connectionClosed(connection);
595 endPointClosed(endPoint);
596 }
597
598 @Override
599 public String dump()
600 {
601 return ContainerLifeCycle.dump(this);
602 }
603
604 @Override
605 public void dump(Appendable out, String indent) throws IOException
606 {
607 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append("\n");
608
609 Thread selecting = _thread;
610
611 Object where = "not selecting";
612 StackTraceElement[] trace = selecting == null ? null : selecting.getStackTrace();
613 if (trace != null)
614 {
615 for (StackTraceElement t : trace)
616 if (t.getClassName().startsWith("org.eclipse.jetty."))
617 {
618 where = t;
619 break;
620 }
621 }
622
623 Selector selector = _selector;
624 if (selector != null && selector.isOpen())
625 {
626 final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2);
627 dump.add(where);
628
629 DumpKeys dumpKeys = new DumpKeys(dump);
630 submit(dumpKeys);
631 dumpKeys.await(5, TimeUnit.SECONDS);
632
633 ContainerLifeCycle.dump(out, indent, dump);
634 }
635 }
636
637 public void dumpKeysState(List<Object> dumps)
638 {
639 Selector selector = _selector;
640 Set<SelectionKey> keys = selector.keys();
641 dumps.add(selector + " keys=" + keys.size());
642 for (SelectionKey key : keys)
643 {
644 if (key.isValid())
645 dumps.add(key.attachment() + " iOps=" + key.interestOps() + " rOps=" + key.readyOps());
646 else
647 dumps.add(key.attachment() + " iOps=-1 rOps=-1");
648 }
649 }
650
651 @Override
652 public String toString()
653 {
654 Selector selector = _selector;
655 return String.format("%s keys=%d selected=%d",
656 super.toString(),
657 selector != null && selector.isOpen() ? selector.keys().size() : -1,
658 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
659 }
660
661 private class DumpKeys implements Runnable
662 {
663 private final CountDownLatch latch = new CountDownLatch(1);
664 private final List<Object> _dumps;
665
666 private DumpKeys(List<Object> dumps)
667 {
668 this._dumps = dumps;
669 }
670
671 @Override
672 public void run()
673 {
674 dumpKeysState(_dumps);
675 latch.countDown();
676 }
677
678 public boolean await(long timeout, TimeUnit unit)
679 {
680 try
681 {
682 return latch.await(timeout, unit);
683 }
684 catch (InterruptedException x)
685 {
686 return false;
687 }
688 }
689 }
690
691 private class Accept implements Runnable
692 {
693 private final SocketChannel _channel;
694
695 public Accept(SocketChannel channel)
696 {
697 this._channel = channel;
698 }
699
700 @Override
701 public void run()
702 {
703 try
704 {
705 SelectionKey key = _channel.register(_selector, 0, null);
706 EndPoint endpoint = createEndPoint(_channel, key);
707 key.attach(endpoint);
708 }
709 catch (Throwable x)
710 {
711 closeNoExceptions(_channel);
712 LOG.debug(x);
713 }
714 }
715 }
716
717 private class Connect implements Runnable
718 {
719 private final AtomicBoolean failed = new AtomicBoolean();
720 private final SocketChannel channel;
721 private final Object attachment;
722 private final Scheduler.Task timeout;
723
724 public Connect(SocketChannel channel, Object attachment)
725 {
726 this.channel = channel;
727 this.attachment = attachment;
728 this.timeout = scheduler.schedule(new ConnectTimeout(this), getConnectTimeout(), TimeUnit.MILLISECONDS);
729 }
730
731 @Override
732 public void run()
733 {
734 try
735 {
736 channel.register(_selector, SelectionKey.OP_CONNECT, this);
737 }
738 catch (Throwable x)
739 {
740 failed(x);
741 }
742 }
743
744 protected void failed(Throwable failure)
745 {
746 if (failed.compareAndSet(false, true))
747 {
748 timeout.cancel();
749 closeNoExceptions(channel);
750 connectionFailed(channel, failure, attachment);
751 }
752 }
753 }
754
755 private class ConnectTimeout implements Runnable
756 {
757 private final Connect connect;
758
759 private ConnectTimeout(Connect connect)
760 {
761 this.connect = connect;
762 }
763
764 @Override
765 public void run()
766 {
767 SocketChannel channel = connect.channel;
768 if (channel.isConnectionPending())
769 {
770 LOG.debug("Channel {} timed out while connecting, closing it", channel);
771 connect.failed(new SocketTimeoutException());
772 }
773 }
774 }
775
776 private class Stop implements Runnable
777 {
778 private final CountDownLatch latch = new CountDownLatch(1);
779
780 @Override
781 public void run()
782 {
783 try
784 {
785 for (SelectionKey key : _selector.keys())
786 {
787 Object attachment = key.attachment();
788 if (attachment instanceof EndPoint)
789 {
790 EndPointCloser closer = new EndPointCloser((EndPoint)attachment);
791 execute(closer);
792
793
794
795
796 closer.await(getStopTimeout());
797 }
798 }
799
800 closeNoExceptions(_selector);
801 }
802 finally
803 {
804 latch.countDown();
805 }
806 }
807
808 public boolean await(long timeout)
809 {
810 try
811 {
812 return latch.await(timeout, TimeUnit.MILLISECONDS);
813 }
814 catch (InterruptedException x)
815 {
816 return false;
817 }
818 }
819 }
820
821 private class EndPointCloser implements Runnable
822 {
823 private final CountDownLatch latch = new CountDownLatch(1);
824 private final EndPoint endPoint;
825
826 private EndPointCloser(EndPoint endPoint)
827 {
828 this.endPoint = endPoint;
829 }
830
831 @Override
832 public void run()
833 {
834 try
835 {
836 closeNoExceptions(endPoint.getConnection());
837 }
838 finally
839 {
840 latch.countDown();
841 }
842 }
843
844 private boolean await(long timeout)
845 {
846 try
847 {
848 return latch.await(timeout, TimeUnit.MILLISECONDS);
849 }
850 catch (InterruptedException x)
851 {
852 return false;
853 }
854 }
855 }
856 }
857
858
859
860
861
862 public interface SelectableEndPoint extends EndPoint
863 {
864
865
866
867
868 void onSelected();
869 }
870 }