View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.net.Socket;
23  import java.nio.ByteBuffer;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.LinkedHashMap;
28  import java.util.List;
29  import java.util.Locale;
30  import java.util.Map;
31  import java.util.Set;
32  import java.util.concurrent.ConcurrentHashMap;
33  import java.util.concurrent.CountDownLatch;
34  import java.util.concurrent.Executor;
35  import java.util.concurrent.Future;
36  import java.util.concurrent.TimeUnit;
37  
38  import org.eclipse.jetty.io.ArrayByteBufferPool;
39  import org.eclipse.jetty.io.ByteBufferPool;
40  import org.eclipse.jetty.io.Connection;
41  import org.eclipse.jetty.io.EndPoint;
42  import org.eclipse.jetty.io.ssl.SslConnection;
43  import org.eclipse.jetty.util.FutureCallback;
44  import org.eclipse.jetty.util.annotation.ManagedAttribute;
45  import org.eclipse.jetty.util.annotation.ManagedObject;
46  import org.eclipse.jetty.util.component.ContainerLifeCycle;
47  import org.eclipse.jetty.util.component.Dumpable;
48  import org.eclipse.jetty.util.log.Log;
49  import org.eclipse.jetty.util.log.Logger;
50  import org.eclipse.jetty.util.ssl.SslContextFactory;
51  import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
52  import org.eclipse.jetty.util.thread.Scheduler;
53  
54  /**
55   * <p>An abstract implementation of {@link Connector} that provides a {@link ConnectionFactory} mechanism
56   * for creating {@link Connection} instances for various protocols (HTTP, SSL, SPDY, etc).</p>
57   *
58   * <h2>Connector Services</h2>
59   * The abstract connector manages the dependent services needed by all specific connector instances:
60   * <ul>
61   * <li>The {@link Executor} service is used to run all active tasks needed by this connector such as accepting connections
62   * or handle HTTP requests. The default is to use the {@link Server#getThreadPool()} as an executor.
63   * </li>
64   * <li>The {@link Scheduler} service is used to monitor the idle timeouts of all connections and is also made available
65   * to the connections to time such things as asynchronous request timeouts.  The default is to use a new
66   * {@link ScheduledExecutorScheduler} instance.
67   * </li>
68   * <li>The {@link ByteBufferPool} service is made available to all connections to be used to acquire and release
69   * {@link ByteBuffer} instances from a pool.  The default is to use a new {@link ArrayByteBufferPool} instance.
70   * </li>
71   * </ul>
72   * These services are managed as aggregate beans by the {@link ContainerLifeCycle} super class and
73   * may either be managed or unmanaged beans.
74   *
75   * <h2>Connection Factories</h2>
76   * The connector keeps a collection of {@link ConnectionFactory} instances, each of which are known by their
77   * protocol name.  The protocol name may be a real protocol (eg http/1.1 or spdy/3) or it may be a private name
78   * that represents a special connection factory. For example, the name "SSL-http/1.1" is used for
79   * an {@link SslConnectionFactory} that has been instantiated with the {@link HttpConnectionFactory} as it's
80   * next protocol.
81   *
82   * <h4>Configuring Connection Factories</h4>
83   * The collection of available {@link ConnectionFactory} may be constructor injected or modified with the
84   * methods {@link #addConnectionFactory(ConnectionFactory)}, {@link #removeConnectionFactory(String)} and
85   * {@link #setConnectionFactories(Collection)}.  Only a single {@link ConnectionFactory} instance may be configured
86   * per protocol name, so if two factories with the same {@link ConnectionFactory#getProtocol()} are set, then
87   * the second will replace the first.
88   * <p>
89   * The protocol factory used for newly accepted connections is specified by
90   * the method {@link #setDefaultProtocol(String)} or defaults to the protocol of the first configured factory.
91   * <p>
92   * Each Connection factory type is responsible for the configuration of the protocols that it accepts. Thus to
93   * configure the HTTP protocol, you pass a {@link HttpConfiguration} instance to the {@link HttpConnectionFactory}
94   * (or the SPDY factories that can also provide HTTP Semantics).  Similarly the {@link SslConnectionFactory} is
95   * configured by passing it a {@link SslContextFactory} and a next protocol name.
96   *
97   * <h4>Connection Factory Operation</h4>
98   * {@link ConnectionFactory}s may simply create a {@link Connection} instance to support a specific
99   * protocol.  For example, the {@link HttpConnectionFactory} will create a {@link HttpConnection} instance
100  * that can handle http/1.1, http/1.0 and http/0.9.
101  * <p>
102  * {@link ConnectionFactory}s may also create a chain of {@link Connection} instances, using other {@link ConnectionFactory} instances.
103  * For example, the {@link SslConnectionFactory} is configured with a next protocol name, so that once it has accepted
104  * a connection and created an {@link SslConnection}, it then used the next {@link ConnectionFactory} from the
105  * connector using the {@link #getConnectionFactory(String)} method, to create a {@link Connection} instance that
106  * will handle the unecrypted bytes from the {@link SslConnection}.   If the next protocol is "http/1.1", then the
107  * {@link SslConnectionFactory} will have a protocol name of "SSL-http/1.1" and lookup "http/1.1" for the protocol
108  * to run over the SSL connection.
109  * <p>
110  * {@link ConnectionFactory}s may also create temporary {@link Connection} instances that will exchange bytes
111  * over the connection to determine what is the next protocol to use.  For example the NPN protocol is an extension
112  * of SSL to allow a protocol to be specified during the SSL handshake. NPN is used by the SPDY protocol to
113  * negotiate the version of SPDY or HTTP that the client and server will speak.  Thus to accept a SPDY connection, the
114  * connector will be configured with {@link ConnectionFactory}s for "SSL-NPN", "NPN", "spdy/3", "spdy/2", "http/1.1"
115  * with the default protocol being "SSL-NPN".  Thus a newly accepted connection uses "SSL-NPN", which specifies a
116  * SSLConnectionFactory with "NPN" as the next protocol.  Thus an SslConnection instance is created chained to an NPNConnection
117  * instance.  The NPN connection then negotiates with the client to determined the next protocol, which could be
118  * "spdy/3", "spdy/2" or the default of "http/1.1".  Once the next protocol is determined, the NPN connection
119  * calls {@link #getConnectionFactory(String)} to create a connection instance that will replace the NPN connection as
120  * the connection chained to the SSLConnection.
121  * <p>
122  * <h2>Acceptors</h2>
123  * The connector will execute a number of acceptor tasks to the {@link Exception} service passed to the constructor.
124  * The acceptor tasks run in a loop while the connector is running and repeatedly call the abstract {@link #accept(int)} method.
125  * The implementation of the accept method must:
126  * <nl>
127  * <li>block waiting for new connections
128  * <li>accept the connection (eg socket accept)
129  * <li>perform any configuration of the connection (eg. socket linger times)
130  * <li>call the {@link #getDefaultConnectionFactory()} {@link ConnectionFactory#newConnection(Connector, org.eclipse.jetty.io.EndPoint)}
131  * method to create a new Connection instance.
132  * </nl>
133  * The default number of acceptor tasks is the minimum of 1 and half the number of available CPUs. Having more acceptors may reduce
134  * the latency for servers that see a high rate of new connections (eg HTTP/1.0 without keep-alive).  Typically the default is
135  * sufficient for modern persistent protocols (HTTP/1.1, SPDY etc.)
136  */
137 @ManagedObject("Abstract implementation of the Connector Interface")
138 public abstract class AbstractConnector extends ContainerLifeCycle implements Connector, Dumpable
139 {
140     protected final Logger LOG = Log.getLogger(getClass());
141     // Order is important on server side, so we use a LinkedHashMap
142     private final Map<String, ConnectionFactory> _factories = new LinkedHashMap<>();
143     private final Server _server;
144     private final Executor _executor;
145     private final Scheduler _scheduler;
146     private final ByteBufferPool _byteBufferPool;
147     private final Thread[] _acceptors;
148     private final Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap());
149     private final Set<EndPoint> _immutableEndPoints = Collections.unmodifiableSet(_endpoints);
150     private volatile CountDownLatch _stopping;
151     private long _idleTimeout = 30000;
152     private String _defaultProtocol;
153     private ConnectionFactory _defaultConnectionFactory;
154     private String _name;
155 
156 
157     /**
158      * @param server The server this connector will be added to. Must not be null.
159      * @param executor An executor for this connector or null to use the servers executor
160      * @param scheduler A scheduler for this connector or null to either a {@link Scheduler} set as a server bean or if none set, then a new {@link ScheduledExecutorScheduler} instance.
161      * @param pool A buffer pool for this connector or null to either a {@link ByteBufferPool} set as a server bean or none set, the new  {@link ArrayByteBufferPool} instance.
162      * @param acceptors the number of acceptor threads to use, or 0 for a default value.
163      * @param factories The Connection Factories to use.
164      */
165     public AbstractConnector(
166             Server server,
167             Executor executor,
168             Scheduler scheduler,
169             ByteBufferPool pool,
170             int acceptors,
171             ConnectionFactory... factories)
172     {
173         _server=server;
174         _executor=executor!=null?executor:_server.getThreadPool();
175         if (scheduler==null)
176             scheduler=_server.getBean(Scheduler.class);
177         _scheduler=scheduler!=null?scheduler:new ScheduledExecutorScheduler();
178         if (pool==null)
179             pool=_server.getBean(ByteBufferPool.class);
180         _byteBufferPool = pool!=null?pool:new ArrayByteBufferPool();
181 
182         addBean(_server,false);
183         addBean(_executor);
184         if (executor==null)
185             unmanage(_executor); // inherited from server
186         addBean(_scheduler);
187         addBean(_byteBufferPool);
188 
189         for (ConnectionFactory factory:factories)
190             addConnectionFactory(factory);
191 
192         if (acceptors<=0)
193             acceptors=Math.max(1,(Runtime.getRuntime().availableProcessors()) / 2);
194         if (acceptors > 2 * Runtime.getRuntime().availableProcessors())
195             LOG.warn("Acceptors should be <= 2*availableProcessors: " + this);
196         _acceptors = new Thread[acceptors];
197     }
198 
199 
200     @Override
201     public Server getServer()
202     {
203         return _server;
204     }
205 
206     @Override
207     public Executor getExecutor()
208     {
209         return _executor;
210     }
211 
212     @Override
213     public ByteBufferPool getByteBufferPool()
214     {
215         return _byteBufferPool;
216     }
217 
218     @Override
219     @ManagedAttribute("Idle timeout")
220     public long getIdleTimeout()
221     {
222         return _idleTimeout;
223     }
224 
225     /**
226      * <p>Sets the maximum Idle time for a connection, which roughly translates to the {@link Socket#setSoTimeout(int)}
227      * call, although with NIO implementations other mechanisms may be used to implement the timeout.</p>
228      * <p>The max idle time is applied:</p>
229      * <ul>
230      * <li>When waiting for a new message to be received on a connection</li>
231      * <li>When waiting for a new message to be sent on a connection</li>
232      * </ul>
233      * <p>This value is interpreted as the maximum time between some progress being made on the connection.
234      * So if a single byte is read or written, then the timeout is reset.</p>
235      *
236      * @param idleTimeout the idle timeout
237      */
238     public void setIdleTimeout(long idleTimeout)
239     {
240         _idleTimeout = idleTimeout;
241     }
242 
243     /**
244      * @return Returns the number of acceptor threads.
245      */
246     @ManagedAttribute("number of acceptor threads")
247     public int getAcceptors()
248     {
249         return _acceptors.length;
250     }
251 
252     @Override
253     protected void doStart() throws Exception
254     {
255         _defaultConnectionFactory = getConnectionFactory(_defaultProtocol);
256         if(_defaultConnectionFactory==null)
257             throw new IllegalStateException("No protocol factory for default protocol: "+_defaultProtocol);
258 
259         super.doStart();
260 
261         _stopping=new CountDownLatch(_acceptors.length);
262         for (int i = 0; i < _acceptors.length; i++)
263             getExecutor().execute(new Acceptor(i));
264 
265         LOG.info("Started {}", this);
266     }
267 
268 
269     protected void interruptAcceptors()
270     {
271         for (Thread thread : _acceptors)
272         {
273             if (thread != null)
274                 thread.interrupt();
275         }
276     }
277 
278     @Override
279     public Future<Void> shutdown()
280     {
281         return new FutureCallback(true);
282     }
283 
284     @Override
285     protected void doStop() throws Exception
286     {
287         // Tell the acceptors we are stopping
288         interruptAcceptors();
289 
290         // If we have a stop timeout
291         long stopTimeout = getStopTimeout();
292         CountDownLatch stopping=_stopping;
293         if (stopTimeout > 0 && stopping!=null)
294             stopping.await(stopTimeout,TimeUnit.MILLISECONDS);
295         _stopping=null;
296 
297         super.doStop();
298 
299         LOG.info("Stopped {}", this);
300     }
301 
302     public void join() throws InterruptedException
303     {
304         join(0);
305     }
306 
307     public void join(long timeout) throws InterruptedException
308     {
309         for (Thread thread : _acceptors)
310             if (thread != null)
311                 thread.join(timeout);
312     }
313 
314     protected abstract void accept(int acceptorID) throws IOException, InterruptedException;
315 
316 
317     /* ------------------------------------------------------------ */
318     /**
319      * @return Is the connector accepting new connections
320      */
321     protected boolean isAccepting()
322     {
323         return isRunning();
324     }
325 
326     @Override
327     public ConnectionFactory getConnectionFactory(String protocol)
328     {
329         synchronized (_factories)
330         {
331             return _factories.get(protocol.toLowerCase(Locale.ENGLISH));
332         }
333     }
334 
335     @Override
336     public <T> T getConnectionFactory(Class<T> factoryType)
337     {
338         synchronized (_factories)
339         {
340             for (ConnectionFactory f : _factories.values())
341                 if (factoryType.isAssignableFrom(f.getClass()))
342                     return (T)f;
343             return null;
344         }
345     }
346 
347     public void addConnectionFactory(ConnectionFactory factory)
348     {
349         synchronized (_factories)
350         {
351             ConnectionFactory old=_factories.remove(factory.getProtocol());
352             if (old!=null)
353                 removeBean(old);
354             _factories.put(factory.getProtocol().toLowerCase(Locale.ENGLISH), factory);
355             addBean(factory);
356             if (_defaultProtocol==null)
357                 _defaultProtocol=factory.getProtocol();
358         }
359     }
360 
361     public ConnectionFactory removeConnectionFactory(String protocol)
362     {
363         synchronized (_factories)
364         {
365             ConnectionFactory factory= _factories.remove(protocol.toLowerCase(Locale.ENGLISH));
366             removeBean(factory);
367             return factory;
368         }
369     }
370 
371     @Override
372     public Collection<ConnectionFactory> getConnectionFactories()
373     {
374         synchronized (_factories)
375         {
376             return _factories.values();
377         }
378     }
379 
380     public void setConnectionFactories(Collection<ConnectionFactory> factories)
381     {
382         synchronized (_factories)
383         {
384             List<ConnectionFactory> existing = new ArrayList<>(_factories.values());
385             for (ConnectionFactory factory: existing)
386                 removeConnectionFactory(factory.getProtocol());
387             for (ConnectionFactory factory: factories)
388                 if (factory!=null)
389                     addConnectionFactory(factory);
390         }
391     }
392 
393 
394     @Override
395     @ManagedAttribute("Protocols supported by this connector")
396     public List<String> getProtocols()
397     {
398         synchronized (_factories)
399         {
400             return new ArrayList<>(_factories.keySet());
401         }
402     }
403 
404     public void clearConnectionFactories()
405     {
406         synchronized (_factories)
407         {
408             _factories.clear();
409         }
410     }
411 
412     @ManagedAttribute("This connector's default protocol")
413     public String getDefaultProtocol()
414     {
415         return _defaultProtocol;
416     }
417 
418     public void setDefaultProtocol(String defaultProtocol)
419     {
420         _defaultProtocol = defaultProtocol.toLowerCase(Locale.ENGLISH);
421         if (isRunning())
422             _defaultConnectionFactory=getConnectionFactory(_defaultProtocol);
423     }
424 
425     @Override
426     public ConnectionFactory getDefaultConnectionFactory()
427     {
428         if (isStarted())
429             return _defaultConnectionFactory;
430         return getConnectionFactory(_defaultProtocol);
431     }
432 
433     private class Acceptor implements Runnable
434     {
435         private final int _acceptor;
436 
437         private Acceptor(int id)
438         {
439             _acceptor = id;
440         }
441 
442         @Override
443         public void run()
444         {
445             Thread current = Thread.currentThread();
446             String name = current.getName();
447             current.setName(name + "-acceptor-" + _acceptor + "-" + AbstractConnector.this);
448 
449             synchronized (AbstractConnector.this)
450             {
451                 _acceptors[_acceptor] = current;
452             }
453 
454             try
455             {
456                 while (isAccepting())
457                 {
458                     try
459                     {
460                         accept(_acceptor);
461                     }
462                     catch (Throwable e)
463                     {
464                         if (isAccepting())
465                             LOG.warn(e);
466                         else
467                             LOG.debug(e);
468                     }
469                 }
470             }
471             finally
472             {
473                 current.setName(name);
474 
475                 synchronized (AbstractConnector.this)
476                 {
477                     _acceptors[_acceptor] = null;
478                 }
479                 CountDownLatch stopping=_stopping;
480                 if (stopping!=null)
481                     stopping.countDown();
482             }
483         }
484     }
485 
486 
487 
488 
489 //    protected void connectionOpened(Connection connection)
490 //    {
491 //        _stats.connectionOpened();
492 //        connection.onOpen();
493 //    }
494 //
495 //    protected void connectionClosed(Connection connection)
496 //    {
497 //        connection.onClose();
498 //        long duration = System.currentTimeMillis() - connection.getEndPoint().getCreatedTimeStamp();
499 //        _stats.connectionClosed(duration, connection.getMessagesIn(), connection.getMessagesOut());
500 //    }
501 //
502 //    public void connectionUpgraded(Connection oldConnection, Connection newConnection)
503 //    {
504 //        oldConnection.onClose();
505 //        _stats.connectionUpgraded(oldConnection.getMessagesIn(), oldConnection.getMessagesOut());
506 //        newConnection.onOpen();
507 //    }
508 
509     @Override
510     public Collection<EndPoint> getConnectedEndPoints()
511     {
512         return _immutableEndPoints;
513     }
514 
515     protected void onEndPointOpened(EndPoint endp)
516     {
517         _endpoints.add(endp);
518     }
519 
520     protected void onEndPointClosed(EndPoint endp)
521     {
522         _endpoints.remove(endp);
523     }
524 
525     @Override
526     public Scheduler getScheduler()
527     {
528         return _scheduler;
529     }
530 
531     @Override
532     public String getName()
533     {
534         return _name;
535     }
536     
537     /* ------------------------------------------------------------ */
538     /**
539      * Set a connector name.   A context may be configured with
540      * virtual hosts in the form "@contextname" and will only serve
541      * requests from the named connector,
542      * @param name A connector name.
543      */
544     public void setName(String name)
545     {
546         _name=name;
547     }
548     
549     @Override
550     public String toString()
551     {
552         return String.format("%s@%x{%s}",
553                 _name==null?getClass().getSimpleName():_name,
554                 hashCode(),
555                 getDefaultProtocol());
556     }
557 }