View Javadoc

1   // ========================================================================
2   // Copyright (c) 2003-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.server.nio;
15  
16  import java.io.IOException;
17  import java.net.InetSocketAddress;
18  import java.net.Socket;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.ServerSocketChannel;
21  import java.nio.channels.SocketChannel;
22  import java.util.Arrays;
23  import java.util.Collections;
24  
25  import org.eclipse.jetty.continuation.Continuation;
26  import org.eclipse.jetty.io.ConnectedEndPoint;
27  import org.eclipse.jetty.io.Connection;
28  import org.eclipse.jetty.io.EndPoint;
29  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
30  import org.eclipse.jetty.io.nio.SelectorManager;
31  import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
32  import org.eclipse.jetty.server.HttpConnection;
33  import org.eclipse.jetty.server.Request;
34  import org.eclipse.jetty.util.component.AggregateLifeCycle;
35  import org.eclipse.jetty.util.log.Log;
36  import org.eclipse.jetty.util.thread.Timeout.Task;
37  
38  /* ------------------------------------------------------------------------------- */
39  /**
40   * Selecting NIO connector.
41   * <p>
42   * This connector uses efficient NIO buffers with a non blocking threading model. Direct NIO buffers
43   * are used and threads are only allocated to connections with requests. Synchronization is used to
44   * simulate blocking for the servlet API, and any unflushed content at the end of request handling
45   * is written asynchronously.
46   * </p>
47   * <p>
48   * This connector is best used when there are a many connections that have idle periods.
49   * </p>
50   * <p>
51   * When used with {@link org.eclipse.jetty.continuation.Continuation}, threadless waits are supported.
52   * If a filter or servlet returns after calling {@link Continuation#suspend()} or when a
53   * runtime exception is thrown from a call to {@link Continuation#undispatch()}, Jetty will
54   * will not send a response to the client. Instead the thread is released and the Continuation is
55   * placed on the timer queue. If the Continuation timeout expires, or it's
56   * resume method is called, then the request is again allocated a thread and the request is retried.
57   * The limitation of this approach is that request content is not available on the retried request,
58   * thus if possible it should be read after the continuation or saved as a request attribute or as the
59   * associated object of the Continuation instance.
60   * </p>
61   *
62   * @org.apache.xbean.XBean element="nioConnector" description="Creates an NIO based socket connector"
63   */
64  public class SelectChannelConnector extends AbstractNIOConnector
65  {
66      protected ServerSocketChannel _acceptChannel;
67      private int _lowResourcesConnections;
68      private int _lowResourcesMaxIdleTime;
69      private int _localPort=-1;
70  
71      private final SelectorManager _manager = new ConnectorSelectorManager();
72  
73      /* ------------------------------------------------------------------------------- */
74      /**
75       * Constructor.
76       *
77       */
78      public SelectChannelConnector()
79      {
80          _manager.setMaxIdleTime(getMaxIdleTime());
81      }
82  
83      /* ------------------------------------------------------------ */
84      @Override
85      public void accept(int acceptorID) throws IOException
86      {
87          _manager.doSelect(acceptorID);
88      }
89  
90      /* ------------------------------------------------------------ */
91      public void close() throws IOException
92      {
93          synchronized(this)
94          {
95              if(_manager.isRunning())
96              {
97                  try
98                  {
99                      _manager.stop();
100                 }
101                 catch (Exception e)
102                 {
103                     Log.warn(e);
104                 }
105             }
106             if (_acceptChannel != null)
107                 _acceptChannel.close();
108             _acceptChannel = null;
109             _localPort=-2;
110         }
111     }
112 
113     /* ------------------------------------------------------------------------------- */
114     @Override
115     public void customize(EndPoint endpoint, Request request) throws IOException
116     {
117         SelectChannelEndPoint cep = ((SelectChannelEndPoint)endpoint);
118         cep.cancelIdle();
119         request.setTimeStamp(cep.getSelectSet().getNow());
120         endpoint.setMaxIdleTime(_maxIdleTime);
121         super.customize(endpoint, request);
122     }
123 
124     /* ------------------------------------------------------------------------------- */
125     @Override
126     public void persist(EndPoint endpoint) throws IOException
127     {
128         ((SelectChannelEndPoint)endpoint).scheduleIdle();
129         super.persist(endpoint);
130     }
131 
132     /* ------------------------------------------------------------ */
133     public Object getConnection()
134     {
135         return _acceptChannel;
136     }
137 
138     /* ------------------------------------------------------------------------------- */
139     public int getLocalPort()
140     {
141         synchronized(this)
142         {
143             return _localPort;
144         }
145     }
146 
147     /* ------------------------------------------------------------ */
148     public void open() throws IOException
149     {
150         synchronized(this)
151         {
152             if (_acceptChannel == null)
153             {
154                 // Create a new server socket
155                 _acceptChannel = ServerSocketChannel.open();
156                 // Set to blocking mode
157                 _acceptChannel.configureBlocking(true);
158 
159                 // Bind the server socket to the local host and port
160                 _acceptChannel.socket().setReuseAddress(getReuseAddress());
161                 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
162                 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
163 
164                 _localPort=_acceptChannel.socket().getLocalPort();
165                 if (_localPort<=0)
166                     throw new IOException("Server channel not bound");
167 
168             }
169         }
170     }
171 
172     /* ------------------------------------------------------------ */
173     @Override
174     public void setMaxIdleTime(int maxIdleTime)
175     {
176         _manager.setMaxIdleTime(maxIdleTime);
177         super.setMaxIdleTime(maxIdleTime);
178     }
179 
180     /* ------------------------------------------------------------ */
181     /**
182      * @return the lowResourcesConnections
183      */
184     public int getLowResourcesConnections()
185     {
186         return _lowResourcesConnections;
187     }
188 
189     /* ------------------------------------------------------------ */
190     /**
191      * Set the number of connections, which if exceeded places this manager in low resources state.
192      * This is not an exact measure as the connection count is averaged over the select sets.
193      * @param lowResourcesConnections the number of connections
194      * @see #setLowResourcesMaxIdleTime(int)
195      */
196     public void setLowResourcesConnections(int lowResourcesConnections)
197     {
198         _lowResourcesConnections=lowResourcesConnections;
199     }
200 
201     /* ------------------------------------------------------------ */
202     /**
203      * @return the lowResourcesMaxIdleTime
204      */
205     @Override
206     public int getLowResourcesMaxIdleTime()
207     {
208         return _lowResourcesMaxIdleTime;
209     }
210 
211     /* ------------------------------------------------------------ */
212     /**
213      * Set the period in ms that a connection is allowed to be idle when this there are more
214      * than {@link #getLowResourcesConnections()} connections.  This allows the server to rapidly close idle connections
215      * in order to gracefully handle high load situations.
216      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when resources are low.
217      * @see #setMaxIdleTime(int)
218      */
219     @Override
220     public void setLowResourcesMaxIdleTime(int lowResourcesMaxIdleTime)
221     {
222         _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
223         super.setLowResourcesMaxIdleTime(lowResourcesMaxIdleTime);
224     }
225 
226 
227     /* ------------------------------------------------------------ */
228     /*
229      * @see org.eclipse.jetty.server.server.AbstractConnector#doStart()
230      */
231     @Override
232     protected void doStart() throws Exception
233     {
234         _manager.setSelectSets(getAcceptors());
235         _manager.setMaxIdleTime(getMaxIdleTime());
236         _manager.setLowResourcesConnections(getLowResourcesConnections());
237         _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
238         _manager.start();
239         
240         super.doStart();
241         
242         // start a thread to accept new connections
243         _manager.dispatch(new Runnable()
244         {
245             public void run()
246             {
247                 final ServerSocketChannel server=_acceptChannel;
248                 while (isRunning() && _acceptChannel==server && server.isOpen())
249                 {
250                     try
251                     {
252                         SocketChannel channel = server.accept();
253                         channel.configureBlocking(false);
254                         Socket socket = channel.socket();
255                         configure(socket);
256                         _manager.register(channel);
257                     }
258                     catch(IOException e)
259                     {
260                         Log.ignore(e);
261                     }
262                 }
263             }
264         });
265         
266     }
267 
268     /* ------------------------------------------------------------ */
269     /*
270      * @see org.eclipse.jetty.server.server.AbstractConnector#doStop()
271      */
272     @Override
273     protected void doStop() throws Exception
274     {
275         super.doStop();
276     }
277 
278     /* ------------------------------------------------------------ */
279     protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
280     {
281         return new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime);
282     }
283 
284     /* ------------------------------------------------------------------------------- */
285     protected Connection newConnection(SocketChannel channel,final SelectChannelEndPoint endpoint)
286     {
287         return new HttpConnection(SelectChannelConnector.this,endpoint,getServer())
288         {
289             /* ------------------------------------------------------------ */
290             @Override
291             public void cancelTimeout(Task task)
292             {
293                 endpoint.getSelectSet().cancelTimeout(task);
294             }
295 
296             /* ------------------------------------------------------------ */
297             @Override
298             public void scheduleTimeout(Task task, long timeoutMs)
299             {
300                 endpoint.getSelectSet().scheduleTimeout(task,timeoutMs);
301             }
302         };
303     }
304 
305     /* ------------------------------------------------------------ */
306     public void dump(Appendable out, String indent) throws IOException
307     {
308         out.append(String.valueOf(this)).append("\n");
309         AggregateLifeCycle.dump(out,indent,Arrays.asList(new Object[]{_acceptChannel,_acceptChannel.isOpen()?"OPEN":"CLOSED",_manager}));
310     }
311 
312     /* ------------------------------------------------------------ */
313     /* ------------------------------------------------------------ */
314     /* ------------------------------------------------------------ */
315     private final class ConnectorSelectorManager extends SelectorManager
316     {
317         @Override
318         public boolean dispatch(Runnable task)
319         {
320             return getThreadPool().dispatch(task);
321         }
322 
323         @Override
324         protected void endPointClosed(final SelectChannelEndPoint endpoint)
325         {
326             connectionClosed(endpoint.getConnection());
327         }
328 
329         @Override
330         protected void endPointOpened(SelectChannelEndPoint endpoint)
331         {
332             // TODO handle max connections and low resources
333             connectionOpened(endpoint.getConnection());
334         }
335 
336         @Override
337         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
338         {
339             connectionUpgraded(oldConnection,endpoint.getConnection());
340         }
341 
342         @Override
343         protected Connection newConnection(SocketChannel channel,SelectChannelEndPoint endpoint)
344         {
345             return SelectChannelConnector.this.newConnection(channel,endpoint);
346         }
347 
348         @Override
349         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
350         {
351             return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
352         }
353     }
354 
355 }