View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.net.Socket;
23  import java.nio.ByteBuffer;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.HashSet;
28  import java.util.LinkedHashMap;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Set;
32  import java.util.concurrent.ConcurrentHashMap;
33  import java.util.concurrent.CountDownLatch;
34  import java.util.concurrent.Executor;
35  import java.util.concurrent.Future;
36  import java.util.concurrent.TimeUnit;
37  
38  import org.eclipse.jetty.io.ArrayByteBufferPool;
39  import org.eclipse.jetty.io.ByteBufferPool;
40  import org.eclipse.jetty.io.EndPoint;
41  import org.eclipse.jetty.io.ssl.SslConnection;
42  import org.eclipse.jetty.util.FutureCallback;
43  import org.eclipse.jetty.util.StringUtil;
44  import org.eclipse.jetty.util.annotation.ManagedAttribute;
45  import org.eclipse.jetty.util.annotation.ManagedObject;
46  import org.eclipse.jetty.util.component.ContainerLifeCycle;
47  import org.eclipse.jetty.util.component.Dumpable;
48  import org.eclipse.jetty.util.log.Log;
49  import org.eclipse.jetty.util.log.Logger;
50  import org.eclipse.jetty.util.ssl.SslContextFactory;
51  import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
52  import org.eclipse.jetty.util.thread.Scheduler;
53  
54  /**
55   * <p>An abstract implementation of {@link Connector} that provides a {@link ConnectionFactory} mechanism
56   * for creating {@link org.eclipse.jetty.io.Connection} instances for various protocols (HTTP, SSL, etc).</p>
57   *
58   * <h2>Connector Services</h2>
59   * The abstract connector manages the dependent services needed by all specific connector instances:
60   * <ul>
61   * <li>The {@link Executor} service is used to run all active tasks needed by this connector such as accepting connections
62   * or handle HTTP requests. The default is to use the {@link Server#getThreadPool()} as an executor.
63   * </li>
64   * <li>The {@link Scheduler} service is used to monitor the idle timeouts of all connections and is also made available
65   * to the connections to time such things as asynchronous request timeouts.  The default is to use a new
66   * {@link ScheduledExecutorScheduler} instance.
67   * </li>
68   * <li>The {@link ByteBufferPool} service is made available to all connections to be used to acquire and release
69   * {@link ByteBuffer} instances from a pool.  The default is to use a new {@link ArrayByteBufferPool} instance.
70   * </li>
71   * </ul>
72   * These services are managed as aggregate beans by the {@link ContainerLifeCycle} super class and
73   * may either be managed or unmanaged beans.
74   *
75   * <h2>Connection Factories</h2>
76   * The connector keeps a collection of {@link ConnectionFactory} instances, each of which are known by their
77   * protocol name.  The protocol name may be a real protocol (e.g. "http/1.1" or "h2") or it may be a private name
78   * that represents a special connection factory. For example, the name "SSL-http/1.1" is used for
79   * an {@link SslConnectionFactory} that has been instantiated with the {@link HttpConnectionFactory} as it's
80   * next protocol.
81   *
82   * <h2>Configuring Connection Factories</h2>
83   * The collection of available {@link ConnectionFactory} may be constructor injected or modified with the
84   * methods {@link #addConnectionFactory(ConnectionFactory)}, {@link #removeConnectionFactory(String)} and
85   * {@link #setConnectionFactories(Collection)}.  Only a single {@link ConnectionFactory} instance may be configured
86   * per protocol name, so if two factories with the same {@link ConnectionFactory#getProtocol()} are set, then
87   * the second will replace the first.
88   * <p>
89   * The protocol factory used for newly accepted connections is specified by
90   * the method {@link #setDefaultProtocol(String)} or defaults to the protocol of the first configured factory.
91   * <p>
92   * Each Connection factory type is responsible for the configuration of the protocols that it accepts. Thus to
93   * configure the HTTP protocol, you pass a {@link HttpConfiguration} instance to the {@link HttpConnectionFactory}
94   * (or other factories that can also provide HTTP Semantics).  Similarly the {@link SslConnectionFactory} is
95   * configured by passing it a {@link SslContextFactory} and a next protocol name.
96   *
97   * <h2>Connection Factory Operation</h2>
98   * {@link ConnectionFactory}s may simply create a {@link org.eclipse.jetty.io.Connection} instance to support a specific
99   * protocol.  For example, the {@link HttpConnectionFactory} will create a {@link HttpConnection} instance
100  * that can handle http/1.1, http/1.0 and http/0.9.
101  * <p>
102  * {@link ConnectionFactory}s may also create a chain of {@link org.eclipse.jetty.io.Connection} instances, using other {@link ConnectionFactory} instances.
103  * For example, the {@link SslConnectionFactory} is configured with a next protocol name, so that once it has accepted
104  * a connection and created an {@link SslConnection}, it then used the next {@link ConnectionFactory} from the
105  * connector using the {@link #getConnectionFactory(String)} method, to create a {@link org.eclipse.jetty.io.Connection} instance that
106  * will handle the unencrypted bytes from the {@link SslConnection}.   If the next protocol is "http/1.1", then the
107  * {@link SslConnectionFactory} will have a protocol name of "SSL-http/1.1" and lookup "http/1.1" for the protocol
108  * to run over the SSL connection.
109  * <p>
110  * {@link ConnectionFactory}s may also create temporary {@link org.eclipse.jetty.io.Connection} instances that will exchange bytes
111  * over the connection to determine what is the next protocol to use.  For example the ALPN protocol is an extension
112  * of SSL to allow a protocol to be specified during the SSL handshake. ALPN is used by the HTTP/2 protocol to
113  * negotiate the protocol that the client and server will speak.  Thus to accept a HTTP/2 connection, the
114  * connector will be configured with {@link ConnectionFactory}s for "SSL-ALPN", "h2", "http/1.1"
115  * with the default protocol being "SSL-ALPN".  Thus a newly accepted connection uses "SSL-ALPN", which specifies a
116  * SSLConnectionFactory with "ALPN" as the next protocol.  Thus an SSL connection instance is created chained to an ALPN
117  * connection instance.  The ALPN connection then negotiates with the client to determined the next protocol, which
118  * could be "h2" or the default of "http/1.1".  Once the next protocol is determined, the ALPN connection
119  * calls {@link #getConnectionFactory(String)} to create a connection instance that will replace the ALPN connection as
120  * the connection chained to the SSL connection.
121  * <h2>Acceptors</h2>
122  * The connector will execute a number of acceptor tasks to the {@link Exception} service passed to the constructor.
123  * The acceptor tasks run in a loop while the connector is running and repeatedly call the abstract {@link #accept(int)} method.
124  * The implementation of the accept method must:
125  * <ol>
126  * <li>block waiting for new connections</li>
127  * <li>accept the connection (eg socket accept)</li>
128  * <li>perform any configuration of the connection (eg. socket linger times)</li>
129  * <li>call the {@link #getDefaultConnectionFactory()} {@link ConnectionFactory#newConnection(Connector, org.eclipse.jetty.io.EndPoint)}
130  * method to create a new Connection instance.</li>
131  * </ol>
132  * The default number of acceptor tasks is the minimum of 1 and half the number of available CPUs. Having more acceptors may reduce
133  * the latency for servers that see a high rate of new connections (eg HTTP/1.0 without keep-alive).  Typically the default is
134  * sufficient for modern persistent protocols (HTTP/1.1, HTTP/2 etc.)
135  */
136 @ManagedObject("Abstract implementation of the Connector Interface")
137 public abstract class AbstractConnector extends ContainerLifeCycle implements Connector, Dumpable
138 {
139     protected final Logger LOG = Log.getLogger(getClass());
140     // Order is important on server side, so we use a LinkedHashMap
141     private final Map<String, ConnectionFactory> _factories = new LinkedHashMap<>();
142     private final Server _server;
143     private final Executor _executor;
144     private final Scheduler _scheduler;
145     private final ByteBufferPool _byteBufferPool;
146     private final Thread[] _acceptors;
147     private final Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap<>());
148     private final Set<EndPoint> _immutableEndPoints = Collections.unmodifiableSet(_endpoints);
149     private volatile CountDownLatch _stopping;
150     private long _idleTimeout = 30000;
151     private String _defaultProtocol;
152     private ConnectionFactory _defaultConnectionFactory;
153     private String _name;
154     private int _acceptorPriorityDelta;
155 
156 
157     /**
158      * @param server The server this connector will be added to. Must not be null.
159      * @param executor An executor for this connector or null to use the servers executor
160      * @param scheduler A scheduler for this connector or null to either a {@link Scheduler} set as a server bean or if none set, then a new {@link ScheduledExecutorScheduler} instance.
161      * @param pool A buffer pool for this connector or null to either a {@link ByteBufferPool} set as a server bean or none set, the new  {@link ArrayByteBufferPool} instance.
162      * @param acceptors the number of acceptor threads to use, or -1 for a default value. If 0, then no acceptor threads will be launched and some other mechanism will need to be used to accept new connections.
163      * @param factories The Connection Factories to use.
164      */
165     public AbstractConnector(
166             Server server,
167             Executor executor,
168             Scheduler scheduler,
169             ByteBufferPool pool,
170             int acceptors,
171             ConnectionFactory... factories)
172     {
173         _server=server;
174         _executor=executor!=null?executor:_server.getThreadPool();
175         if (scheduler==null)
176             scheduler=_server.getBean(Scheduler.class);
177         _scheduler=scheduler!=null?scheduler:new ScheduledExecutorScheduler();
178         if (pool==null)
179             pool=_server.getBean(ByteBufferPool.class);
180         _byteBufferPool = pool!=null?pool:new ArrayByteBufferPool();
181 
182         addBean(_server,false);
183         addBean(_executor);
184         if (executor==null)
185             unmanage(_executor); // inherited from server
186         addBean(_scheduler);
187         addBean(_byteBufferPool);
188 
189         for (ConnectionFactory factory:factories)
190             addConnectionFactory(factory);
191 
192         int cores = Runtime.getRuntime().availableProcessors();
193         if (acceptors < 0)
194             acceptors=Math.max(1, Math.min(4,cores/8));
195         if (acceptors > cores)
196             LOG.warn("Acceptors should be <= availableProcessors: " + this);
197         _acceptors = new Thread[acceptors];
198     }
199 
200 
201     @Override
202     public Server getServer()
203     {
204         return _server;
205     }
206 
207     @Override
208     public Executor getExecutor()
209     {
210         return _executor;
211     }
212 
213     @Override
214     public ByteBufferPool getByteBufferPool()
215     {
216         return _byteBufferPool;
217     }
218 
219     @Override
220     @ManagedAttribute("Idle timeout")
221     public long getIdleTimeout()
222     {
223         return _idleTimeout;
224     }
225 
226     /**
227      * <p>Sets the maximum Idle time for a connection, which roughly translates to the {@link Socket#setSoTimeout(int)}
228      * call, although with NIO implementations other mechanisms may be used to implement the timeout.</p>
229      * <p>The max idle time is applied:</p>
230      * <ul>
231      * <li>When waiting for a new message to be received on a connection</li>
232      * <li>When waiting for a new message to be sent on a connection</li>
233      * </ul>
234      * <p>This value is interpreted as the maximum time between some progress being made on the connection.
235      * So if a single byte is read or written, then the timeout is reset.</p>
236      *
237      * @param idleTimeout the idle timeout
238      */
239     public void setIdleTimeout(long idleTimeout)
240     {
241         _idleTimeout = idleTimeout;
242     }
243 
244     /**
245      * @return Returns the number of acceptor threads.
246      */
247     @ManagedAttribute("number of acceptor threads")
248     public int getAcceptors()
249     {
250         return _acceptors.length;
251     }
252 
253     @Override
254     protected void doStart() throws Exception
255     {
256         _defaultConnectionFactory = getConnectionFactory(_defaultProtocol);
257         if(_defaultConnectionFactory==null)
258             throw new IllegalStateException("No protocol factory for default protocol: "+_defaultProtocol);
259 
260         super.doStart();
261 
262         _stopping=new CountDownLatch(_acceptors.length);
263         for (int i = 0; i < _acceptors.length; i++)
264         {
265             Acceptor a = new Acceptor(i);
266             addBean(a);
267             getExecutor().execute(a);
268         }
269 
270         LOG.info("Started {}", this);
271     }
272 
273 
274     protected void interruptAcceptors()
275     {
276         synchronized (this)
277         {
278             for (Thread thread : _acceptors)
279             {
280                 if (thread != null)
281                     thread.interrupt();
282             }
283         }
284     }
285 
286     @Override
287     public Future<Void> shutdown()
288     {
289         return new FutureCallback(true);
290     }
291 
292     @Override
293     protected void doStop() throws Exception
294     {
295         // Tell the acceptors we are stopping
296         interruptAcceptors();
297 
298         // If we have a stop timeout
299         long stopTimeout = getStopTimeout();
300         CountDownLatch stopping=_stopping;
301         if (stopTimeout > 0 && stopping!=null)
302             stopping.await(stopTimeout,TimeUnit.MILLISECONDS);
303         _stopping=null;
304 
305         super.doStop();
306 
307         for (Acceptor a : getBeans(Acceptor.class))
308             removeBean(a);
309 
310         LOG.info("Stopped {}", this);
311     }
312 
313     public void join() throws InterruptedException
314     {
315         join(0);
316     }
317 
318     public void join(long timeout) throws InterruptedException
319     {
320         synchronized (this)
321         {
322             for (Thread thread : _acceptors)
323                 if (thread != null)
324                     thread.join(timeout);
325         }
326     }
327 
328     protected abstract void accept(int acceptorID) throws IOException, InterruptedException;
329 
330 
331     /* ------------------------------------------------------------ */
332     /**
333      * @return Is the connector accepting new connections
334      */
335     protected boolean isAccepting()
336     {
337         return isRunning();
338     }
339 
340     @Override
341     public ConnectionFactory getConnectionFactory(String protocol)
342     {
343         synchronized (_factories)
344         {
345             return _factories.get(StringUtil.asciiToLowerCase(protocol));
346         }
347     }
348 
349     @Override
350     public <T> T getConnectionFactory(Class<T> factoryType)
351     {
352         synchronized (_factories)
353         {
354             for (ConnectionFactory f : _factories.values())
355                 if (factoryType.isAssignableFrom(f.getClass()))
356                     return (T)f;
357             return null;
358         }
359     }
360 
361     public void addConnectionFactory(ConnectionFactory factory)
362     {
363         synchronized (_factories)
364         {
365             Set<ConnectionFactory> to_remove = new HashSet<>();
366             for (String key:factory.getProtocols())
367             {
368                 key=StringUtil.asciiToLowerCase(key);
369                 ConnectionFactory old=_factories.remove(key);
370                 if (old!=null)
371                 {
372                     if (old.getProtocol().equals(_defaultProtocol))
373                         _defaultProtocol=null;
374                     to_remove.add(old);
375                 }
376                 _factories.put(key, factory);
377             }
378 
379             // keep factories still referenced
380             for (ConnectionFactory f : _factories.values())
381                 to_remove.remove(f);
382 
383             // remove old factories
384             for (ConnectionFactory old: to_remove)
385             {
386                 removeBean(old);
387                 if (LOG.isDebugEnabled())
388                     LOG.debug("{} removed {}", this, old);
389             }
390 
391             // add new Bean
392             addBean(factory);
393             if (_defaultProtocol==null)
394                 _defaultProtocol=factory.getProtocol();
395             if (LOG.isDebugEnabled())
396                 LOG.debug("{} added {}", this, factory);
397         }
398     }
399 
400     public void addFirstConnectionFactory(ConnectionFactory factory)
401     {
402         synchronized (_factories)
403         {
404             List<ConnectionFactory> existings = new ArrayList<>(_factories.values());
405             _factories.clear();
406             addConnectionFactory(factory);
407             for (ConnectionFactory existing : existings)
408                 addConnectionFactory(existing);
409             _defaultProtocol = factory.getProtocol();
410         }
411     }
412 
413     public void addIfAbsentConnectionFactory(ConnectionFactory factory)
414     {
415         synchronized (_factories)
416         {
417             String key=StringUtil.asciiToLowerCase(factory.getProtocol());
418             if (_factories.containsKey(key))
419             {
420                 if (LOG.isDebugEnabled())
421                     LOG.debug("{} addIfAbsent ignored {}", this, factory);
422             }
423             else
424             {
425                 _factories.put(key, factory);
426                 addBean(factory);
427                 if (_defaultProtocol==null)
428                     _defaultProtocol=factory.getProtocol();
429                 if (LOG.isDebugEnabled())
430                     LOG.debug("{} addIfAbsent added {}", this, factory);
431             }
432         }
433     }
434 
435     public ConnectionFactory removeConnectionFactory(String protocol)
436     {
437         synchronized (_factories)
438         {
439             ConnectionFactory factory= _factories.remove(StringUtil.asciiToLowerCase(protocol));
440             removeBean(factory);
441             return factory;
442         }
443     }
444 
445     @Override
446     public Collection<ConnectionFactory> getConnectionFactories()
447     {
448         synchronized (_factories)
449         {
450             return _factories.values();
451         }
452     }
453 
454     public void setConnectionFactories(Collection<ConnectionFactory> factories)
455     {
456         synchronized (_factories)
457         {
458             List<ConnectionFactory> existing = new ArrayList<>(_factories.values());
459             for (ConnectionFactory factory: existing)
460                 removeConnectionFactory(factory.getProtocol());
461             for (ConnectionFactory factory: factories)
462                 if (factory!=null)
463                     addConnectionFactory(factory);
464         }
465     }
466 
467     @ManagedAttribute("The priority delta to apply to acceptor threads")
468     public int getAcceptorPriorityDelta()
469     {
470         return _acceptorPriorityDelta;
471     }
472 
473     /* ------------------------------------------------------------ */
474     /** Set the acceptor thread priority delta.
475      * <p>This allows the acceptor thread to run at a different priority.
476      * Typically this would be used to lower the priority to give preference
477      * to handling previously accepted connections rather than accepting
478      * new connections</p>
479      * @param acceptorPriorityDelta the acceptor priority delta
480      */
481     public void setAcceptorPriorityDelta(int acceptorPriorityDelta)
482     {
483         int old=_acceptorPriorityDelta;
484         _acceptorPriorityDelta = acceptorPriorityDelta;
485         if (old!=acceptorPriorityDelta && isStarted())
486         {
487             for (Thread thread : _acceptors)
488                 thread.setPriority(Math.max(Thread.MIN_PRIORITY,Math.min(Thread.MAX_PRIORITY,thread.getPriority()-old+acceptorPriorityDelta)));
489         }
490     }
491 
492     @Override
493     @ManagedAttribute("Protocols supported by this connector")
494     public List<String> getProtocols()
495     {
496         synchronized (_factories)
497         {
498             return new ArrayList<>(_factories.keySet());
499         }
500     }
501 
502     public void clearConnectionFactories()
503     {
504         synchronized (_factories)
505         {
506             _factories.clear();
507         }
508     }
509 
510     @ManagedAttribute("This connector's default protocol")
511     public String getDefaultProtocol()
512     {
513         return _defaultProtocol;
514     }
515 
516     public void setDefaultProtocol(String defaultProtocol)
517     {
518         _defaultProtocol = StringUtil.asciiToLowerCase(defaultProtocol);
519         if (isRunning())
520             _defaultConnectionFactory=getConnectionFactory(_defaultProtocol);
521     }
522 
523     @Override
524     public ConnectionFactory getDefaultConnectionFactory()
525     {
526         if (isStarted())
527             return _defaultConnectionFactory;
528         return getConnectionFactory(_defaultProtocol);
529     }
530 
531     private class Acceptor implements Runnable
532     {
533         private final int _id;
534         private String _name;
535 
536         private Acceptor(int id)
537         {
538             _id = id;
539         }
540 
541         @Override
542         public void run()
543         {
544             final Thread thread = Thread.currentThread();
545             String name=thread.getName();
546             _name=String.format("%s-acceptor-%d@%x-%s",name,_id,hashCode(),AbstractConnector.this.toString());
547             thread.setName(_name);
548 
549             int priority=thread.getPriority();
550             if (_acceptorPriorityDelta!=0)
551                 thread.setPriority(Math.max(Thread.MIN_PRIORITY,Math.min(Thread.MAX_PRIORITY,priority+_acceptorPriorityDelta)));
552 
553             synchronized (AbstractConnector.this)
554             {
555                 _acceptors[_id] = thread;
556             }
557 
558             try
559             {
560                 while (isAccepting())
561                 {
562                     try
563                     {
564                         accept(_id);
565                     }
566                     catch (Throwable e)
567                     {
568                         if (isAccepting())
569                             LOG.warn(e);
570                         else
571                             LOG.ignore(e);
572                     }
573                 }
574             }
575             finally
576             {
577                 thread.setName(name);
578                 if (_acceptorPriorityDelta!=0)
579                     thread.setPriority(priority);
580 
581                 synchronized (AbstractConnector.this)
582                 {
583                     _acceptors[_id] = null;
584                 }
585                 CountDownLatch stopping=_stopping;
586                 if (stopping!=null)
587                     stopping.countDown();
588             }
589         }
590 
591         @Override
592         public String toString()
593         {
594             String name=_name;
595             if (name==null)
596                 return String.format("acceptor-%d@%x", _id, hashCode());
597             return name;
598         }
599 
600     }
601 
602 
603 
604 
605 //    protected void connectionOpened(Connection connection)
606 //    {
607 //        _stats.connectionOpened();
608 //        connection.onOpen();
609 //    }
610 //
611 //    protected void connectionClosed(Connection connection)
612 //    {
613 //        connection.onClose();
614 //        long duration = System.currentTimeMillis() - connection.getEndPoint().getCreatedTimeStamp();
615 //        _stats.connectionClosed(duration, connection.getMessagesIn(), connection.getMessagesOut());
616 //    }
617 //
618 //    public void connectionUpgraded(Connection oldConnection, Connection newConnection)
619 //    {
620 //        oldConnection.onClose();
621 //        _stats.connectionUpgraded(oldConnection.getMessagesIn(), oldConnection.getMessagesOut());
622 //        newConnection.onOpen();
623 //    }
624 
625     @Override
626     public Collection<EndPoint> getConnectedEndPoints()
627     {
628         return _immutableEndPoints;
629     }
630 
631     protected void onEndPointOpened(EndPoint endp)
632     {
633         _endpoints.add(endp);
634     }
635 
636     protected void onEndPointClosed(EndPoint endp)
637     {
638         _endpoints.remove(endp);
639     }
640 
641     @Override
642     public Scheduler getScheduler()
643     {
644         return _scheduler;
645     }
646 
647     @Override
648     public String getName()
649     {
650         return _name;
651     }
652 
653     /* ------------------------------------------------------------ */
654     /**
655      * Set a connector name.   A context may be configured with
656      * virtual hosts in the form "@contextname" and will only serve
657      * requests from the named connector,
658      * @param name A connector name.
659      */
660     public void setName(String name)
661     {
662         _name=name;
663     }
664 
665     @Override
666     public String toString()
667     {
668         return String.format("%s@%x{%s,%s}",
669                 _name==null?getClass().getSimpleName():_name,
670                 hashCode(),
671                 getDefaultProtocol(),getProtocols());
672     }
673 }