1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.io;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.net.ConnectException;
24 import java.net.SocketTimeoutException;
25 import java.nio.channels.CancelledKeyException;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.Selector;
28 import java.nio.channels.ServerSocketChannel;
29 import java.nio.channels.SocketChannel;
30 import java.util.ArrayDeque;
31 import java.util.ArrayList;
32 import java.util.Collections;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.Queue;
36 import java.util.Set;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40
41 import org.eclipse.jetty.util.component.AbstractLifeCycle;
42 import org.eclipse.jetty.util.component.ContainerLifeCycle;
43 import org.eclipse.jetty.util.component.Dumpable;
44 import org.eclipse.jetty.util.log.Log;
45 import org.eclipse.jetty.util.log.Logger;
46 import org.eclipse.jetty.util.thread.ExecutionStrategy;
47 import org.eclipse.jetty.util.thread.Locker;
48 import org.eclipse.jetty.util.thread.Scheduler;
49
50
51
52
53
54
55
56 public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
57 {
58 private static final Logger LOG = Log.getLogger(ManagedSelector.class);
59
60 private final Locker _locker = new Locker();
61 private boolean _selecting = false;
62 private final Queue<Runnable> _actions = new ArrayDeque<>();
63 private final SelectorManager _selectorManager;
64 private final int _id;
65 private final ExecutionStrategy _strategy;
66 private Selector _selector;
67
68 public ManagedSelector(SelectorManager selectorManager, int id)
69 {
70 _selectorManager = selectorManager;
71 _id = id;
72 _strategy = ExecutionStrategy.Factory.instanceFor(new SelectorProducer(), selectorManager.getExecutor());
73 setStopTimeout(5000);
74 }
75
76 @Override
77 protected void doStart() throws Exception
78 {
79 super.doStart();
80 _selector = newSelector();
81 }
82
83 protected Selector newSelector() throws IOException
84 {
85 return Selector.open();
86 }
87
88 public int size()
89 {
90 Selector s = _selector;
91 if (s == null)
92 return 0;
93 return s.keys().size();
94 }
95
96 @Override
97 protected void doStop() throws Exception
98 {
99 if (LOG.isDebugEnabled())
100 LOG.debug("Stopping {}", this);
101 CloseEndPoints close_endps = new CloseEndPoints();
102 submit(close_endps);
103 close_endps.await(getStopTimeout());
104 super.doStop();
105 CloseSelector close_selector = new CloseSelector();
106 submit(close_selector);
107 close_selector.await(getStopTimeout());
108
109 if (LOG.isDebugEnabled())
110 LOG.debug("Stopped {}", this);
111 }
112
113 public void submit(Runnable change)
114 {
115 if (LOG.isDebugEnabled())
116 LOG.debug("Queued change {} on {}", change, this);
117
118 Selector selector = null;
119 try (Locker.Lock lock = _locker.lock())
120 {
121 _actions.offer(change);
122 if (_selecting)
123 {
124 selector = _selector;
125
126 _selecting = false;
127 }
128 }
129 if (selector != null)
130 selector.wakeup();
131 }
132
133 @Override
134 public void run()
135 {
136 _strategy.execute();
137 }
138
139
140
141
142
143 public interface SelectableEndPoint extends EndPoint
144 {
145
146
147
148
149
150
151 Runnable onSelected();
152
153
154
155
156
157 void updateKey();
158 }
159
160 private class SelectorProducer implements ExecutionStrategy.Producer
161 {
162 private Set<SelectionKey> _keys = Collections.emptySet();
163 private Iterator<SelectionKey> _cursor = Collections.emptyIterator();
164
165 @Override
166 public Runnable produce()
167 {
168 while (true)
169 {
170 Runnable task = processSelected();
171 if (task != null)
172 return task;
173
174 Runnable action = runActions();
175 if (action != null)
176 return action;
177
178 update();
179
180 if (!select())
181 return null;
182 }
183 }
184
185 private Runnable runActions()
186 {
187 while (true)
188 {
189 Runnable action;
190 try (Locker.Lock lock = _locker.lock())
191 {
192 action = _actions.poll();
193 if (action == null)
194 {
195
196 _selecting = true;
197 return null;
198 }
199 }
200
201 if (action instanceof Product)
202 return action;
203
204
205 runChange(action);
206 }
207 }
208
209 private void runChange(Runnable change)
210 {
211 try
212 {
213 if (LOG.isDebugEnabled())
214 LOG.debug("Running change {}", change);
215 change.run();
216 }
217 catch (Throwable x)
218 {
219 LOG.debug("Could not run change " + change, x);
220 }
221 }
222
223 private boolean select()
224 {
225 try
226 {
227 Selector selector = _selector;
228 if (selector != null && selector.isOpen())
229 {
230 if (LOG.isDebugEnabled())
231 LOG.debug("Selector loop waiting on select");
232 int selected = selector.select();
233 if (LOG.isDebugEnabled())
234 LOG.debug("Selector loop woken up from select, {}/{} selected", selected, selector.keys().size());
235
236 try (Locker.Lock lock = _locker.lock())
237 {
238
239 _selecting = false;
240 }
241
242 _keys = selector.selectedKeys();
243 _cursor = _keys.iterator();
244
245 return true;
246 }
247 }
248 catch (Throwable x)
249 {
250 closeNoExceptions(_selector);
251 if (isRunning())
252 LOG.warn(x);
253 else
254 LOG.debug(x);
255 }
256 return false;
257 }
258
259 private Runnable processSelected()
260 {
261 while (_cursor.hasNext())
262 {
263 SelectionKey key = _cursor.next();
264 if (key.isValid())
265 {
266 Object attachment = key.attachment();
267 try
268 {
269 if (attachment instanceof SelectableEndPoint)
270 {
271
272 Runnable task = ((SelectableEndPoint)attachment).onSelected();
273 if (task != null)
274 return task;
275 }
276 else if (key.isConnectable())
277 {
278 Runnable task = processConnect(key, (Connect)attachment);
279 if (task != null)
280 return task;
281 }
282 else if (key.isAcceptable())
283 {
284 processAccept(key);
285 }
286 else
287 {
288 throw new IllegalStateException("key=" + key + ", att=" + attachment + ", iOps=" + key.interestOps() + ", rOps=" + key.readyOps());
289 }
290 }
291 catch (CancelledKeyException x)
292 {
293 LOG.debug("Ignoring cancelled key for channel {}", key.channel());
294 if (attachment instanceof EndPoint)
295 closeNoExceptions((EndPoint)attachment);
296 }
297 catch (Throwable x)
298 {
299 LOG.warn("Could not process key for channel " + key.channel(), x);
300 if (attachment instanceof EndPoint)
301 closeNoExceptions((EndPoint)attachment);
302 }
303 }
304 else
305 {
306 if (LOG.isDebugEnabled())
307 LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
308 Object attachment = key.attachment();
309 if (attachment instanceof EndPoint)
310 closeNoExceptions((EndPoint)attachment);
311 }
312 }
313 return null;
314 }
315
316 private void update()
317 {
318 for (SelectionKey key : _keys)
319 updateKey(key);
320 _keys.clear();
321 }
322
323 private void updateKey(SelectionKey key)
324 {
325 Object attachment = key.attachment();
326 if (attachment instanceof SelectableEndPoint)
327 ((SelectableEndPoint)attachment).updateKey();
328 }
329 }
330
331 private interface Product extends Runnable
332 {
333 }
334
335 private Runnable processConnect(SelectionKey key, final Connect connect)
336 {
337 SocketChannel channel = (SocketChannel)key.channel();
338 try
339 {
340 key.attach(connect.attachment);
341 boolean connected = _selectorManager.finishConnect(channel);
342 if (LOG.isDebugEnabled())
343 LOG.debug("Connected {} {}", connected, channel);
344 if (connected)
345 {
346 if (connect.timeout.cancel())
347 {
348 key.interestOps(0);
349 return new CreateEndPoint(channel, key)
350 {
351 @Override
352 protected void failed(Throwable failure)
353 {
354 super.failed(failure);
355 connect.failed(failure);
356 }
357 };
358 }
359 else
360 {
361 throw new SocketTimeoutException("Concurrent Connect Timeout");
362 }
363 }
364 else
365 {
366 throw new ConnectException();
367 }
368 }
369 catch (Throwable x)
370 {
371 connect.failed(x);
372 return null;
373 }
374 }
375
376 private void processAccept(SelectionKey key)
377 {
378 ServerSocketChannel server = (ServerSocketChannel)key.channel();
379 SocketChannel channel = null;
380 try
381 {
382 while ((channel = server.accept()) != null)
383 {
384 _selectorManager.accepted(channel);
385 }
386 }
387 catch (Throwable x)
388 {
389 closeNoExceptions(channel);
390 LOG.warn("Accept failed for channel " + channel, x);
391 }
392 }
393
394 private void closeNoExceptions(Closeable closeable)
395 {
396 try
397 {
398 if (closeable != null)
399 closeable.close();
400 }
401 catch (Throwable x)
402 {
403 LOG.ignore(x);
404 }
405 }
406
407 private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
408 {
409 EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey);
410 _selectorManager.endPointOpened(endPoint);
411 Connection connection = _selectorManager.newConnection(channel, endPoint, selectionKey.attachment());
412 endPoint.setConnection(connection);
413 selectionKey.attach(endPoint);
414 _selectorManager.connectionOpened(connection);
415 if (LOG.isDebugEnabled())
416 LOG.debug("Created {}", endPoint);
417 return endPoint;
418 }
419
420 public void destroyEndPoint(final EndPoint endPoint)
421 {
422 final Connection connection = endPoint.getConnection();
423 submit(new Product()
424 {
425 @Override
426 public void run()
427 {
428 if (LOG.isDebugEnabled())
429 LOG.debug("Destroyed {}", endPoint);
430 if (connection != null)
431 _selectorManager.connectionClosed(connection);
432 _selectorManager.endPointClosed(endPoint);
433 }
434 });
435 }
436
437 @Override
438 public String dump()
439 {
440 return ContainerLifeCycle.dump(this);
441 }
442
443 @Override
444 public void dump(Appendable out, String indent) throws IOException
445 {
446 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append(System.lineSeparator());
447
448 Selector selector = _selector;
449 if (selector != null && selector.isOpen())
450 {
451 final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2);
452
453 DumpKeys dumpKeys = new DumpKeys(dump);
454 submit(dumpKeys);
455 dumpKeys.await(5, TimeUnit.SECONDS);
456
457 ContainerLifeCycle.dump(out, indent, dump);
458 }
459 }
460
461 @Override
462 public String toString()
463 {
464 Selector selector = _selector;
465 return String.format("%s id=%s keys=%d selected=%d",
466 super.toString(),
467 _id,
468 selector != null && selector.isOpen() ? selector.keys().size() : -1,
469 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
470 }
471
472 private class DumpKeys implements Runnable
473 {
474 private final CountDownLatch latch = new CountDownLatch(1);
475 private final List<Object> _dumps;
476
477 private DumpKeys(List<Object> dumps)
478 {
479 this._dumps = dumps;
480 }
481
482 @Override
483 public void run()
484 {
485 Selector selector = _selector;
486 if (selector != null && selector.isOpen())
487 {
488 Set<SelectionKey> keys = selector.keys();
489 _dumps.add(selector + " keys=" + keys.size());
490 for (SelectionKey key : keys)
491 {
492 try
493 {
494 _dumps.add(String.format("SelectionKey@%x{i=%d}->%s", key.hashCode(), key.interestOps(), key.attachment()));
495 }
496 catch (Throwable x)
497 {
498 LOG.ignore(x);
499 }
500 }
501 }
502 latch.countDown();
503 }
504
505 public boolean await(long timeout, TimeUnit unit)
506 {
507 try
508 {
509 return latch.await(timeout, unit);
510 }
511 catch (InterruptedException x)
512 {
513 return false;
514 }
515 }
516 }
517
518 class Acceptor implements Runnable
519 {
520 private final ServerSocketChannel _channel;
521
522 public Acceptor(ServerSocketChannel channel)
523 {
524 this._channel = channel;
525 }
526
527 @Override
528 public void run()
529 {
530 try
531 {
532 SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null);
533 if (LOG.isDebugEnabled())
534 LOG.debug("{} acceptor={}", this, key);
535 }
536 catch (Throwable x)
537 {
538 closeNoExceptions(_channel);
539 LOG.warn(x);
540 }
541 }
542 }
543
544 class Accept implements Runnable
545 {
546 private final SocketChannel channel;
547 private final Object attachment;
548
549 Accept(SocketChannel channel, Object attachment)
550 {
551 this.channel = channel;
552 this.attachment = attachment;
553 }
554
555 @Override
556 public void run()
557 {
558 try
559 {
560 final SelectionKey key = channel.register(_selector, 0, attachment);
561 submit(new CreateEndPoint(channel, key));
562 }
563 catch (Throwable x)
564 {
565 closeNoExceptions(channel);
566 LOG.debug(x);
567 }
568 }
569 }
570
571 private class CreateEndPoint implements Product
572 {
573 private final SocketChannel channel;
574 private final SelectionKey key;
575
576 public CreateEndPoint(SocketChannel channel, SelectionKey key)
577 {
578 this.channel = channel;
579 this.key = key;
580 }
581
582 @Override
583 public void run()
584 {
585 try
586 {
587 createEndPoint(channel, key);
588 }
589 catch (Throwable x)
590 {
591 LOG.debug(x);
592 failed(x);
593 }
594 }
595
596 protected void failed(Throwable failure)
597 {
598 closeNoExceptions(channel);
599 LOG.debug(failure);
600 }
601 }
602
603 class Connect implements Runnable
604 {
605 private final AtomicBoolean failed = new AtomicBoolean();
606 private final SocketChannel channel;
607 private final Object attachment;
608 private final Scheduler.Task timeout;
609
610 Connect(SocketChannel channel, Object attachment)
611 {
612 this.channel = channel;
613 this.attachment = attachment;
614 this.timeout = ManagedSelector.this._selectorManager.getScheduler().schedule(new ConnectTimeout(this), ManagedSelector.this._selectorManager.getConnectTimeout(), TimeUnit.MILLISECONDS);
615 }
616
617 @Override
618 public void run()
619 {
620 try
621 {
622 channel.register(_selector, SelectionKey.OP_CONNECT, this);
623 }
624 catch (Throwable x)
625 {
626 failed(x);
627 }
628 }
629
630 private void failed(Throwable failure)
631 {
632 if (failed.compareAndSet(false, true))
633 {
634 timeout.cancel();
635 closeNoExceptions(channel);
636 ManagedSelector.this._selectorManager.connectionFailed(channel, failure, attachment);
637 }
638 }
639 }
640
641 private class ConnectTimeout implements Runnable
642 {
643 private final Connect connect;
644
645 private ConnectTimeout(Connect connect)
646 {
647 this.connect = connect;
648 }
649
650 @Override
651 public void run()
652 {
653 SocketChannel channel = connect.channel;
654 if (channel.isConnectionPending())
655 {
656 if (LOG.isDebugEnabled())
657 LOG.debug("Channel {} timed out while connecting, closing it", channel);
658 connect.failed(new SocketTimeoutException("Connect Timeout"));
659 }
660 }
661 }
662
663 private class CloseEndPoints implements Runnable
664 {
665 private final CountDownLatch _latch = new CountDownLatch(1);
666 private CountDownLatch _allClosed;
667
668 @Override
669 public void run()
670 {
671 List<EndPoint> end_points = new ArrayList<>();
672 for (SelectionKey key : _selector.keys())
673 {
674 if (key.isValid())
675 {
676 Object attachment = key.attachment();
677 if (attachment instanceof EndPoint)
678 end_points.add((EndPoint)attachment);
679 }
680 }
681
682 int size = end_points.size();
683 if (LOG.isDebugEnabled())
684 LOG.debug("Closing {} endPoints on {}", size, ManagedSelector.this);
685
686 _allClosed = new CountDownLatch(size);
687 _latch.countDown();
688
689 for (EndPoint endp : end_points)
690 submit(new EndPointCloser(endp, _allClosed));
691
692 if (LOG.isDebugEnabled())
693 LOG.debug("Closed {} endPoints on {}", size, ManagedSelector.this);
694 }
695
696 public boolean await(long timeout)
697 {
698 try
699 {
700 return _latch.await(timeout, TimeUnit.MILLISECONDS) &&
701 _allClosed.await(timeout, TimeUnit.MILLISECONDS);
702 }
703 catch (InterruptedException x)
704 {
705 return false;
706 }
707 }
708 }
709
710 private class EndPointCloser implements Product
711 {
712 private final EndPoint _endPoint;
713 private final CountDownLatch _latch;
714
715 private EndPointCloser(EndPoint endPoint, CountDownLatch latch)
716 {
717 _endPoint = endPoint;
718 _latch = latch;
719 }
720
721 @Override
722 public void run()
723 {
724 closeNoExceptions(_endPoint.getConnection());
725 _latch.countDown();
726 }
727 }
728
729 private class CloseSelector implements Runnable
730 {
731 private CountDownLatch _latch = new CountDownLatch(1);
732
733 @Override
734 public void run()
735 {
736 Selector selector = _selector;
737 _selector = null;
738 closeNoExceptions(selector);
739 _latch.countDown();
740 }
741
742 public boolean await(long timeout)
743 {
744 try
745 {
746 return _latch.await(timeout, TimeUnit.MILLISECONDS);
747 }
748 catch (InterruptedException x)
749 {
750 return false;
751 }
752 }
753 }
754 }