1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.net.Socket;
23 import java.nio.ByteBuffer;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.LinkedHashMap;
28 import java.util.List;
29 import java.util.Locale;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.Future;
36 import java.util.concurrent.TimeUnit;
37
38 import org.eclipse.jetty.io.ArrayByteBufferPool;
39 import org.eclipse.jetty.io.ByteBufferPool;
40 import org.eclipse.jetty.io.Connection;
41 import org.eclipse.jetty.io.EndPoint;
42 import org.eclipse.jetty.io.ssl.SslConnection;
43 import org.eclipse.jetty.util.FutureCallback;
44 import org.eclipse.jetty.util.annotation.ManagedAttribute;
45 import org.eclipse.jetty.util.annotation.ManagedObject;
46 import org.eclipse.jetty.util.component.ContainerLifeCycle;
47 import org.eclipse.jetty.util.component.Dumpable;
48 import org.eclipse.jetty.util.log.Log;
49 import org.eclipse.jetty.util.log.Logger;
50 import org.eclipse.jetty.util.ssl.SslContextFactory;
51 import org.eclipse.jetty.util.thread.Scheduler;
52 import org.eclipse.jetty.util.thread.TimerScheduler;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137 @ManagedObject("Abstract implementation of the Connector Interface")
138 public abstract class AbstractConnector extends ContainerLifeCycle implements Connector, Dumpable
139 {
140 protected final Logger LOG = Log.getLogger(getClass());
141
142 private final Map<String, ConnectionFactory> _factories = new LinkedHashMap<>();
143 private final Server _server;
144 private final Executor _executor;
145 private final Scheduler _scheduler;
146 private final ByteBufferPool _byteBufferPool;
147 private final Thread[] _acceptors;
148 private final Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap());
149 private final Set<EndPoint> _immutableEndPoints = Collections.unmodifiableSet(_endpoints);
150 private volatile CountDownLatch _stopping;
151 private long _idleTimeout = 30000;
152 private String _defaultProtocol;
153 private ConnectionFactory _defaultConnectionFactory;
154
155
156
157
158
159
160
161
162
163
164 public AbstractConnector(
165 Server server,
166 Executor executor,
167 Scheduler scheduler,
168 ByteBufferPool pool,
169 int acceptors,
170 ConnectionFactory... factories)
171 {
172 _server=server;
173 _executor=executor!=null?executor:_server.getThreadPool();
174 if (scheduler==null)
175 scheduler=_server.getBean(Scheduler.class);
176 _scheduler=scheduler!=null?scheduler:new TimerScheduler();
177 if (pool==null)
178 pool=_server.getBean(ByteBufferPool.class);
179 _byteBufferPool = pool!=null?pool:new ArrayByteBufferPool();
180
181 addBean(_server,false);
182 addBean(_executor);
183 if (executor==null)
184 unmanage(_executor);
185 addBean(_scheduler);
186 addBean(_byteBufferPool);
187
188 for (ConnectionFactory factory:factories)
189 addConnectionFactory(factory);
190
191 if (acceptors<=0)
192 acceptors=Math.max(1,(Runtime.getRuntime().availableProcessors()) / 2);
193 if (acceptors > 2 * Runtime.getRuntime().availableProcessors())
194 LOG.warn("Acceptors should be <= 2*availableProcessors: " + this);
195 _acceptors = new Thread[acceptors];
196 }
197
198
199 @Override
200 public Server getServer()
201 {
202 return _server;
203 }
204
205 @Override
206 public Executor getExecutor()
207 {
208 return _executor;
209 }
210
211 @Override
212 public ByteBufferPool getByteBufferPool()
213 {
214 return _byteBufferPool;
215 }
216
217 @Override
218 @ManagedAttribute("Idle timeout")
219 public long getIdleTimeout()
220 {
221 return _idleTimeout;
222 }
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237 public void setIdleTimeout(long idleTimeout)
238 {
239 _idleTimeout = idleTimeout;
240 }
241
242
243
244
245 @ManagedAttribute("number of acceptor threads")
246 public int getAcceptors()
247 {
248 return _acceptors.length;
249 }
250
251 @Override
252 protected void doStart() throws Exception
253 {
254 _defaultConnectionFactory = getConnectionFactory(_defaultProtocol);
255 if(_defaultConnectionFactory==null)
256 throw new IllegalStateException("No protocol factory for default protocol: "+_defaultProtocol);
257
258 super.doStart();
259
260 _stopping=new CountDownLatch(_acceptors.length);
261 for (int i = 0; i < _acceptors.length; i++)
262 getExecutor().execute(new Acceptor(i));
263
264 LOG.info("Started {}", this);
265 }
266
267
268 protected void interruptAcceptors()
269 {
270 for (Thread thread : _acceptors)
271 {
272 if (thread != null)
273 thread.interrupt();
274 }
275 }
276
277 @Override
278 public Future<Void> shutdown()
279 {
280 return new FutureCallback(true);
281 }
282
283 @Override
284 protected void doStop() throws Exception
285 {
286
287 interruptAcceptors();
288
289
290 long stopTimeout = getStopTimeout();
291 if (stopTimeout > 0 && _stopping!=null)
292 _stopping.await(stopTimeout,TimeUnit.MILLISECONDS);
293 _stopping=null;
294
295 super.doStop();
296
297 LOG.info("Stopped {}", this);
298 }
299
300 public void join() throws InterruptedException
301 {
302 join(0);
303 }
304
305 public void join(long timeout) throws InterruptedException
306 {
307 for (Thread thread : _acceptors)
308 if (thread != null)
309 thread.join(timeout);
310 }
311
312 protected abstract void accept(int acceptorID) throws IOException, InterruptedException;
313
314
315
316
317
318
319 protected boolean isAccepting()
320 {
321 return isRunning();
322 }
323
324 @Override
325 public ConnectionFactory getConnectionFactory(String protocol)
326 {
327 synchronized (_factories)
328 {
329 return _factories.get(protocol.toLowerCase(Locale.ENGLISH));
330 }
331 }
332
333 @Override
334 public <T> T getConnectionFactory(Class<T> factoryType)
335 {
336 synchronized (_factories)
337 {
338 for (ConnectionFactory f : _factories.values())
339 if (factoryType.isAssignableFrom(f.getClass()))
340 return (T)f;
341 return null;
342 }
343 }
344
345 public void addConnectionFactory(ConnectionFactory factory)
346 {
347 synchronized (_factories)
348 {
349 ConnectionFactory old=_factories.remove(factory.getProtocol());
350 if (old!=null)
351 removeBean(old);
352 _factories.put(factory.getProtocol().toLowerCase(Locale.ENGLISH), factory);
353 addBean(factory);
354 if (_defaultProtocol==null)
355 _defaultProtocol=factory.getProtocol();
356 }
357 }
358
359 public ConnectionFactory removeConnectionFactory(String protocol)
360 {
361 synchronized (_factories)
362 {
363 ConnectionFactory factory= _factories.remove(protocol.toLowerCase(Locale.ENGLISH));
364 removeBean(factory);
365 return factory;
366 }
367 }
368
369 @Override
370 public Collection<ConnectionFactory> getConnectionFactories()
371 {
372 synchronized (_factories)
373 {
374 return _factories.values();
375 }
376 }
377
378 public void setConnectionFactories(Collection<ConnectionFactory> factories)
379 {
380 synchronized (_factories)
381 {
382 List<ConnectionFactory> existing = new ArrayList<>(_factories.values());
383 for (ConnectionFactory factory: existing)
384 removeConnectionFactory(factory.getProtocol());
385 for (ConnectionFactory factory: factories)
386 if (factory!=null)
387 addConnectionFactory(factory);
388 }
389 }
390
391
392 @Override
393 @ManagedAttribute("Protocols supported by this connector")
394 public List<String> getProtocols()
395 {
396 synchronized (_factories)
397 {
398 return new ArrayList<>(_factories.keySet());
399 }
400 }
401
402 public void clearConnectionFactories()
403 {
404 synchronized (_factories)
405 {
406 _factories.clear();
407 }
408 }
409
410 @ManagedAttribute("This connector's default protocol")
411 public String getDefaultProtocol()
412 {
413 return _defaultProtocol;
414 }
415
416 public void setDefaultProtocol(String defaultProtocol)
417 {
418 _defaultProtocol = defaultProtocol.toLowerCase(Locale.ENGLISH);
419 if (isRunning())
420 _defaultConnectionFactory=getConnectionFactory(_defaultProtocol);
421 }
422
423 @Override
424 public ConnectionFactory getDefaultConnectionFactory()
425 {
426 if (isStarted())
427 return _defaultConnectionFactory;
428 return getConnectionFactory(_defaultProtocol);
429 }
430
431 private class Acceptor implements Runnable
432 {
433 private final int _acceptor;
434
435 private Acceptor(int id)
436 {
437 _acceptor = id;
438 }
439
440 @Override
441 public void run()
442 {
443 Thread current = Thread.currentThread();
444 String name = current.getName();
445 current.setName(name + "-acceptor-" + _acceptor + "-" + AbstractConnector.this);
446
447 synchronized (AbstractConnector.this)
448 {
449 _acceptors[_acceptor] = current;
450 }
451
452 try
453 {
454 while (isAccepting())
455 {
456 try
457 {
458 accept(_acceptor);
459 }
460 catch (Throwable e)
461 {
462 if (isAccepting())
463 LOG.warn(e);
464 else
465 LOG.debug(e);
466 }
467 }
468 }
469 finally
470 {
471 current.setName(name);
472
473 synchronized (AbstractConnector.this)
474 {
475 _acceptors[_acceptor] = null;
476 }
477 _stopping.countDown();
478 }
479 }
480 }
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505 @Override
506 public Collection<EndPoint> getConnectedEndPoints()
507 {
508 return _immutableEndPoints;
509 }
510
511 protected void onEndPointOpened(EndPoint endp)
512 {
513 _endpoints.add(endp);
514 }
515
516 protected void onEndPointClosed(EndPoint endp)
517 {
518 _endpoints.remove(endp);
519 }
520
521 @Override
522 public Scheduler getScheduler()
523 {
524 return _scheduler;
525 }
526
527 @Override
528 public String toString()
529 {
530 return String.format("%s@%x{%s}",
531 getClass().getSimpleName(),
532 hashCode(),
533 getDefaultProtocol());
534 }
535 }