1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.net.Socket;
23 import java.nio.ByteBuffer;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.HashSet;
28 import java.util.LinkedHashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.Future;
36 import java.util.concurrent.TimeUnit;
37
38 import org.eclipse.jetty.io.ArrayByteBufferPool;
39 import org.eclipse.jetty.io.ByteBufferPool;
40 import org.eclipse.jetty.io.EndPoint;
41 import org.eclipse.jetty.io.ssl.SslConnection;
42 import org.eclipse.jetty.util.FutureCallback;
43 import org.eclipse.jetty.util.StringUtil;
44 import org.eclipse.jetty.util.annotation.ManagedAttribute;
45 import org.eclipse.jetty.util.annotation.ManagedObject;
46 import org.eclipse.jetty.util.component.ContainerLifeCycle;
47 import org.eclipse.jetty.util.component.Dumpable;
48 import org.eclipse.jetty.util.log.Log;
49 import org.eclipse.jetty.util.log.Logger;
50 import org.eclipse.jetty.util.ssl.SslContextFactory;
51 import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
52 import org.eclipse.jetty.util.thread.Scheduler;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136 @ManagedObject("Abstract implementation of the Connector Interface")
137 public abstract class AbstractConnector extends ContainerLifeCycle implements Connector, Dumpable
138 {
// Per-instance logger obtained for the concrete (runtime) connector class.
protected final Logger LOG = Log.getLogger(getClass());

// Lower-cased protocol name -> factory, kept in insertion order.
// The accessor methods of this class touch it inside synchronized (_factories).
private final Map<String, ConnectionFactory> _factories = new LinkedHashMap<>();
private final Server _server;
private final Executor _executor;
private final Scheduler _scheduler;
private final ByteBufferPool _byteBufferPool;
// One slot per acceptor: each Acceptor stores its thread here on entry to run() and
// nulls the slot on exit, both while holding this connector's monitor.
private final Thread[] _acceptors;
// Concurrent set of currently open endpoints, maintained by onEndPointOpened/onEndPointClosed.
private final Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap<>());
// Read-only view of _endpoints handed out by getConnectedEndPoints().
private final Set<EndPoint> _immutableEndPoints = Collections.unmodifiableSet(_endpoints);
// Created in doStart() with one count per acceptor; each acceptor counts it down as it
// exits and doStop() awaits it. Null when no start is in effect.
private volatile CountDownLatch _stopping;
// NOTE(review): presumably milliseconds - confirm against the Connector contract.
private long _idleTimeout = 30000;
// Protocol name used to look up the default factory; may be null until a factory is added.
private String _defaultProtocol;
// Factory resolved from _defaultProtocol in doStart() (and in setDefaultProtocol() while running).
private ConnectionFactory _defaultConnectionFactory;
// Optional connector name; when null, toString() falls back to the simple class name.
private String _name;
// Offset added to an acceptor thread's priority while it runs the accept loop.
private int _acceptorPriorityDelta;
155
156
157
158
159
160
161
162
163
164
165 public AbstractConnector(
166 Server server,
167 Executor executor,
168 Scheduler scheduler,
169 ByteBufferPool pool,
170 int acceptors,
171 ConnectionFactory... factories)
172 {
173 _server=server;
174 _executor=executor!=null?executor:_server.getThreadPool();
175 if (scheduler==null)
176 scheduler=_server.getBean(Scheduler.class);
177 _scheduler=scheduler!=null?scheduler:new ScheduledExecutorScheduler();
178 if (pool==null)
179 pool=_server.getBean(ByteBufferPool.class);
180 _byteBufferPool = pool!=null?pool:new ArrayByteBufferPool();
181
182 addBean(_server,false);
183 addBean(_executor);
184 if (executor==null)
185 unmanage(_executor);
186 addBean(_scheduler);
187 addBean(_byteBufferPool);
188
189 for (ConnectionFactory factory:factories)
190 addConnectionFactory(factory);
191
192 int cores = Runtime.getRuntime().availableProcessors();
193 if (acceptors < 0)
194 acceptors=Math.max(1, Math.min(4,cores/8));
195 if (acceptors > cores)
196 LOG.warn("Acceptors should be <= availableProcessors: " + this);
197 _acceptors = new Thread[acceptors];
198 }
199
200
201 @Override
202 public Server getServer()
203 {
204 return _server;
205 }
206
207 @Override
208 public Executor getExecutor()
209 {
210 return _executor;
211 }
212
213 @Override
214 public ByteBufferPool getByteBufferPool()
215 {
216 return _byteBufferPool;
217 }
218
219 @Override
220 @ManagedAttribute("Idle timeout")
221 public long getIdleTimeout()
222 {
223 return _idleTimeout;
224 }
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239 public void setIdleTimeout(long idleTimeout)
240 {
241 _idleTimeout = idleTimeout;
242 }
243
244
245
246
247 @ManagedAttribute("number of acceptor threads")
248 public int getAcceptors()
249 {
250 return _acceptors.length;
251 }
252
253 @Override
254 protected void doStart() throws Exception
255 {
256 _defaultConnectionFactory = getConnectionFactory(_defaultProtocol);
257 if(_defaultConnectionFactory==null)
258 throw new IllegalStateException("No protocol factory for default protocol: "+_defaultProtocol);
259
260 super.doStart();
261
262 _stopping=new CountDownLatch(_acceptors.length);
263 for (int i = 0; i < _acceptors.length; i++)
264 {
265 Acceptor a = new Acceptor(i);
266 addBean(a);
267 getExecutor().execute(a);
268 }
269
270 LOG.info("Started {}", this);
271 }
272
273
274 protected void interruptAcceptors()
275 {
276 synchronized (this)
277 {
278 for (Thread thread : _acceptors)
279 {
280 if (thread != null)
281 thread.interrupt();
282 }
283 }
284 }
285
286 @Override
287 public Future<Void> shutdown()
288 {
289 return new FutureCallback(true);
290 }
291
292 @Override
293 protected void doStop() throws Exception
294 {
295
296 interruptAcceptors();
297
298
299 long stopTimeout = getStopTimeout();
300 CountDownLatch stopping=_stopping;
301 if (stopTimeout > 0 && stopping!=null)
302 stopping.await(stopTimeout,TimeUnit.MILLISECONDS);
303 _stopping=null;
304
305 super.doStop();
306
307 for (Acceptor a : getBeans(Acceptor.class))
308 removeBean(a);
309
310 LOG.info("Stopped {}", this);
311 }
312
313 public void join() throws InterruptedException
314 {
315 join(0);
316 }
317
318 public void join(long timeout) throws InterruptedException
319 {
320 synchronized (this)
321 {
322 for (Thread thread : _acceptors)
323 if (thread != null)
324 thread.join(timeout);
325 }
326 }
327
/**
 * Performs one accept operation on behalf of the given acceptor.
 * <p>
 * Each {@code Acceptor} task calls this repeatedly while {@link #isAccepting()}
 * returns {@code true}. Anything thrown from here is caught by the acceptor loop
 * and logged, after which the loop re-checks {@link #isAccepting()}.
 *
 * @param acceptorID the index of the calling acceptor (its slot in the acceptor array)
 * @throws IOException declared for implementations; handled by the acceptor loop
 * @throws InterruptedException declared for implementations; handled by the acceptor loop
 */
protected abstract void accept(int acceptorID) throws IOException, InterruptedException;
329
330
331
332
333
334
335 protected boolean isAccepting()
336 {
337 return isRunning();
338 }
339
340 @Override
341 public ConnectionFactory getConnectionFactory(String protocol)
342 {
343 synchronized (_factories)
344 {
345 return _factories.get(StringUtil.asciiToLowerCase(protocol));
346 }
347 }
348
349 @Override
350 public <T> T getConnectionFactory(Class<T> factoryType)
351 {
352 synchronized (_factories)
353 {
354 for (ConnectionFactory f : _factories.values())
355 if (factoryType.isAssignableFrom(f.getClass()))
356 return (T)f;
357 return null;
358 }
359 }
360
361 public void addConnectionFactory(ConnectionFactory factory)
362 {
363 synchronized (_factories)
364 {
365 Set<ConnectionFactory> to_remove = new HashSet<>();
366 for (String key:factory.getProtocols())
367 {
368 key=StringUtil.asciiToLowerCase(key);
369 ConnectionFactory old=_factories.remove(key);
370 if (old!=null)
371 {
372 if (old.getProtocol().equals(_defaultProtocol))
373 _defaultProtocol=null;
374 to_remove.add(old);
375 }
376 _factories.put(key, factory);
377 }
378
379
380 for (ConnectionFactory f : _factories.values())
381 to_remove.remove(f);
382
383
384 for (ConnectionFactory old: to_remove)
385 {
386 removeBean(old);
387 if (LOG.isDebugEnabled())
388 LOG.debug("{} removed {}", this, old);
389 }
390
391
392 addBean(factory);
393 if (_defaultProtocol==null)
394 _defaultProtocol=factory.getProtocol();
395 if (LOG.isDebugEnabled())
396 LOG.debug("{} added {}", this, factory);
397 }
398 }
399
400 public void addFirstConnectionFactory(ConnectionFactory factory)
401 {
402 synchronized (_factories)
403 {
404 List<ConnectionFactory> existings = new ArrayList<>(_factories.values());
405 _factories.clear();
406 addConnectionFactory(factory);
407 for (ConnectionFactory existing : existings)
408 addConnectionFactory(existing);
409 _defaultProtocol = factory.getProtocol();
410 }
411 }
412
413 public void addIfAbsentConnectionFactory(ConnectionFactory factory)
414 {
415 synchronized (_factories)
416 {
417 String key=StringUtil.asciiToLowerCase(factory.getProtocol());
418 if (_factories.containsKey(key))
419 {
420 if (LOG.isDebugEnabled())
421 LOG.debug("{} addIfAbsent ignored {}", this, factory);
422 }
423 else
424 {
425 _factories.put(key, factory);
426 addBean(factory);
427 if (_defaultProtocol==null)
428 _defaultProtocol=factory.getProtocol();
429 if (LOG.isDebugEnabled())
430 LOG.debug("{} addIfAbsent added {}", this, factory);
431 }
432 }
433 }
434
435 public ConnectionFactory removeConnectionFactory(String protocol)
436 {
437 synchronized (_factories)
438 {
439 ConnectionFactory factory= _factories.remove(StringUtil.asciiToLowerCase(protocol));
440 removeBean(factory);
441 return factory;
442 }
443 }
444
445 @Override
446 public Collection<ConnectionFactory> getConnectionFactories()
447 {
448 synchronized (_factories)
449 {
450 return _factories.values();
451 }
452 }
453
454 public void setConnectionFactories(Collection<ConnectionFactory> factories)
455 {
456 synchronized (_factories)
457 {
458 List<ConnectionFactory> existing = new ArrayList<>(_factories.values());
459 for (ConnectionFactory factory: existing)
460 removeConnectionFactory(factory.getProtocol());
461 for (ConnectionFactory factory: factories)
462 if (factory!=null)
463 addConnectionFactory(factory);
464 }
465 }
466
467 @ManagedAttribute("The priority delta to apply to acceptor threads")
468 public int getAcceptorPriorityDelta()
469 {
470 return _acceptorPriorityDelta;
471 }
472
473
474
475
476
477
478
479
480
481 public void setAcceptorPriorityDelta(int acceptorPriorityDelta)
482 {
483 int old=_acceptorPriorityDelta;
484 _acceptorPriorityDelta = acceptorPriorityDelta;
485 if (old!=acceptorPriorityDelta && isStarted())
486 {
487 for (Thread thread : _acceptors)
488 thread.setPriority(Math.max(Thread.MIN_PRIORITY,Math.min(Thread.MAX_PRIORITY,thread.getPriority()-old+acceptorPriorityDelta)));
489 }
490 }
491
492 @Override
493 @ManagedAttribute("Protocols supported by this connector")
494 public List<String> getProtocols()
495 {
496 synchronized (_factories)
497 {
498 return new ArrayList<>(_factories.keySet());
499 }
500 }
501
502 public void clearConnectionFactories()
503 {
504 synchronized (_factories)
505 {
506 _factories.clear();
507 }
508 }
509
510 @ManagedAttribute("This connector's default protocol")
511 public String getDefaultProtocol()
512 {
513 return _defaultProtocol;
514 }
515
516 public void setDefaultProtocol(String defaultProtocol)
517 {
518 _defaultProtocol = StringUtil.asciiToLowerCase(defaultProtocol);
519 if (isRunning())
520 _defaultConnectionFactory=getConnectionFactory(_defaultProtocol);
521 }
522
523 @Override
524 public ConnectionFactory getDefaultConnectionFactory()
525 {
526 if (isStarted())
527 return _defaultConnectionFactory;
528 return getConnectionFactory(_defaultProtocol);
529 }
530
531 private class Acceptor implements Runnable
532 {
533 private final int _id;
534 private String _name;
535
536 private Acceptor(int id)
537 {
538 _id = id;
539 }
540
541 @Override
542 public void run()
543 {
544 final Thread thread = Thread.currentThread();
545 String name=thread.getName();
546 _name=String.format("%s-acceptor-%d@%x-%s",name,_id,hashCode(),AbstractConnector.this.toString());
547 thread.setName(_name);
548
549 int priority=thread.getPriority();
550 if (_acceptorPriorityDelta!=0)
551 thread.setPriority(Math.max(Thread.MIN_PRIORITY,Math.min(Thread.MAX_PRIORITY,priority+_acceptorPriorityDelta)));
552
553 synchronized (AbstractConnector.this)
554 {
555 _acceptors[_id] = thread;
556 }
557
558 try
559 {
560 while (isAccepting())
561 {
562 try
563 {
564 accept(_id);
565 }
566 catch (Throwable e)
567 {
568 if (isAccepting())
569 LOG.warn(e);
570 else
571 LOG.ignore(e);
572 }
573 }
574 }
575 finally
576 {
577 thread.setName(name);
578 if (_acceptorPriorityDelta!=0)
579 thread.setPriority(priority);
580
581 synchronized (AbstractConnector.this)
582 {
583 _acceptors[_id] = null;
584 }
585 CountDownLatch stopping=_stopping;
586 if (stopping!=null)
587 stopping.countDown();
588 }
589 }
590
591 @Override
592 public String toString()
593 {
594 String name=_name;
595 if (name==null)
596 return String.format("acceptor-%d@%x", _id, hashCode());
597 return name;
598 }
599
600 }
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625 @Override
626 public Collection<EndPoint> getConnectedEndPoints()
627 {
628 return _immutableEndPoints;
629 }
630
631 protected void onEndPointOpened(EndPoint endp)
632 {
633 _endpoints.add(endp);
634 }
635
636 protected void onEndPointClosed(EndPoint endp)
637 {
638 _endpoints.remove(endp);
639 }
640
641 @Override
642 public Scheduler getScheduler()
643 {
644 return _scheduler;
645 }
646
647 @Override
648 public String getName()
649 {
650 return _name;
651 }
652
653
654
655
656
657
658
659
660 public void setName(String name)
661 {
662 _name=name;
663 }
664
665 @Override
666 public String toString()
667 {
668 return String.format("%s@%x{%s,%s}",
669 _name==null?getClass().getSimpleName():_name,
670 hashCode(),
671 getDefaultProtocol(),getProtocols());
672 }
673 }