1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.net.Socket;
23 import java.nio.ByteBuffer;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.LinkedHashMap;
28 import java.util.List;
29 import java.util.Locale;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.Future;
36 import java.util.concurrent.TimeUnit;
37
38 import org.eclipse.jetty.io.ArrayByteBufferPool;
39 import org.eclipse.jetty.io.ByteBufferPool;
40 import org.eclipse.jetty.io.Connection;
41 import org.eclipse.jetty.io.EndPoint;
42 import org.eclipse.jetty.io.ssl.SslConnection;
43 import org.eclipse.jetty.util.FutureCallback;
44 import org.eclipse.jetty.util.annotation.ManagedAttribute;
45 import org.eclipse.jetty.util.annotation.ManagedObject;
46 import org.eclipse.jetty.util.component.ContainerLifeCycle;
47 import org.eclipse.jetty.util.component.Dumpable;
48 import org.eclipse.jetty.util.log.Log;
49 import org.eclipse.jetty.util.log.Logger;
50 import org.eclipse.jetty.util.ssl.SslContextFactory;
51 import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
52 import org.eclipse.jetty.util.thread.Scheduler;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137 @ManagedObject("Abstract implementation of the Connector Interface")
138 public abstract class AbstractConnector extends ContainerLifeCycle implements Connector, Dumpable
139 {
140 protected final Logger LOG = Log.getLogger(getClass());
141
142 private final Map<String, ConnectionFactory> _factories = new LinkedHashMap<>();
143 private final Server _server;
144 private final Executor _executor;
145 private final Scheduler _scheduler;
146 private final ByteBufferPool _byteBufferPool;
147 private final Thread[] _acceptors;
148 private final Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap());
149 private final Set<EndPoint> _immutableEndPoints = Collections.unmodifiableSet(_endpoints);
150 private volatile CountDownLatch _stopping;
151 private long _idleTimeout = 30000;
152 private String _defaultProtocol;
153 private ConnectionFactory _defaultConnectionFactory;
154 private String _name;
155
156
157
158
159
160
161
162
163
164
165 public AbstractConnector(
166 Server server,
167 Executor executor,
168 Scheduler scheduler,
169 ByteBufferPool pool,
170 int acceptors,
171 ConnectionFactory... factories)
172 {
173 _server=server;
174 _executor=executor!=null?executor:_server.getThreadPool();
175 if (scheduler==null)
176 scheduler=_server.getBean(Scheduler.class);
177 _scheduler=scheduler!=null?scheduler:new ScheduledExecutorScheduler();
178 if (pool==null)
179 pool=_server.getBean(ByteBufferPool.class);
180 _byteBufferPool = pool!=null?pool:new ArrayByteBufferPool();
181
182 addBean(_server,false);
183 addBean(_executor);
184 if (executor==null)
185 unmanage(_executor);
186 addBean(_scheduler);
187 addBean(_byteBufferPool);
188
189 for (ConnectionFactory factory:factories)
190 addConnectionFactory(factory);
191
192 if (acceptors<0)
193 acceptors=Math.max(1,(Runtime.getRuntime().availableProcessors()) / 2);
194 if (acceptors > 2 * Runtime.getRuntime().availableProcessors())
195 LOG.warn("Acceptors should be <= 2*availableProcessors: " + this);
196 _acceptors = new Thread[acceptors];
197 }
198
199
200 @Override
201 public Server getServer()
202 {
203 return _server;
204 }
205
206 @Override
207 public Executor getExecutor()
208 {
209 return _executor;
210 }
211
212 @Override
213 public ByteBufferPool getByteBufferPool()
214 {
215 return _byteBufferPool;
216 }
217
218 @Override
219 @ManagedAttribute("Idle timeout")
220 public long getIdleTimeout()
221 {
222 return _idleTimeout;
223 }
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238 public void setIdleTimeout(long idleTimeout)
239 {
240 _idleTimeout = idleTimeout;
241 }
242
243
244
245
246 @ManagedAttribute("number of acceptor threads")
247 public int getAcceptors()
248 {
249 return _acceptors.length;
250 }
251
252 @Override
253 protected void doStart() throws Exception
254 {
255 _defaultConnectionFactory = getConnectionFactory(_defaultProtocol);
256 if(_defaultConnectionFactory==null)
257 throw new IllegalStateException("No protocol factory for default protocol: "+_defaultProtocol);
258
259 super.doStart();
260
261 _stopping=new CountDownLatch(_acceptors.length);
262 for (int i = 0; i < _acceptors.length; i++)
263 getExecutor().execute(new Acceptor(i));
264
265 LOG.info("Started {}", this);
266 }
267
268
269 protected void interruptAcceptors()
270 {
271 synchronized (this)
272 {
273 for (Thread thread : _acceptors)
274 {
275 if (thread != null)
276 thread.interrupt();
277 }
278 }
279 }
280
281 @Override
282 public Future<Void> shutdown()
283 {
284 return new FutureCallback(true);
285 }
286
287 @Override
288 protected void doStop() throws Exception
289 {
290
291 interruptAcceptors();
292
293
294 long stopTimeout = getStopTimeout();
295 CountDownLatch stopping=_stopping;
296 if (stopTimeout > 0 && stopping!=null)
297 stopping.await(stopTimeout,TimeUnit.MILLISECONDS);
298 _stopping=null;
299
300 super.doStop();
301
302 LOG.info("Stopped {}", this);
303 }
304
305 public void join() throws InterruptedException
306 {
307 join(0);
308 }
309
310 public void join(long timeout) throws InterruptedException
311 {
312 synchronized (this)
313 {
314 for (Thread thread : _acceptors)
315 if (thread != null)
316 thread.join(timeout);
317 }
318 }
319
320 protected abstract void accept(int acceptorID) throws IOException, InterruptedException;
321
322
323
324
325
326
327 protected boolean isAccepting()
328 {
329 return isRunning();
330 }
331
332 @Override
333 public ConnectionFactory getConnectionFactory(String protocol)
334 {
335 synchronized (_factories)
336 {
337 return _factories.get(protocol.toLowerCase(Locale.ENGLISH));
338 }
339 }
340
341 @Override
342 public <T> T getConnectionFactory(Class<T> factoryType)
343 {
344 synchronized (_factories)
345 {
346 for (ConnectionFactory f : _factories.values())
347 if (factoryType.isAssignableFrom(f.getClass()))
348 return (T)f;
349 return null;
350 }
351 }
352
353 public void addConnectionFactory(ConnectionFactory factory)
354 {
355 synchronized (_factories)
356 {
357 ConnectionFactory old=_factories.remove(factory.getProtocol());
358 if (old!=null)
359 removeBean(old);
360 _factories.put(factory.getProtocol().toLowerCase(Locale.ENGLISH), factory);
361 addBean(factory);
362 if (_defaultProtocol==null)
363 _defaultProtocol=factory.getProtocol();
364 }
365 }
366
367 public ConnectionFactory removeConnectionFactory(String protocol)
368 {
369 synchronized (_factories)
370 {
371 ConnectionFactory factory= _factories.remove(protocol.toLowerCase(Locale.ENGLISH));
372 removeBean(factory);
373 return factory;
374 }
375 }
376
377 @Override
378 public Collection<ConnectionFactory> getConnectionFactories()
379 {
380 synchronized (_factories)
381 {
382 return _factories.values();
383 }
384 }
385
386 public void setConnectionFactories(Collection<ConnectionFactory> factories)
387 {
388 synchronized (_factories)
389 {
390 List<ConnectionFactory> existing = new ArrayList<>(_factories.values());
391 for (ConnectionFactory factory: existing)
392 removeConnectionFactory(factory.getProtocol());
393 for (ConnectionFactory factory: factories)
394 if (factory!=null)
395 addConnectionFactory(factory);
396 }
397 }
398
399
400 @Override
401 @ManagedAttribute("Protocols supported by this connector")
402 public List<String> getProtocols()
403 {
404 synchronized (_factories)
405 {
406 return new ArrayList<>(_factories.keySet());
407 }
408 }
409
410 public void clearConnectionFactories()
411 {
412 synchronized (_factories)
413 {
414 _factories.clear();
415 }
416 }
417
418 @ManagedAttribute("This connector's default protocol")
419 public String getDefaultProtocol()
420 {
421 return _defaultProtocol;
422 }
423
424 public void setDefaultProtocol(String defaultProtocol)
425 {
426 _defaultProtocol = defaultProtocol.toLowerCase(Locale.ENGLISH);
427 if (isRunning())
428 _defaultConnectionFactory=getConnectionFactory(_defaultProtocol);
429 }
430
431 @Override
432 public ConnectionFactory getDefaultConnectionFactory()
433 {
434 if (isStarted())
435 return _defaultConnectionFactory;
436 return getConnectionFactory(_defaultProtocol);
437 }
438
439 private class Acceptor implements Runnable
440 {
441 private final int _acceptor;
442
443 private Acceptor(int id)
444 {
445 _acceptor = id;
446 }
447
448 @Override
449 public void run()
450 {
451 Thread current = Thread.currentThread();
452 String name = current.getName();
453 current.setName(name + "-acceptor-" + _acceptor + "-" + AbstractConnector.this);
454
455 synchronized (AbstractConnector.this)
456 {
457 _acceptors[_acceptor] = current;
458 }
459
460 try
461 {
462 while (isAccepting())
463 {
464 try
465 {
466 accept(_acceptor);
467 }
468 catch (Throwable e)
469 {
470 if (isAccepting())
471 LOG.warn(e);
472 else
473 LOG.ignore(e);
474 }
475 }
476 }
477 finally
478 {
479 current.setName(name);
480
481 synchronized (AbstractConnector.this)
482 {
483 _acceptors[_acceptor] = null;
484 }
485 CountDownLatch stopping=_stopping;
486 if (stopping!=null)
487 stopping.countDown();
488 }
489 }
490 }
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515 @Override
516 public Collection<EndPoint> getConnectedEndPoints()
517 {
518 return _immutableEndPoints;
519 }
520
521 protected void onEndPointOpened(EndPoint endp)
522 {
523 _endpoints.add(endp);
524 }
525
526 protected void onEndPointClosed(EndPoint endp)
527 {
528 _endpoints.remove(endp);
529 }
530
531 @Override
532 public Scheduler getScheduler()
533 {
534 return _scheduler;
535 }
536
537 @Override
538 public String getName()
539 {
540 return _name;
541 }
542
543
544
545
546
547
548
549
550 public void setName(String name)
551 {
552 _name=name;
553 }
554
555 @Override
556 public String toString()
557 {
558 return String.format("%s@%x{%s}",
559 _name==null?getClass().getSimpleName():_name,
560 hashCode(),
561 getDefaultProtocol());
562 }
563 }