1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.net.Socket;
23 import java.nio.ByteBuffer;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.LinkedHashMap;
27 import java.util.List;
28 import java.util.Locale;
29 import java.util.Map;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.TimeUnit;
34
35 import org.eclipse.jetty.io.ArrayByteBufferPool;
36 import org.eclipse.jetty.io.ByteBufferPool;
37 import org.eclipse.jetty.io.Connection;
38 import org.eclipse.jetty.io.ssl.SslConnection;
39 import org.eclipse.jetty.util.FutureCallback;
40 import org.eclipse.jetty.util.annotation.ManagedAttribute;
41 import org.eclipse.jetty.util.annotation.ManagedObject;
42 import org.eclipse.jetty.util.component.ContainerLifeCycle;
43 import org.eclipse.jetty.util.component.Dumpable;
44 import org.eclipse.jetty.util.log.Log;
45 import org.eclipse.jetty.util.log.Logger;
46 import org.eclipse.jetty.util.ssl.SslContextFactory;
47 import org.eclipse.jetty.util.thread.Scheduler;
48 import org.eclipse.jetty.util.thread.TimerScheduler;
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133 @ManagedObject("Abstract implementation of the Connector Interface")
134 public abstract class AbstractConnector extends ContainerLifeCycle implements Connector, Dumpable
135 {
136 protected final Logger LOG = Log.getLogger(getClass());
137
138 private final Map<String, ConnectionFactory> _factories = new LinkedHashMap<>();
139 private final Server _server;
140 private final Executor _executor;
141 private final Scheduler _scheduler;
142 private final ByteBufferPool _byteBufferPool;
143 private final Thread[] _acceptors;
144 private volatile CountDownLatch _stopping;
145 private long _idleTimeout = 30000;
146 private String _defaultProtocol;
147 private ConnectionFactory _defaultConnectionFactory;
148
149
150
151
152
153
154
155
156
157 public AbstractConnector(
158 Server server,
159 Executor executor,
160 Scheduler scheduler,
161 ByteBufferPool pool,
162 int acceptors,
163 ConnectionFactory... factories)
164 {
165 _server=server;
166 _executor=executor!=null?executor:_server.getThreadPool();
167 _scheduler=scheduler!=null?scheduler:new TimerScheduler();
168 _byteBufferPool = pool!=null?pool:new ArrayByteBufferPool();
169
170 addBean(_server,false);
171 addBean(_executor);
172 if (executor==null)
173 unmanage(_executor);
174 addBean(_scheduler);
175 addBean(_byteBufferPool);
176
177 for (ConnectionFactory factory:factories)
178 addConnectionFactory(factory);
179
180 if (acceptors<=0)
181 acceptors=Math.max(1,(Runtime.getRuntime().availableProcessors()) / 2);
182 if (acceptors > 2 * Runtime.getRuntime().availableProcessors())
183 LOG.warn("Acceptors should be <= 2*availableProcessors: " + this);
184 _acceptors = new Thread[acceptors];
185 }
186
187
188 @Override
189 public Server getServer()
190 {
191 return _server;
192 }
193
194 @Override
195 public Executor getExecutor()
196 {
197 return _executor;
198 }
199
200 @Override
201 public ByteBufferPool getByteBufferPool()
202 {
203 return _byteBufferPool;
204 }
205
206 @Override
207 @ManagedAttribute("Idle timeout")
208 public long getIdleTimeout()
209 {
210 return _idleTimeout;
211 }
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226 public void setIdleTimeout(long idleTimeout)
227 {
228 _idleTimeout = idleTimeout;
229 }
230
231
232
233
234 @ManagedAttribute("number of acceptor threads")
235 public int getAcceptors()
236 {
237 return _acceptors.length;
238 }
239
240 @Override
241 protected void doStart() throws Exception
242 {
243 _defaultConnectionFactory = getConnectionFactory(_defaultProtocol);
244 if(_defaultConnectionFactory==null)
245 throw new IllegalStateException("No protocol factory for default protocol: "+_defaultProtocol);
246
247 super.doStart();
248
249 _stopping=new CountDownLatch(_acceptors.length);
250 for (int i = 0; i < _acceptors.length; i++)
251 getExecutor().execute(new Acceptor(i));
252
253 LOG.info("Started {}", this);
254 }
255
256
257 protected void interruptAcceptors()
258 {
259 for (Thread thread : _acceptors)
260 {
261 if (thread != null)
262 thread.interrupt();
263 }
264 }
265
266 @Override
267 public Future<Void> shutdown()
268 {
269 return new FutureCallback(true);
270 }
271
272 @Override
273 protected void doStop() throws Exception
274 {
275
276 interruptAcceptors();
277
278
279 long stopTimeout = getStopTimeout();
280 if (stopTimeout > 0 && _stopping!=null)
281 _stopping.await(stopTimeout,TimeUnit.MILLISECONDS);
282 _stopping=null;
283
284 super.doStop();
285
286 LOG.info("Stopped {}", this);
287 }
288
289 public void join() throws InterruptedException
290 {
291 join(0);
292 }
293
294 public void join(long timeout) throws InterruptedException
295 {
296 for (Thread thread : _acceptors)
297 if (thread != null)
298 thread.join(timeout);
299 }
300
301 protected abstract void accept(int acceptorID) throws IOException, InterruptedException;
302
303
304
305
306
307
308 protected boolean isAccepting()
309 {
310 return isRunning();
311 }
312
313 @Override
314 public ConnectionFactory getConnectionFactory(String protocol)
315 {
316 synchronized (_factories)
317 {
318 return _factories.get(protocol.toLowerCase(Locale.ENGLISH));
319 }
320 }
321
322 @Override
323 public <T> T getConnectionFactory(Class<T> factoryType)
324 {
325 synchronized (_factories)
326 {
327 for (ConnectionFactory f : _factories.values())
328 if (factoryType.isAssignableFrom(f.getClass()))
329 return (T)f;
330 return null;
331 }
332 }
333
334 public void addConnectionFactory(ConnectionFactory factory)
335 {
336 synchronized (_factories)
337 {
338 ConnectionFactory old=_factories.remove(factory.getProtocol());
339 if (old!=null)
340 removeBean(old);
341 _factories.put(factory.getProtocol().toLowerCase(Locale.ENGLISH), factory);
342 addBean(factory);
343 if (_defaultProtocol==null)
344 _defaultProtocol=factory.getProtocol();
345 }
346 }
347
348 public ConnectionFactory removeConnectionFactory(String protocol)
349 {
350 synchronized (_factories)
351 {
352 ConnectionFactory factory= _factories.remove(protocol.toLowerCase(Locale.ENGLISH));
353 removeBean(factory);
354 return factory;
355 }
356 }
357
358 @Override
359 public Collection<ConnectionFactory> getConnectionFactories()
360 {
361 synchronized (_factories)
362 {
363 return _factories.values();
364 }
365 }
366
367 public void setConnectionFactories(Collection<ConnectionFactory> factories)
368 {
369 synchronized (_factories)
370 {
371 List<ConnectionFactory> existing = new ArrayList<>(_factories.values());
372 for (ConnectionFactory factory: existing)
373 removeConnectionFactory(factory.getProtocol());
374 for (ConnectionFactory factory: factories)
375 if (factory!=null)
376 addConnectionFactory(factory);
377 }
378 }
379
380
381 @Override
382 @ManagedAttribute("Protocols supported by this connector")
383 public List<String> getProtocols()
384 {
385 synchronized (_factories)
386 {
387 return new ArrayList<>(_factories.keySet());
388 }
389 }
390
391 public void clearConnectionFactories()
392 {
393 synchronized (_factories)
394 {
395 _factories.clear();
396 }
397 }
398
399 @ManagedAttribute("This connector's default protocol")
400 public String getDefaultProtocol()
401 {
402 return _defaultProtocol;
403 }
404
405 public void setDefaultProtocol(String defaultProtocol)
406 {
407 _defaultProtocol = defaultProtocol.toLowerCase(Locale.ENGLISH);
408 if (isRunning())
409 _defaultConnectionFactory=getConnectionFactory(_defaultProtocol);
410 }
411
412 @Override
413 public ConnectionFactory getDefaultConnectionFactory()
414 {
415 if (isStarted())
416 return _defaultConnectionFactory;
417 return getConnectionFactory(_defaultProtocol);
418 }
419
420 private class Acceptor implements Runnable
421 {
422 private final int _acceptor;
423
424 private Acceptor(int id)
425 {
426 _acceptor = id;
427 }
428
429 @Override
430 public void run()
431 {
432 Thread current = Thread.currentThread();
433 String name = current.getName();
434 current.setName(name + "-acceptor-" + _acceptor + "-" + AbstractConnector.this);
435
436 synchronized (AbstractConnector.this)
437 {
438 _acceptors[_acceptor] = current;
439 }
440
441 try
442 {
443 while (isAccepting())
444 {
445 try
446 {
447 accept(_acceptor);
448 }
449 catch (Throwable e)
450 {
451 if (isAccepting())
452 LOG.warn(e);
453 else
454 LOG.debug(e);
455 }
456 }
457 }
458 finally
459 {
460 current.setName(name);
461
462 synchronized (AbstractConnector.this)
463 {
464 _acceptors[_acceptor] = null;
465 }
466 _stopping.countDown();
467 }
468 }
469 }
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491 @Override
492 public Scheduler getScheduler()
493 {
494 return _scheduler;
495 }
496
497 @Override
498 public String toString()
499 {
500 return String.format("%s@%x{%s}",
501 getClass().getSimpleName(),
502 hashCode(),
503 getDefaultProtocol());
504 }
505 }