View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.net.Socket;
23  import java.nio.ByteBuffer;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.LinkedHashMap;
27  import java.util.List;
28  import java.util.Locale;
29  import java.util.Map;
30  import java.util.concurrent.CountDownLatch;
31  import java.util.concurrent.Executor;
32  import java.util.concurrent.Future;
33  import java.util.concurrent.TimeUnit;
34  
35  import org.eclipse.jetty.io.ArrayByteBufferPool;
36  import org.eclipse.jetty.io.ByteBufferPool;
37  import org.eclipse.jetty.io.Connection;
38  import org.eclipse.jetty.io.ssl.SslConnection;
39  import org.eclipse.jetty.util.FutureCallback;
40  import org.eclipse.jetty.util.annotation.ManagedAttribute;
41  import org.eclipse.jetty.util.annotation.ManagedObject;
42  import org.eclipse.jetty.util.component.ContainerLifeCycle;
43  import org.eclipse.jetty.util.component.Dumpable;
44  import org.eclipse.jetty.util.log.Log;
45  import org.eclipse.jetty.util.log.Logger;
46  import org.eclipse.jetty.util.ssl.SslContextFactory;
47  import org.eclipse.jetty.util.thread.Scheduler;
48  import org.eclipse.jetty.util.thread.TimerScheduler;
49  
50  /**
51   * <p>An abstract implementation of {@link Connector} that provides a {@link ConnectionFactory} mechanism
52   * for creating {@link Connection} instances for various protocols (HTTP, SSL, SPDY, etc).</p>
53   *
54   * <h2>Connector Services</h2>
55   * The abstract connector manages the dependent services needed by all specific connector instances:
56   * <ul>
57   * <li>The {@link Executor} service is used to run all active tasks needed by this connector such as accepting connections
58   * or handle HTTP requests. The default is to use the {@link Server#getThreadPool()} as an executor.
59   * </li>
60   * <li>The {@link Scheduler} service is used to monitor the idle timeouts of all connections and is also made available
61   * to the connections to time such things as asynchronous request timeouts.  The default is to use a new
62   * {@link TimerScheduler} instance.
63   * </li>
64   * <li>The {@link ByteBufferPool} service is made available to all connections to be used to acquire and release
65   * {@link ByteBuffer} instances from a pool.  The default is to use a new {@link ArrayByteBufferPool} instance.
66   * </li>
67   * </ul>
68   * These services are managed as aggregate beans by the {@link ContainerLifeCycle} super class and
69   * may either be managed or unmanaged beans.
70   *
71   * <h2>Connection Factories</h2>
72   * The connector keeps a collection of {@link ConnectionFactory} instances, each of which are known by their
73   * protocol name.  The protocol name may be a real protocol (eg http/1.1 or spdy/3) or it may be a private name
74   * that represents a special connection factory. For example, the name "SSL-http/1.1" is used for
75   * an {@link SslConnectionFactory} that has been instantiated with the {@link HttpConnectionFactory} as it's
76   * next protocol.
77   *
78   * <h4>Configuring Connection Factories</h4>
79   * The collection of available {@link ConnectionFactory} may be constructor injected or modified with the
80   * methods {@link #addConnectionFactory(ConnectionFactory)}, {@link #removeConnectionFactory(String)} and
81   * {@link #setConnectionFactories(Collection)}.  Only a single {@link ConnectionFactory} instance may be configured
82   * per protocol name, so if two factories with the same {@link ConnectionFactory#getProtocol()} are set, then
83   * the second will replace the first.
84   * <p>
85   * The protocol factory used for newly accepted connections is specified by
86   * the method {@link #setDefaultProtocol(String)} or defaults to the protocol of the first configured factory.
87   * <p>
88   * Each Connection factory type is responsible for the configuration of the protocols that it accepts. Thus to
89   * configure the HTTP protocol, you pass a {@link HttpConfiguration} instance to the {@link HttpConnectionFactory}
90   * (or the SPDY factories that can also provide HTTP Semantics).  Similarly the {@link SslConnectionFactory} is
91   * configured by passing it a {@link SslContextFactory} and a next protocol name.
92   *
93   * <h4>Connection Factory Operation</h4>
94   * {@link ConnectionFactory}s may simply create a {@link Connection} instance to support a specific
95   * protocol.  For example, the {@link HttpConnectionFactory} will create a {@link HttpConnection} instance
96   * that can handle http/1.1, http/1.0 and http/0.9.
97   * <p>
98   * {@link ConnectionFactory}s may also create a chain of {@link Connection} instances, using other {@link ConnectionFactory} instances.
99   * For example, the {@link SslConnectionFactory} is configured with a next protocol name, so that once it has accepted
100  * a connection and created an {@link SslConnection}, it then used the next {@link ConnectionFactory} from the
101  * connector using the {@link #getConnectionFactory(String)} method, to create a {@link Connection} instance that
102  * will handle the unecrypted bytes from the {@link SslConnection}.   If the next protocol is "http/1.1", then the
103  * {@link SslConnectionFactory} will have a protocol name of "SSL-http/1.1" and lookup "http/1.1" for the protocol
104  * to run over the SSL connection.
105  * <p>
106  * {@link ConnectionFactory}s may also create temporary {@link Connection} instances that will exchange bytes
107  * over the connection to determine what is the next protocol to use.  For example the NPN protocol is an extension
108  * of SSL to allow a protocol to be specified during the SSL handshake. NPN is used by the SPDY protocol to
109  * negotiate the version of SPDY or HTTP that the client and server will speak.  Thus to accept a SPDY connection, the
110  * connector will be configured with {@link ConnectionFactory}s for "SSL-NPN", "NPN", "spdy/3", "spdy/2", "http/1.1"
111  * with the default protocol being "SSL-NPN".  Thus a newly accepted connection uses "SSL-NPN", which specifies a
112  * SSLConnectionFactory with "NPN" as the next protocol.  Thus an SslConnection instance is created chained to an NPNConnection
113  * instance.  The NPN connection then negotiates with the client to determined the next protocol, which could be
114  * "spdy/3", "spdy/2" or the default of "http/1.1".  Once the next protocol is determined, the NPN connection
115  * calls {@link #getConnectionFactory(String)} to create a connection instance that will replace the NPN connection as
116  * the connection chained to the SSLConnection.
117  * <p>
118  * <h2>Acceptors</h2>
119  * The connector will execute a number of acceptor tasks to the {@link Exception} service passed to the constructor.
120  * The acceptor tasks run in a loop while the connector is running and repeatedly call the abstract {@link #accept(int)} method.
121  * The implementation of the accept method must:
122  * <nl>
123  * <li>block waiting for new connections
124  * <li>accept the connection (eg socket accept)
125  * <li>perform any configuration of the connection (eg. socket linger times)
126  * <li>call the {@link #getDefaultConnectionFactory()} {@link ConnectionFactory#newConnection(Connector, org.eclipse.jetty.io.EndPoint)}
127  * method to create a new Connection instance.
128  * </nl>
129  * The default number of acceptor tasks is the minimum of 1 and half the number of available CPUs. Having more acceptors may reduce
130  * the latency for servers that see a high rate of new connections (eg HTTP/1.0 without keep-alive).  Typically the default is
131  * sufficient for modern persistent protocols (HTTP/1.1, SPDY etc.)
132  */
133 @ManagedObject("Abstract implementation of the Connector Interface")
134 public abstract class AbstractConnector extends ContainerLifeCycle implements Connector, Dumpable
135 {
136     protected final Logger LOG = Log.getLogger(getClass());
137     // Order is important on server side, so we use a LinkedHashMap
138     private final Map<String, ConnectionFactory> _factories = new LinkedHashMap<>();
139     private final Server _server;
140     private final Executor _executor;
141     private final Scheduler _scheduler;
142     private final ByteBufferPool _byteBufferPool;
143     private final Thread[] _acceptors;
144     private volatile CountDownLatch _stopping;
145     private long _idleTimeout = 30000;
146     private String _defaultProtocol;
147     private ConnectionFactory _defaultConnectionFactory;
148 
149     /**
150      * @param server The server this connector will be added to. Must not be null.
151      * @param executor An executor for this connector or null to use the servers executor
152      * @param scheduler A scheduler for this connector or null to a new {@link TimerScheduler} instance.
153      * @param pool A buffer pool for this connector or null to use a default {@link ByteBufferPool}
154      * @param acceptors the number of acceptor threads to use, or 0 for a default value.
155      * @param factories The Connection Factories to use.
156      */
157     public AbstractConnector(
158             Server server,
159             Executor executor,
160             Scheduler scheduler,
161             ByteBufferPool pool,
162             int acceptors,
163             ConnectionFactory... factories)
164     {
165         _server=server;
166         _executor=executor!=null?executor:_server.getThreadPool();
167         _scheduler=scheduler!=null?scheduler:new TimerScheduler();
168         _byteBufferPool = pool!=null?pool:new ArrayByteBufferPool();
169 
170         addBean(_server,false);
171         addBean(_executor);
172         if (executor==null)
173             unmanage(_executor); // inherited from server
174         addBean(_scheduler);
175         addBean(_byteBufferPool);
176 
177         for (ConnectionFactory factory:factories)
178             addConnectionFactory(factory);
179 
180         if (acceptors<=0)
181             acceptors=Math.max(1,(Runtime.getRuntime().availableProcessors()) / 2);
182         if (acceptors > 2 * Runtime.getRuntime().availableProcessors())
183             LOG.warn("Acceptors should be <= 2*availableProcessors: " + this);
184         _acceptors = new Thread[acceptors];
185     }
186 
187 
188     @Override
189     public Server getServer()
190     {
191         return _server;
192     }
193 
194     @Override
195     public Executor getExecutor()
196     {
197         return _executor;
198     }
199 
200     @Override
201     public ByteBufferPool getByteBufferPool()
202     {
203         return _byteBufferPool;
204     }
205 
206     @Override
207     @ManagedAttribute("Idle timeout")
208     public long getIdleTimeout()
209     {
210         return _idleTimeout;
211     }
212 
213     /**
214      * <p>Sets the maximum Idle time for a connection, which roughly translates to the {@link Socket#setSoTimeout(int)}
215      * call, although with NIO implementations other mechanisms may be used to implement the timeout.</p>
216      * <p>The max idle time is applied:</p>
217      * <ul>
218      * <li>When waiting for a new message to be received on a connection</li>
219      * <li>When waiting for a new message to be sent on a connection</li>
220      * </ul>
221      * <p>This value is interpreted as the maximum time between some progress being made on the connection.
222      * So if a single byte is read or written, then the timeout is reset.</p>
223      *
224      * @param idleTimeout the idle timeout
225      */
226     public void setIdleTimeout(long idleTimeout)
227     {
228         _idleTimeout = idleTimeout;
229     }
230 
231     /**
232      * @return Returns the number of acceptor threads.
233      */
234     @ManagedAttribute("number of acceptor threads")
235     public int getAcceptors()
236     {
237         return _acceptors.length;
238     }
239 
240     @Override
241     protected void doStart() throws Exception
242     {
243         _defaultConnectionFactory = getConnectionFactory(_defaultProtocol);
244         if(_defaultConnectionFactory==null)
245             throw new IllegalStateException("No protocol factory for default protocol: "+_defaultProtocol);
246 
247         super.doStart();
248 
249         _stopping=new CountDownLatch(_acceptors.length);
250         for (int i = 0; i < _acceptors.length; i++)
251             getExecutor().execute(new Acceptor(i));
252 
253         LOG.info("Started {}", this);
254     }
255 
256 
257     protected void interruptAcceptors()
258     {
259         for (Thread thread : _acceptors)
260         {
261             if (thread != null)
262                 thread.interrupt();
263         }
264     }
265 
266     @Override
267     public Future<Void> shutdown()
268     {
269         return new FutureCallback(true);
270     }
271 
272     @Override
273     protected void doStop() throws Exception
274     {
275         // Tell the acceptors we are stopping
276         interruptAcceptors();
277 
278         // If we have a stop timeout
279         long stopTimeout = getStopTimeout();
280         if (stopTimeout > 0 && _stopping!=null)
281             _stopping.await(stopTimeout,TimeUnit.MILLISECONDS);
282         _stopping=null;
283 
284         super.doStop();
285 
286         LOG.info("Stopped {}", this);
287     }
288 
289     public void join() throws InterruptedException
290     {
291         join(0);
292     }
293 
294     public void join(long timeout) throws InterruptedException
295     {
296         for (Thread thread : _acceptors)
297             if (thread != null)
298                 thread.join(timeout);
299     }
300 
301     protected abstract void accept(int acceptorID) throws IOException, InterruptedException;
302 
303 
304     /* ------------------------------------------------------------ */
305     /**
306      * @return Is the connector accepting new connections
307      */
308     protected boolean isAccepting()
309     {
310         return isRunning();
311     }
312 
313     @Override
314     public ConnectionFactory getConnectionFactory(String protocol)
315     {
316         synchronized (_factories)
317         {
318             return _factories.get(protocol.toLowerCase(Locale.ENGLISH));
319         }
320     }
321 
322     @Override
323     public <T> T getConnectionFactory(Class<T> factoryType)
324     {
325         synchronized (_factories)
326         {
327             for (ConnectionFactory f : _factories.values())
328                 if (factoryType.isAssignableFrom(f.getClass()))
329                     return (T)f;
330             return null;
331         }
332     }
333 
334     public void addConnectionFactory(ConnectionFactory factory)
335     {
336         synchronized (_factories)
337         {
338             ConnectionFactory old=_factories.remove(factory.getProtocol());
339             if (old!=null)
340                 removeBean(old);
341             _factories.put(factory.getProtocol().toLowerCase(Locale.ENGLISH), factory);
342             addBean(factory);
343             if (_defaultProtocol==null)
344                 _defaultProtocol=factory.getProtocol();
345         }
346     }
347 
348     public ConnectionFactory removeConnectionFactory(String protocol)
349     {
350         synchronized (_factories)
351         {
352             ConnectionFactory factory= _factories.remove(protocol.toLowerCase(Locale.ENGLISH));
353             removeBean(factory);
354             return factory;
355         }
356     }
357 
358     @Override
359     public Collection<ConnectionFactory> getConnectionFactories()
360     {
361         synchronized (_factories)
362         {
363             return _factories.values();
364         }
365     }
366 
367     public void setConnectionFactories(Collection<ConnectionFactory> factories)
368     {
369         synchronized (_factories)
370         {
371             List<ConnectionFactory> existing = new ArrayList<>(_factories.values());
372             for (ConnectionFactory factory: existing)
373                 removeConnectionFactory(factory.getProtocol());
374             for (ConnectionFactory factory: factories)
375                 if (factory!=null)
376                     addConnectionFactory(factory);
377         }
378     }
379 
380 
381     @Override
382     @ManagedAttribute("Protocols supported by this connector")
383     public List<String> getProtocols()
384     {
385         synchronized (_factories)
386         {
387             return new ArrayList<>(_factories.keySet());
388         }
389     }
390 
391     public void clearConnectionFactories()
392     {
393         synchronized (_factories)
394         {
395             _factories.clear();
396         }
397     }
398 
399     @ManagedAttribute("This connector's default protocol")
400     public String getDefaultProtocol()
401     {
402         return _defaultProtocol;
403     }
404 
405     public void setDefaultProtocol(String defaultProtocol)
406     {
407         _defaultProtocol = defaultProtocol.toLowerCase(Locale.ENGLISH);
408         if (isRunning())
409             _defaultConnectionFactory=getConnectionFactory(_defaultProtocol);
410     }
411 
412     @Override
413     public ConnectionFactory getDefaultConnectionFactory()
414     {
415         if (isStarted())
416             return _defaultConnectionFactory;
417         return getConnectionFactory(_defaultProtocol);
418     }
419 
420     private class Acceptor implements Runnable
421     {
422         private final int _acceptor;
423 
424         private Acceptor(int id)
425         {
426             _acceptor = id;
427         }
428 
429         @Override
430         public void run()
431         {
432             Thread current = Thread.currentThread();
433             String name = current.getName();
434             current.setName(name + "-acceptor-" + _acceptor + "-" + AbstractConnector.this);
435 
436             synchronized (AbstractConnector.this)
437             {
438                 _acceptors[_acceptor] = current;
439             }
440 
441             try
442             {
443                 while (isAccepting())
444                 {
445                     try
446                     {
447                         accept(_acceptor);
448                     }
449                     catch (Throwable e)
450                     {
451                         if (isAccepting())
452                             LOG.warn(e);
453                         else
454                             LOG.debug(e);
455                     }
456                 }
457             }
458             finally
459             {
460                 current.setName(name);
461 
462                 synchronized (AbstractConnector.this)
463                 {
464                     _acceptors[_acceptor] = null;
465                 }
466                 _stopping.countDown();
467             }
468         }
469     }
470 
471 //    protected void connectionOpened(Connection connection)
472 //    {
473 //        _stats.connectionOpened();
474 //        connection.onOpen();
475 //    }
476 //
477 //    protected void connectionClosed(Connection connection)
478 //    {
479 //        connection.onClose();
480 //        long duration = System.currentTimeMillis() - connection.getEndPoint().getCreatedTimeStamp();
481 //        _stats.connectionClosed(duration, connection.getMessagesIn(), connection.getMessagesOut());
482 //    }
483 //
484 //    public void connectionUpgraded(Connection oldConnection, Connection newConnection)
485 //    {
486 //        oldConnection.onClose();
487 //        _stats.connectionUpgraded(oldConnection.getMessagesIn(), oldConnection.getMessagesOut());
488 //        newConnection.onOpen();
489 //    }
490 
491     @Override
492     public Scheduler getScheduler()
493     {
494         return _scheduler;
495     }
496 
497     @Override
498     public String toString()
499     {
500         return String.format("%s@%x{%s}",
501                 getClass().getSimpleName(),
502                 hashCode(),
503                 getDefaultProtocol());
504     }
505 }