View Javadoc

1   // ========================================================================
2   // Copyright (c) 2003-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.server.nio;
15  
16  import java.io.IOException;
17  import java.net.InetSocketAddress;
18  import java.net.Socket;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.ServerSocketChannel;
21  import java.nio.channels.SocketChannel;
22  import java.util.Arrays;
23  
24  import org.eclipse.jetty.continuation.Continuation;
25  import org.eclipse.jetty.io.ConnectedEndPoint;
26  import org.eclipse.jetty.io.Connection;
27  import org.eclipse.jetty.io.EndPoint;
28  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
29  import org.eclipse.jetty.io.nio.SelectorManager;
30  import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
31  import org.eclipse.jetty.server.AsyncHttpConnection;
32  import org.eclipse.jetty.server.Connector;
33  import org.eclipse.jetty.server.HttpConnection;
34  import org.eclipse.jetty.server.Request;
35  import org.eclipse.jetty.server.Server;
36  import org.eclipse.jetty.util.component.AggregateLifeCycle;
37  import org.eclipse.jetty.util.log.Log;
38  import org.eclipse.jetty.util.thread.Timeout.Task;
39  
40  /* ------------------------------------------------------------------------------- */
41  /**
42   * Selecting NIO connector.
43   * <p>
44   * This connector uses efficient NIO buffers with a non blocking threading model. Direct NIO buffers
45   * are used and threads are only allocated to connections with requests. Synchronization is used to
46   * simulate blocking for the servlet API, and any unflushed content at the end of request handling
47   * is written asynchronously.
48   * </p>
49   * <p>
50   * This connector is best used when there are a many connections that have idle periods.
51   * </p>
52   * <p>
53   * When used with {@link org.eclipse.jetty.continuation.Continuation}, threadless waits are supported.
54   * If a filter or servlet returns after calling {@link Continuation#suspend()} or when a
55   * runtime exception is thrown from a call to {@link Continuation#undispatch()}, Jetty will
56   * will not send a response to the client. Instead the thread is released and the Continuation is
57   * placed on the timer queue. If the Continuation timeout expires, or it's
58   * resume method is called, then the request is again allocated a thread and the request is retried.
59   * The limitation of this approach is that request content is not available on the retried request,
60   * thus if possible it should be read after the continuation or saved as a request attribute or as the
61   * associated object of the Continuation instance.
62   * </p>
63   *
64   * @org.apache.xbean.XBean element="nioConnector" description="Creates an NIO based socket connector"
65   */
66  public class SelectChannelConnector extends AbstractNIOConnector
67  {
68      protected ServerSocketChannel _acceptChannel;
69      private int _lowResourcesConnections;
70      private int _lowResourcesMaxIdleTime;
71      private int _localPort=-1;
72  
73      private final SelectorManager _manager = new ConnectorSelectorManager();
74  
75      /* ------------------------------------------------------------------------------- */
76      /**
77       * Constructor.
78       *
79       */
80      public SelectChannelConnector()
81      {
82          _manager.setMaxIdleTime(getMaxIdleTime());
83          setAcceptors(Math.max(1,(Runtime.getRuntime().availableProcessors()+3)/4));
84      }
85  
86      /* ------------------------------------------------------------ */
87      @Override
88      public void accept(int acceptorID) throws IOException
89      {
90          ServerSocketChannel server = _acceptChannel;
91          if (server!=null && server.isOpen())
92          {
93              SocketChannel channel = _acceptChannel.accept();
94              channel.configureBlocking(false);
95              Socket socket = channel.socket();
96              configure(socket);
97              _manager.register(channel);
98          }
99      }
100 
101     /* ------------------------------------------------------------ */
102     public void close() throws IOException
103     {
104         synchronized(this)
105         {
106             if (_acceptChannel != null)
107                 _acceptChannel.close();
108             _acceptChannel = null;
109             _localPort=-2;
110         }
111     }
112 
113     /* ------------------------------------------------------------------------------- */
114     @Override
115     public void customize(EndPoint endpoint, Request request) throws IOException
116     {
117         SelectChannelEndPoint cep = ((SelectChannelEndPoint)endpoint);
118         cep.cancelIdle();
119         request.setTimeStamp(cep.getSelectSet().getNow());
120         endpoint.setMaxIdleTime(_maxIdleTime);
121         super.customize(endpoint, request);
122     }
123 
124     /* ------------------------------------------------------------------------------- */
125     @Override
126     public void persist(EndPoint endpoint) throws IOException
127     {
128         ((SelectChannelEndPoint)endpoint).scheduleIdle();
129         super.persist(endpoint);
130     }
131 
132     /* ------------------------------------------------------------ */
133     public Object getConnection()
134     {
135         return _acceptChannel;
136     }
137 
138     /* ------------------------------------------------------------------------------- */
139     public int getLocalPort()
140     {
141         synchronized(this)
142         {
143             return _localPort;
144         }
145     }
146 
147     /* ------------------------------------------------------------ */
148     public void open() throws IOException
149     {
150         synchronized(this)
151         {
152             if (_acceptChannel == null)
153             {
154                 // Create a new server socket
155                 _acceptChannel = ServerSocketChannel.open();
156                 // Set to blocking mode
157                 _acceptChannel.configureBlocking(true);
158 
159                 // Bind the server socket to the local host and port
160                 _acceptChannel.socket().setReuseAddress(getReuseAddress());
161                 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
162                 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
163 
164                 _localPort=_acceptChannel.socket().getLocalPort();
165                 if (_localPort<=0)
166                     throw new IOException("Server channel not bound");
167 
168             }
169         }
170     }
171 
172     /* ------------------------------------------------------------ */
173     @Override
174     public void setMaxIdleTime(int maxIdleTime)
175     {
176         _manager.setMaxIdleTime(maxIdleTime);
177         super.setMaxIdleTime(maxIdleTime);
178     }
179 
180     /* ------------------------------------------------------------ */
181     /**
182      * @return the lowResourcesConnections
183      */
184     public int getLowResourcesConnections()
185     {
186         return _lowResourcesConnections;
187     }
188 
189     /* ------------------------------------------------------------ */
190     /**
191      * Set the number of connections, which if exceeded places this manager in low resources state.
192      * This is not an exact measure as the connection count is averaged over the select sets.
193      * @param lowResourcesConnections the number of connections
194      * @see #setLowResourcesMaxIdleTime(int)
195      */
196     public void setLowResourcesConnections(int lowResourcesConnections)
197     {
198         _lowResourcesConnections=lowResourcesConnections;
199     }
200 
201     /* ------------------------------------------------------------ */
202     /**
203      * @return the lowResourcesMaxIdleTime
204      */
205     @Override
206     public int getLowResourcesMaxIdleTime()
207     {
208         return _lowResourcesMaxIdleTime;
209     }
210 
211     /* ------------------------------------------------------------ */
212     /**
213      * Set the period in ms that a connection is allowed to be idle when this there are more
214      * than {@link #getLowResourcesConnections()} connections.  This allows the server to rapidly close idle connections
215      * in order to gracefully handle high load situations.
216      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when resources are low.
217      * @see #setMaxIdleTime(int)
218      */
219     @Override
220     public void setLowResourcesMaxIdleTime(int lowResourcesMaxIdleTime)
221     {
222         _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
223         super.setLowResourcesMaxIdleTime(lowResourcesMaxIdleTime);
224     }
225 
226 
227     /* ------------------------------------------------------------ */
228     /*
229      * @see org.eclipse.jetty.server.server.AbstractConnector#doStart()
230      */
231     @Override
232     protected void doStart() throws Exception
233     {
234         _manager.setSelectSets(getAcceptors());
235         _manager.setMaxIdleTime(getMaxIdleTime());
236         _manager.setLowResourcesConnections(getLowResourcesConnections());
237         _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
238         _manager.start();
239 
240         super.doStart();
241 
242         // start a thread to Select
243         for (int i=0;i<getAcceptors();i++)
244         {
245             final int id=i;
246             _manager.dispatch(new Runnable()
247             {
248                 public void run()
249                 {
250                     String name=Thread.currentThread().getName();
251                     try
252                     {
253                         Thread.currentThread().setName(name+" Selector"+id+" "+SelectChannelConnector.this);
254                         while (isRunning())
255                         {
256                             try
257                             {
258                                 _manager.doSelect(id);
259                             }
260                             catch(ThreadDeath e)
261                             {
262                                 throw e;
263                             }
264                             catch(IOException e)
265                             {
266                                 Log.ignore(e);
267                             }
268                             catch(Exception e)
269                             {
270                                 Log.warn(e);
271                             }
272                         }
273                     }
274                     finally
275                     {
276                         Thread.currentThread().setName(name);
277                     }
278                 }
279             });
280         }
281     }
282 
283     /* ------------------------------------------------------------ */
284     /*
285      * @see org.eclipse.jetty.server.server.AbstractConnector#doStop()
286      */
287     @Override
288     protected void doStop() throws Exception
289     {
290         synchronized(this)
291         {
292             if(_manager.isRunning())
293             {
294                 try
295                 {
296                     _manager.stop();
297                 }
298                 catch (Exception e)
299                 {
300                     Log.warn(e);
301                 }
302             }
303         }
304         super.doStop();
305     }
306 
307     /* ------------------------------------------------------------ */
308     protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
309     {
310         return new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime);
311     }
312 
313     /* ------------------------------------------------------------------------------- */
314     protected void endPointClosed(SelectChannelEndPoint endpoint)
315     {
316         connectionClosed(endpoint.getConnection());
317     }
318 
319     /* ------------------------------------------------------------------------------- */
320     protected Connection newConnection(SocketChannel channel,final SelectChannelEndPoint endpoint)
321     {
322         return new SelectChannelHttpConnection(SelectChannelConnector.this,endpoint,getServer(),endpoint);
323     }
324 
325     /* ------------------------------------------------------------ */
326     public void dump(Appendable out, String indent) throws IOException
327     {
328         out.append(String.valueOf(this)).append("\n");
329         ServerSocketChannel channel=_acceptChannel;
330         if (channel==null)
331             AggregateLifeCycle.dump(out,indent,Arrays.asList(new Object[]{null,"CLOSED",_manager}));
332         else
333             AggregateLifeCycle.dump(out,indent,Arrays.asList(new Object[]{_acceptChannel,_acceptChannel.isOpen()?"OPEN":"CLOSED",_manager}));
334     }
335 
336     /* ------------------------------------------------------------ */
337     /* ------------------------------------------------------------ */
338     /* ------------------------------------------------------------ */
339     private class SelectChannelHttpConnection extends AsyncHttpConnection
340     {
341         private final SelectChannelEndPoint _endpoint;
342 
343         private SelectChannelHttpConnection(Connector connector, EndPoint endpoint, Server server, SelectChannelEndPoint endpoint2)
344         {
345             super(connector,endpoint,server);
346             _endpoint = endpoint2;
347         }
348 
349         /* ------------------------------------------------------------ */
350         @Override
351         public void cancelTimeout(Task task)
352         {
353             _endpoint.getSelectSet().cancelTimeout(task);
354         }
355 
356         /* ------------------------------------------------------------ */
357         @Override
358         public void scheduleTimeout(Task task, long timeoutMs)
359         {
360             _endpoint.getSelectSet().scheduleTimeout(task,timeoutMs);
361         }
362     }
363 
364     /* ------------------------------------------------------------ */
365     /* ------------------------------------------------------------ */
366     /* ------------------------------------------------------------ */
367     private final class ConnectorSelectorManager extends SelectorManager
368     {
369         @Override
370         public boolean dispatch(Runnable task)
371         {
372             return getThreadPool().dispatch(task);
373         }
374 
375         @Override
376         protected void endPointClosed(final SelectChannelEndPoint endpoint)
377         {
378             SelectChannelConnector.this.endPointClosed(endpoint);
379         }
380 
381         @Override
382         protected void endPointOpened(SelectChannelEndPoint endpoint)
383         {
384             // TODO handle max connections and low resources
385             connectionOpened(endpoint.getConnection());
386         }
387 
388         @Override
389         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
390         {
391             connectionUpgraded(oldConnection,endpoint.getConnection());
392         }
393 
394         @Override
395         protected Connection newConnection(SocketChannel channel,SelectChannelEndPoint endpoint)
396         {
397             return SelectChannelConnector.this.newConnection(channel,endpoint);
398         }
399 
400         @Override
401         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
402         {
403             return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
404         }
405     }
406 
407 }