View Javadoc

1   // ========================================================================
2   // Copyright (c) 2003-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.server.nio;
15  
16  import java.io.IOException;
17  import java.net.InetSocketAddress;
18  import java.net.Socket;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.ServerSocketChannel;
21  import java.nio.channels.SocketChannel;
22  
23  import org.eclipse.jetty.continuation.Continuation;
24  import org.eclipse.jetty.io.ConnectedEndPoint;
25  import org.eclipse.jetty.io.Connection;
26  import org.eclipse.jetty.io.EndPoint;
27  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
28  import org.eclipse.jetty.io.nio.SelectorManager;
29  import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
30  import org.eclipse.jetty.server.HttpConnection;
31  import org.eclipse.jetty.server.Request;
32  import org.eclipse.jetty.util.log.Log;
33  import org.eclipse.jetty.util.thread.Timeout.Task;
34  
35  /* ------------------------------------------------------------------------------- */
36  /**
37   * Selecting NIO connector.
38   * <p>
39   * This connector uses efficient NIO buffers with a non blocking threading model. Direct NIO buffers
40   * are used and threads are only allocated to connections with requests. Synchronization is used to
41   * simulate blocking for the servlet API, and any unflushed content at the end of request handling
42   * is written asynchronously.
43   * </p>
44   * <p>
45   * This connector is best used when there are a many connections that have idle periods.
46   * </p>
47   * <p>
48   * When used with {@link org.eclipse.jetty.continuation.Continuation}, threadless waits are supported.
49   * If a filter or servlet returns after calling {@link Continuation#suspend()} or when a
50   * runtime exception is thrown from a call to {@link Continuation#undispatch()}, Jetty will
51   * will not send a response to the client. Instead the thread is released and the Continuation is
52   * placed on the timer queue. If the Continuation timeout expires, or it's
53   * resume method is called, then the request is again allocated a thread and the request is retried.
54   * The limitation of this approach is that request content is not available on the retried request,
55   * thus if possible it should be read after the continuation or saved as a request attribute or as the
56   * associated object of the Continuation instance.
57   * </p>
58   *
59   * @org.apache.xbean.XBean element="nioConnector" description="Creates an NIO based socket connector"
60   */
61  public class SelectChannelConnector extends AbstractNIOConnector
62  {
63      protected ServerSocketChannel _acceptChannel;
64      private int _lowResourcesConnections;
65      private int _lowResourcesMaxIdleTime;
66      private int _localPort=-1;
67  
68      private final SelectorManager _manager = new SelectorManager()
69      {
70          @Override
71          protected SocketChannel acceptChannel(SelectionKey key) throws IOException
72          {
73              // TODO handle max connections
74              SocketChannel channel = ((ServerSocketChannel)key.channel()).accept();
75              if (channel==null)
76                  return null;
77              channel.configureBlocking(false);
78              Socket socket = channel.socket();
79              configure(socket);
80              return channel;
81          }
82  
83          @Override
84          public boolean dispatch(Runnable task)
85          {
86              return getThreadPool().dispatch(task);
87          }
88  
89          @Override
90          protected void endPointClosed(final SelectChannelEndPoint endpoint)
91          {
92              connectionClosed(endpoint.getConnection());
93          }
94  
95          @Override
96          protected void endPointOpened(SelectChannelEndPoint endpoint)
97          {
98              // TODO handle max connections and low resources
99              connectionOpened(endpoint.getConnection());
100         }
101 
102         @Override
103         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
104         {
105             connectionUpgraded(oldConnection,endpoint.getConnection());
106         }
107 
108         @Override
109         protected Connection newConnection(SocketChannel channel,SelectChannelEndPoint endpoint)
110         {
111             return SelectChannelConnector.this.newConnection(channel,endpoint);
112         }
113 
114         @Override
115         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
116         {
117             return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
118         }
119     };
120 
121     /* ------------------------------------------------------------------------------- */
122     /**
123      * Constructor.
124      *
125      */
126     public SelectChannelConnector()
127     {
128     }
129 
130     /* ------------------------------------------------------------ */
131     @Override
132     public void accept(int acceptorID) throws IOException
133     {
134         _manager.doSelect(acceptorID);
135     }
136 
137     /* ------------------------------------------------------------ */
138     public void close() throws IOException
139     {
140         synchronized(this)
141         {
142             if(_manager.isRunning())
143             {
144                 try
145                 {
146                     _manager.stop();
147                 }
148                 catch (Exception e)
149                 {
150                     Log.warn(e);
151                 }
152             }
153             if (_acceptChannel != null)
154                 _acceptChannel.close();
155             _acceptChannel = null;
156             _localPort=-2;
157         }
158     }
159 
160     /* ------------------------------------------------------------------------------- */
161     @Override
162     public void customize(EndPoint endpoint, Request request) throws IOException
163     {
164         SelectChannelEndPoint cep = ((SelectChannelEndPoint)endpoint);
165         cep.cancelIdle();
166         request.setTimeStamp(cep.getSelectSet().getNow());
167         super.customize(endpoint, request);
168     }
169 
170     /* ------------------------------------------------------------------------------- */
171     @Override
172     public void persist(EndPoint endpoint) throws IOException
173     {
174         ((SelectChannelEndPoint)endpoint).scheduleIdle();
175         super.persist(endpoint);
176     }
177 
178     /* ------------------------------------------------------------ */
179     public Object getConnection()
180     {
181         return _acceptChannel;
182     }
183 
184     /* ------------------------------------------------------------------------------- */
185     public int getLocalPort()
186     {
187         synchronized(this)
188         {
189             return _localPort;
190         }
191     }
192 
193     /* ------------------------------------------------------------ */
194     public void open() throws IOException
195     {
196         synchronized(this)
197         {
198             if (_acceptChannel == null)
199             {
200                 // Create a new server socket
201                 _acceptChannel = ServerSocketChannel.open();
202                 // Set to blocking mode
203                 _acceptChannel.configureBlocking(true);
204 
205                 // Bind the server socket to the local host and port
206                 _acceptChannel.socket().setReuseAddress(getReuseAddress());
207                 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
208                 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
209 
210                 _localPort=_acceptChannel.socket().getLocalPort();
211                 if (_localPort<=0)
212                     throw new IOException("Server channel not bound");
213 
214                 // Set to non blocking mode
215                 _acceptChannel.configureBlocking(false);
216 
217             }
218         }
219     }
220 
221     /* ------------------------------------------------------------ */
222     @Override
223     public void setMaxIdleTime(int maxIdleTime)
224     {
225         _manager.setMaxIdleTime(maxIdleTime);
226         super.setMaxIdleTime(maxIdleTime);
227     }
228 
229     /* ------------------------------------------------------------ */
230     /**
231      * @return the lowResourcesConnections
232      */
233     public int getLowResourcesConnections()
234     {
235         return _lowResourcesConnections;
236     }
237 
238     /* ------------------------------------------------------------ */
239     /**
240      * Set the number of connections, which if exceeded places this manager in low resources state.
241      * This is not an exact measure as the connection count is averaged over the select sets.
242      * @param lowResourcesConnections the number of connections
243      * @see #setLowResourcesMaxIdleTime(int)
244      */
245     public void setLowResourcesConnections(int lowResourcesConnections)
246     {
247         _lowResourcesConnections=lowResourcesConnections;
248     }
249 
250     /* ------------------------------------------------------------ */
251     /**
252      * @return the lowResourcesMaxIdleTime
253      */
254     @Override
255     public int getLowResourcesMaxIdleTime()
256     {
257         return _lowResourcesMaxIdleTime;
258     }
259 
260     /* ------------------------------------------------------------ */
261     /**
262      * Set the period in ms that a connection is allowed to be idle when this there are more
263      * than {@link #getLowResourcesConnections()} connections.  This allows the server to rapidly close idle connections
264      * in order to gracefully handle high load situations.
265      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when resources are low.
266      * @see #setMaxIdleTime(int)
267      */
268     @Override
269     public void setLowResourcesMaxIdleTime(int lowResourcesMaxIdleTime)
270     {
271         _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
272         super.setLowResourcesMaxIdleTime(lowResourcesMaxIdleTime);
273     }
274 
275 
276     /* ------------------------------------------------------------ */
277     /*
278      * @see org.eclipse.jetty.server.server.AbstractConnector#doStart()
279      */
280     @Override
281     protected void doStart() throws Exception
282     {
283         _manager.setSelectSets(getAcceptors());
284         _manager.setMaxIdleTime(getMaxIdleTime());
285         _manager.setLowResourcesConnections(getLowResourcesConnections());
286         _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
287         _manager.start();
288         open();
289         _manager.register(_acceptChannel);
290         super.doStart();
291     }
292 
293     /* ------------------------------------------------------------ */
294     /*
295      * @see org.eclipse.jetty.server.server.AbstractConnector#doStop()
296      */
297     @Override
298     protected void doStop() throws Exception
299     {
300         super.doStop();
301     }
302 
303     /* ------------------------------------------------------------ */
304     protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
305     {
306         return new SelectChannelEndPoint(channel,selectSet,key);
307     }
308 
309     /* ------------------------------------------------------------------------------- */
310     protected Connection newConnection(SocketChannel channel,final SelectChannelEndPoint endpoint)
311     {
312         return new HttpConnection(SelectChannelConnector.this,endpoint,getServer())
313         {
314             /* ------------------------------------------------------------ */
315             @Override
316             public void cancelTimeout(Task task)
317             {
318                 endpoint.getSelectSet().cancelTimeout(task);
319             }
320 
321             /* ------------------------------------------------------------ */
322             @Override
323             public void scheduleTimeout(Task task, long timeoutMs)
324             {
325                 endpoint.getSelectSet().scheduleTimeout(task,timeoutMs);
326             }
327         };
328     }
329 
330     /* ------------------------------------------------------------------------------- */
331     public void dump()
332     {
333         Log.info("channel "+_acceptChannel+(_acceptChannel.isOpen()?" is open":" is closed"));
334         _manager.dump();
335     }
336 }