1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.net.Socket;
23 import java.nio.ByteBuffer;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.LinkedHashMap;
28 import java.util.List;
29 import java.util.Locale;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.Future;
36 import java.util.concurrent.TimeUnit;
37
38 import org.eclipse.jetty.io.ArrayByteBufferPool;
39 import org.eclipse.jetty.io.ByteBufferPool;
40 import org.eclipse.jetty.io.Connection;
41 import org.eclipse.jetty.io.EndPoint;
42 import org.eclipse.jetty.io.ssl.SslConnection;
43 import org.eclipse.jetty.util.FutureCallback;
44 import org.eclipse.jetty.util.annotation.ManagedAttribute;
45 import org.eclipse.jetty.util.annotation.ManagedObject;
46 import org.eclipse.jetty.util.component.ContainerLifeCycle;
47 import org.eclipse.jetty.util.component.Dumpable;
48 import org.eclipse.jetty.util.log.Log;
49 import org.eclipse.jetty.util.log.Logger;
50 import org.eclipse.jetty.util.ssl.SslContextFactory;
51 import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
52 import org.eclipse.jetty.util.thread.Scheduler;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137 @ManagedObject("Abstract implementation of the Connector Interface")
138 public abstract class AbstractConnector extends ContainerLifeCycle implements Connector, Dumpable
139 {
140 protected final Logger LOG = Log.getLogger(getClass());
141
142 private final Map<String, ConnectionFactory> _factories = new LinkedHashMap<>();
143 private final Server _server;
144 private final Executor _executor;
145 private final Scheduler _scheduler;
146 private final ByteBufferPool _byteBufferPool;
147 private final Thread[] _acceptors;
148 private final Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap());
149 private final Set<EndPoint> _immutableEndPoints = Collections.unmodifiableSet(_endpoints);
150 private volatile CountDownLatch _stopping;
151 private long _idleTimeout = 30000;
152 private String _defaultProtocol;
153 private ConnectionFactory _defaultConnectionFactory;
154 private String _name;
155
156
157
158
159
160
161
162
163
164
165 public AbstractConnector(
166 Server server,
167 Executor executor,
168 Scheduler scheduler,
169 ByteBufferPool pool,
170 int acceptors,
171 ConnectionFactory... factories)
172 {
173 _server=server;
174 _executor=executor!=null?executor:_server.getThreadPool();
175 if (scheduler==null)
176 scheduler=_server.getBean(Scheduler.class);
177 _scheduler=scheduler!=null?scheduler:new ScheduledExecutorScheduler();
178 if (pool==null)
179 pool=_server.getBean(ByteBufferPool.class);
180 _byteBufferPool = pool!=null?pool:new ArrayByteBufferPool();
181
182 addBean(_server,false);
183 addBean(_executor);
184 if (executor==null)
185 unmanage(_executor);
186 addBean(_scheduler);
187 addBean(_byteBufferPool);
188
189 for (ConnectionFactory factory:factories)
190 addConnectionFactory(factory);
191
192 if (acceptors<=0)
193 acceptors=Math.max(1,(Runtime.getRuntime().availableProcessors()) / 2);
194 if (acceptors > 2 * Runtime.getRuntime().availableProcessors())
195 LOG.warn("Acceptors should be <= 2*availableProcessors: " + this);
196 _acceptors = new Thread[acceptors];
197 }
198
199
200 @Override
201 public Server getServer()
202 {
203 return _server;
204 }
205
206 @Override
207 public Executor getExecutor()
208 {
209 return _executor;
210 }
211
212 @Override
213 public ByteBufferPool getByteBufferPool()
214 {
215 return _byteBufferPool;
216 }
217
218 @Override
219 @ManagedAttribute("Idle timeout")
220 public long getIdleTimeout()
221 {
222 return _idleTimeout;
223 }
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238 public void setIdleTimeout(long idleTimeout)
239 {
240 _idleTimeout = idleTimeout;
241 }
242
243
244
245
246 @ManagedAttribute("number of acceptor threads")
247 public int getAcceptors()
248 {
249 return _acceptors.length;
250 }
251
252 @Override
253 protected void doStart() throws Exception
254 {
255 _defaultConnectionFactory = getConnectionFactory(_defaultProtocol);
256 if(_defaultConnectionFactory==null)
257 throw new IllegalStateException("No protocol factory for default protocol: "+_defaultProtocol);
258
259 super.doStart();
260
261 _stopping=new CountDownLatch(_acceptors.length);
262 for (int i = 0; i < _acceptors.length; i++)
263 getExecutor().execute(new Acceptor(i));
264
265 LOG.info("Started {}", this);
266 }
267
268
269 protected void interruptAcceptors()
270 {
271 for (Thread thread : _acceptors)
272 {
273 if (thread != null)
274 thread.interrupt();
275 }
276 }
277
278 @Override
279 public Future<Void> shutdown()
280 {
281 return new FutureCallback(true);
282 }
283
284 @Override
285 protected void doStop() throws Exception
286 {
287
288 interruptAcceptors();
289
290
291 long stopTimeout = getStopTimeout();
292 CountDownLatch stopping=_stopping;
293 if (stopTimeout > 0 && stopping!=null)
294 stopping.await(stopTimeout,TimeUnit.MILLISECONDS);
295 _stopping=null;
296
297 super.doStop();
298
299 LOG.info("Stopped {}", this);
300 }
301
302 public void join() throws InterruptedException
303 {
304 join(0);
305 }
306
307 public void join(long timeout) throws InterruptedException
308 {
309 for (Thread thread : _acceptors)
310 if (thread != null)
311 thread.join(timeout);
312 }
313
314 protected abstract void accept(int acceptorID) throws IOException, InterruptedException;
315
316
317
318
319
320
321 protected boolean isAccepting()
322 {
323 return isRunning();
324 }
325
326 @Override
327 public ConnectionFactory getConnectionFactory(String protocol)
328 {
329 synchronized (_factories)
330 {
331 return _factories.get(protocol.toLowerCase(Locale.ENGLISH));
332 }
333 }
334
335 @Override
336 public <T> T getConnectionFactory(Class<T> factoryType)
337 {
338 synchronized (_factories)
339 {
340 for (ConnectionFactory f : _factories.values())
341 if (factoryType.isAssignableFrom(f.getClass()))
342 return (T)f;
343 return null;
344 }
345 }
346
347 public void addConnectionFactory(ConnectionFactory factory)
348 {
349 synchronized (_factories)
350 {
351 ConnectionFactory old=_factories.remove(factory.getProtocol());
352 if (old!=null)
353 removeBean(old);
354 _factories.put(factory.getProtocol().toLowerCase(Locale.ENGLISH), factory);
355 addBean(factory);
356 if (_defaultProtocol==null)
357 _defaultProtocol=factory.getProtocol();
358 }
359 }
360
361 public ConnectionFactory removeConnectionFactory(String protocol)
362 {
363 synchronized (_factories)
364 {
365 ConnectionFactory factory= _factories.remove(protocol.toLowerCase(Locale.ENGLISH));
366 removeBean(factory);
367 return factory;
368 }
369 }
370
371 @Override
372 public Collection<ConnectionFactory> getConnectionFactories()
373 {
374 synchronized (_factories)
375 {
376 return _factories.values();
377 }
378 }
379
380 public void setConnectionFactories(Collection<ConnectionFactory> factories)
381 {
382 synchronized (_factories)
383 {
384 List<ConnectionFactory> existing = new ArrayList<>(_factories.values());
385 for (ConnectionFactory factory: existing)
386 removeConnectionFactory(factory.getProtocol());
387 for (ConnectionFactory factory: factories)
388 if (factory!=null)
389 addConnectionFactory(factory);
390 }
391 }
392
393
394 @Override
395 @ManagedAttribute("Protocols supported by this connector")
396 public List<String> getProtocols()
397 {
398 synchronized (_factories)
399 {
400 return new ArrayList<>(_factories.keySet());
401 }
402 }
403
404 public void clearConnectionFactories()
405 {
406 synchronized (_factories)
407 {
408 _factories.clear();
409 }
410 }
411
412 @ManagedAttribute("This connector's default protocol")
413 public String getDefaultProtocol()
414 {
415 return _defaultProtocol;
416 }
417
418 public void setDefaultProtocol(String defaultProtocol)
419 {
420 _defaultProtocol = defaultProtocol.toLowerCase(Locale.ENGLISH);
421 if (isRunning())
422 _defaultConnectionFactory=getConnectionFactory(_defaultProtocol);
423 }
424
425 @Override
426 public ConnectionFactory getDefaultConnectionFactory()
427 {
428 if (isStarted())
429 return _defaultConnectionFactory;
430 return getConnectionFactory(_defaultProtocol);
431 }
432
433 private class Acceptor implements Runnable
434 {
435 private final int _acceptor;
436
437 private Acceptor(int id)
438 {
439 _acceptor = id;
440 }
441
442 @Override
443 public void run()
444 {
445 Thread current = Thread.currentThread();
446 String name = current.getName();
447 current.setName(name + "-acceptor-" + _acceptor + "-" + AbstractConnector.this);
448
449 synchronized (AbstractConnector.this)
450 {
451 _acceptors[_acceptor] = current;
452 }
453
454 try
455 {
456 while (isAccepting())
457 {
458 try
459 {
460 accept(_acceptor);
461 }
462 catch (Throwable e)
463 {
464 if (isAccepting())
465 LOG.warn(e);
466 else
467 LOG.debug(e);
468 }
469 }
470 }
471 finally
472 {
473 current.setName(name);
474
475 synchronized (AbstractConnector.this)
476 {
477 _acceptors[_acceptor] = null;
478 }
479 CountDownLatch stopping=_stopping;
480 if (stopping!=null)
481 stopping.countDown();
482 }
483 }
484 }
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509 @Override
510 public Collection<EndPoint> getConnectedEndPoints()
511 {
512 return _immutableEndPoints;
513 }
514
515 protected void onEndPointOpened(EndPoint endp)
516 {
517 _endpoints.add(endp);
518 }
519
520 protected void onEndPointClosed(EndPoint endp)
521 {
522 _endpoints.remove(endp);
523 }
524
525 @Override
526 public Scheduler getScheduler()
527 {
528 return _scheduler;
529 }
530
531 @Override
532 public String getName()
533 {
534 return _name;
535 }
536
537
538
539
540
541
542
543
544 public void setName(String name)
545 {
546 _name=name;
547 }
548
549 @Override
550 public String toString()
551 {
552 return String.format("%s@%x{%s}",
553 _name==null?getClass().getSimpleName():_name,
554 hashCode(),
555 getDefaultProtocol());
556 }
557 }