1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.LinkedHashMap;
26 import java.util.List;
27 import java.util.Locale;
28 import java.util.Map;
29 import java.util.Set;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.CountDownLatch;
32 import java.util.concurrent.Executor;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.TimeUnit;
35
36 import org.eclipse.jetty.io.ArrayByteBufferPool;
37 import org.eclipse.jetty.io.ByteBufferPool;
38 import org.eclipse.jetty.io.EndPoint;
39 import org.eclipse.jetty.util.FutureCallback;
40 import org.eclipse.jetty.util.annotation.ManagedAttribute;
41 import org.eclipse.jetty.util.annotation.ManagedObject;
42 import org.eclipse.jetty.util.component.ContainerLifeCycle;
43 import org.eclipse.jetty.util.component.Dumpable;
44 import org.eclipse.jetty.util.log.Log;
45 import org.eclipse.jetty.util.log.Logger;
46 import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
47 import org.eclipse.jetty.util.thread.Scheduler;
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132 @ManagedObject("Abstract implementation of the Connector Interface")
133 public abstract class AbstractConnector extends ContainerLifeCycle implements Connector, Dumpable
134 {
135 protected final Logger LOG = Log.getLogger(getClass());
136
137 private final Map<String, ConnectionFactory> _factories = new LinkedHashMap<>();
138 private final Server _server;
139 private final Executor _executor;
140 private final Scheduler _scheduler;
141 private final ByteBufferPool _byteBufferPool;
142 private final Thread[] _acceptors;
143 private final Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap<EndPoint, Boolean>());
144 private final Set<EndPoint> _immutableEndPoints = Collections.unmodifiableSet(_endpoints);
145 private volatile CountDownLatch _stopping;
146 private long _idleTimeout = 30000;
147 private String _defaultProtocol;
148 private ConnectionFactory _defaultConnectionFactory;
149 private String _name;
150 private int _acceptorPriorityDelta;
151
152
153
154
155
156
157
158
159
160
161 public AbstractConnector(
162 Server server,
163 Executor executor,
164 Scheduler scheduler,
165 ByteBufferPool pool,
166 int acceptors,
167 ConnectionFactory... factories)
168 {
169 _server=server;
170 _executor=executor!=null?executor:_server.getThreadPool();
171 if (scheduler==null)
172 scheduler=_server.getBean(Scheduler.class);
173 _scheduler=scheduler!=null?scheduler:new ScheduledExecutorScheduler();
174 if (pool==null)
175 pool=_server.getBean(ByteBufferPool.class);
176 _byteBufferPool = pool!=null?pool:new ArrayByteBufferPool();
177
178 addBean(_server,false);
179 addBean(_executor);
180 if (executor==null)
181 unmanage(_executor);
182 addBean(_scheduler);
183 addBean(_byteBufferPool);
184
185 for (ConnectionFactory factory:factories)
186 addConnectionFactory(factory);
187
188 int cores = Runtime.getRuntime().availableProcessors();
189 if (acceptors < 0)
190 acceptors=Math.max(1, Math.min(4,cores/8));
191 if (acceptors > cores)
192 LOG.warn("Acceptors should be <= availableProcessors: " + this);
193 _acceptors = new Thread[acceptors];
194 }
195
196
197 @Override
198 public Server getServer()
199 {
200 return _server;
201 }
202
203 @Override
204 public Executor getExecutor()
205 {
206 return _executor;
207 }
208
209 @Override
210 public ByteBufferPool getByteBufferPool()
211 {
212 return _byteBufferPool;
213 }
214
215 @Override
216 @ManagedAttribute("Idle timeout")
217 public long getIdleTimeout()
218 {
219 return _idleTimeout;
220 }
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235 public void setIdleTimeout(long idleTimeout)
236 {
237 _idleTimeout = idleTimeout;
238 }
239
240
241
242
243 @ManagedAttribute("number of acceptor threads")
244 public int getAcceptors()
245 {
246 return _acceptors.length;
247 }
248
249 @Override
250 protected void doStart() throws Exception
251 {
252 _defaultConnectionFactory = getConnectionFactory(_defaultProtocol);
253 if(_defaultConnectionFactory==null)
254 throw new IllegalStateException("No protocol factory for default protocol: "+_defaultProtocol);
255
256 super.doStart();
257
258 _stopping=new CountDownLatch(_acceptors.length);
259 for (int i = 0; i < _acceptors.length; i++)
260 {
261 Acceptor a = new Acceptor(i);
262 addBean(a);
263 getExecutor().execute(a);
264 }
265
266 LOG.info("Started {}", this);
267 }
268
269
270 protected void interruptAcceptors()
271 {
272 synchronized (this)
273 {
274 for (Thread thread : _acceptors)
275 {
276 if (thread != null)
277 thread.interrupt();
278 }
279 }
280 }
281
282 @Override
283 public Future<Void> shutdown()
284 {
285 return new FutureCallback(true);
286 }
287
288 @Override
289 protected void doStop() throws Exception
290 {
291
292 interruptAcceptors();
293
294
295 long stopTimeout = getStopTimeout();
296 CountDownLatch stopping=_stopping;
297 if (stopTimeout > 0 && stopping!=null)
298 stopping.await(stopTimeout,TimeUnit.MILLISECONDS);
299 _stopping=null;
300
301 super.doStop();
302
303 for (Acceptor a : getBeans(Acceptor.class))
304 removeBean(a);
305
306 LOG.info("Stopped {}", this);
307 }
308
309 public void join() throws InterruptedException
310 {
311 join(0);
312 }
313
314 public void join(long timeout) throws InterruptedException
315 {
316 synchronized (this)
317 {
318 for (Thread thread : _acceptors)
319 if (thread != null)
320 thread.join(timeout);
321 }
322 }
323
324 protected abstract void accept(int acceptorID) throws IOException, InterruptedException;
325
326
327
328
329
330
331 protected boolean isAccepting()
332 {
333 return isRunning();
334 }
335
336 @Override
337 public ConnectionFactory getConnectionFactory(String protocol)
338 {
339 synchronized (_factories)
340 {
341 return _factories.get(protocol.toLowerCase(Locale.ENGLISH));
342 }
343 }
344
345 @Override
346 public <T> T getConnectionFactory(Class<T> factoryType)
347 {
348 synchronized (_factories)
349 {
350 for (ConnectionFactory f : _factories.values())
351 if (factoryType.isAssignableFrom(f.getClass()))
352 return (T)f;
353 return null;
354 }
355 }
356
357 public void addConnectionFactory(ConnectionFactory factory)
358 {
359 synchronized (_factories)
360 {
361 ConnectionFactory old=_factories.remove(factory.getProtocol());
362 if (old!=null)
363 removeBean(old);
364 _factories.put(factory.getProtocol().toLowerCase(Locale.ENGLISH), factory);
365 addBean(factory);
366 if (_defaultProtocol==null)
367 _defaultProtocol=factory.getProtocol();
368 }
369 }
370
371 public ConnectionFactory removeConnectionFactory(String protocol)
372 {
373 synchronized (_factories)
374 {
375 ConnectionFactory factory= _factories.remove(protocol.toLowerCase(Locale.ENGLISH));
376 removeBean(factory);
377 return factory;
378 }
379 }
380
381 @Override
382 public Collection<ConnectionFactory> getConnectionFactories()
383 {
384 synchronized (_factories)
385 {
386 return _factories.values();
387 }
388 }
389
390 public void setConnectionFactories(Collection<ConnectionFactory> factories)
391 {
392 synchronized (_factories)
393 {
394 List<ConnectionFactory> existing = new ArrayList<>(_factories.values());
395 for (ConnectionFactory factory: existing)
396 removeConnectionFactory(factory.getProtocol());
397 for (ConnectionFactory factory: factories)
398 if (factory!=null)
399 addConnectionFactory(factory);
400 }
401 }
402
403 @ManagedAttribute("The priority delta to apply to acceptor threads")
404 public int getAcceptorPriorityDelta()
405 {
406 return _acceptorPriorityDelta;
407 }
408
409
410
411
412
413
414
415
416
417 public void setAcceptorPriorityDelta(int acceptorPriorityDelta)
418 {
419 int old=_acceptorPriorityDelta;
420 _acceptorPriorityDelta = acceptorPriorityDelta;
421 if (old!=acceptorPriorityDelta && isStarted())
422 {
423 for (Thread thread : _acceptors)
424 thread.setPriority(Math.max(Thread.MIN_PRIORITY,Math.min(Thread.MAX_PRIORITY,thread.getPriority()-old+acceptorPriorityDelta)));
425 }
426 }
427
428 @Override
429 @ManagedAttribute("Protocols supported by this connector")
430 public List<String> getProtocols()
431 {
432 synchronized (_factories)
433 {
434 return new ArrayList<>(_factories.keySet());
435 }
436 }
437
438 public void clearConnectionFactories()
439 {
440 synchronized (_factories)
441 {
442 _factories.clear();
443 }
444 }
445
446 @ManagedAttribute("This connector's default protocol")
447 public String getDefaultProtocol()
448 {
449 return _defaultProtocol;
450 }
451
452 public void setDefaultProtocol(String defaultProtocol)
453 {
454 _defaultProtocol = defaultProtocol.toLowerCase(Locale.ENGLISH);
455 if (isRunning())
456 _defaultConnectionFactory=getConnectionFactory(_defaultProtocol);
457 }
458
459 @Override
460 public ConnectionFactory getDefaultConnectionFactory()
461 {
462 if (isStarted())
463 return _defaultConnectionFactory;
464 return getConnectionFactory(_defaultProtocol);
465 }
466
467 private class Acceptor implements Runnable
468 {
469 private final int _acceptor;
470 private String _name;
471
472 private Acceptor(int id)
473 {
474 _acceptor = id;
475 }
476
477 @Override
478 public void run()
479 {
480 final Thread thread = Thread.currentThread();
481 String name=thread.getName();
482 _name=String.format("%s-acceptor-%d@%x-%s",name,_acceptor,hashCode(),AbstractConnector.this.toString());
483 thread.setName(_name);
484
485 int priority=thread.getPriority();
486 if (_acceptorPriorityDelta!=0)
487 thread.setPriority(Math.max(Thread.MIN_PRIORITY,Math.min(Thread.MAX_PRIORITY,priority+_acceptorPriorityDelta)));
488
489 synchronized (AbstractConnector.this)
490 {
491 _acceptors[_acceptor] = thread;
492 }
493
494 try
495 {
496 while (isAccepting())
497 {
498 try
499 {
500 accept(_acceptor);
501 }
502 catch (Throwable e)
503 {
504 if (isAccepting())
505 LOG.warn(e);
506 else
507 LOG.ignore(e);
508 }
509 }
510 }
511 finally
512 {
513 thread.setName(name);
514 if (_acceptorPriorityDelta!=0)
515 thread.setPriority(priority);
516
517 synchronized (AbstractConnector.this)
518 {
519 _acceptors[_acceptor] = null;
520 }
521 CountDownLatch stopping=_stopping;
522 if (stopping!=null)
523 stopping.countDown();
524 }
525 }
526
527 @Override
528 public String toString()
529 {
530 String name=_name;
531 if (name==null)
532 return String.format("acceptor-%d@%x", _acceptor, hashCode());
533 return name;
534 }
535
536 }
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561 @Override
562 public Collection<EndPoint> getConnectedEndPoints()
563 {
564 return _immutableEndPoints;
565 }
566
567 protected void onEndPointOpened(EndPoint endp)
568 {
569 _endpoints.add(endp);
570 }
571
572 protected void onEndPointClosed(EndPoint endp)
573 {
574 _endpoints.remove(endp);
575 }
576
577 @Override
578 public Scheduler getScheduler()
579 {
580 return _scheduler;
581 }
582
583 @Override
584 public String getName()
585 {
586 return _name;
587 }
588
589
590
591
592
593
594
595
596 public void setName(String name)
597 {
598 _name=name;
599 }
600
601 @Override
602 public String toString()
603 {
604 return String.format("%s@%x{%s}",
605 _name==null?getClass().getSimpleName():_name,
606 hashCode(),
607 getDefaultProtocol());
608 }
609 }