View Javadoc

1   // ========================================================================
2   // Copyright (c) 2003-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13   
14  package org.eclipse.jetty.server.nio;
15  
16  import java.io.IOException;
17  import java.net.InetSocketAddress;
18  import java.net.Socket;
19  import java.nio.channels.ByteChannel;
20  import java.nio.channels.ServerSocketChannel;
21  import java.nio.channels.SocketChannel;
22  import java.util.Set;
23  
24  import org.eclipse.jetty.http.HttpException;
25  import org.eclipse.jetty.io.Buffer;
26  import org.eclipse.jetty.io.ConnectedEndPoint;
27  import org.eclipse.jetty.io.Connection;
28  import org.eclipse.jetty.io.EndPoint;
29  import org.eclipse.jetty.io.EofException;
30  import org.eclipse.jetty.io.nio.ChannelEndPoint;
31  import org.eclipse.jetty.server.BlockingHttpConnection;
32  import org.eclipse.jetty.server.HttpConnection;
33  import org.eclipse.jetty.server.Request;
34  import org.eclipse.jetty.util.ConcurrentHashSet;
35  import org.eclipse.jetty.util.log.Log;
36  import org.eclipse.jetty.util.log.Logger;
37  
38  
39  /* ------------------------------------------------------------------------------- */
40  /**  Blocking NIO connector.
41   * This connector uses efficient NIO buffers with a traditional blocking thread model.
42   * Direct NIO buffers are used and a thread is allocated per connection.
43   * 
44   * This connector is best used when there are a few very active connections.
45   * 
46   * @org.apache.xbean.XBean element="blockingNioConnector" description="Creates a blocking NIO based socket connector"
47   * 
48   * 
49   *
50   */
51  public class BlockingChannelConnector extends AbstractNIOConnector 
52  {
53      private static final Logger LOG = Log.getLogger(BlockingChannelConnector.class);
54  
55      private transient ServerSocketChannel _acceptChannel;
56      private final Set<BlockingChannelEndPoint> _endpoints = new ConcurrentHashSet<BlockingChannelEndPoint>();
57      
58      
59      /* ------------------------------------------------------------ */
60      /** Constructor.
61       * 
62       */
63      public BlockingChannelConnector()
64      {
65      }
66  
67      /* ------------------------------------------------------------ */
68      public Object getConnection()
69      {
70          return _acceptChannel;
71      }
72  
73      /* ------------------------------------------------------------ */
74      /**
75       * @see org.eclipse.jetty.server.AbstractConnector#doStart()
76       */
77      @Override
78      protected void doStart() throws Exception
79      {
80          super.doStart();
81          getThreadPool().dispatch(new Runnable()
82          {
83  
84              public void run()
85              {
86                  while (isRunning())
87                  {
88                      try
89                      {
90                          Thread.sleep(400);
91                          long now=System.currentTimeMillis();
92                          for (BlockingChannelEndPoint endp : _endpoints)
93                          {
94                              endp.checkIdleTimestamp(now);
95                          }
96                      }
97                      catch(InterruptedException e)
98                      {
99                          LOG.ignore(e);
100                     }
101                     catch(Exception e)
102                     {
103                         LOG.warn(e);
104                     }
105                 }
106             }
107             
108         });
109         
110     }
111 
112     
113     /* ------------------------------------------------------------ */
114     public void open() throws IOException
115     {
116         // Create a new server socket and set to non blocking mode
117         _acceptChannel= ServerSocketChannel.open();
118         _acceptChannel.configureBlocking(true);
119 
120         // Bind the server socket to the local host and port
121         InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
122         _acceptChannel.socket().bind(addr,getAcceptQueueSize());
123     }
124 
125     /* ------------------------------------------------------------ */
126     public void close() throws IOException
127     {
128         if (_acceptChannel != null)
129             _acceptChannel.close();
130         _acceptChannel=null;
131     }
132     
133     /* ------------------------------------------------------------ */
134     @Override
135     public void accept(int acceptorID)
136     	throws IOException, InterruptedException
137     {   
138         SocketChannel channel = _acceptChannel.accept();
139         channel.configureBlocking(true);
140         Socket socket=channel.socket();
141         configure(socket);
142 
143         BlockingChannelEndPoint connection=new BlockingChannelEndPoint(channel);
144         connection.dispatch();
145     }
146     
147     /* ------------------------------------------------------------------------------- */
148     @Override
149     public void customize(EndPoint endpoint, Request request)
150         throws IOException
151     {
152         super.customize(endpoint, request);
153         endpoint.setMaxIdleTime(_maxIdleTime);
154         configure(((SocketChannel)endpoint.getTransport()).socket());
155     }
156 
157 
158     /* ------------------------------------------------------------------------------- */
159     public int getLocalPort()
160     {
161         if (_acceptChannel==null || !_acceptChannel.isOpen())
162             return -1;
163         return _acceptChannel.socket().getLocalPort();
164     }
165     
166     /* ------------------------------------------------------------------------------- */
167     /* ------------------------------------------------------------------------------- */
168     /* ------------------------------------------------------------------------------- */
169     private class BlockingChannelEndPoint extends ChannelEndPoint implements Runnable, ConnectedEndPoint
170     {
171         private Connection _connection;
172         private int _timeout;
173         private volatile long _idleTimestamp;
174         
175         BlockingChannelEndPoint(ByteChannel channel) 
176             throws IOException
177         {
178             super(channel,BlockingChannelConnector.this._maxIdleTime);
179             _connection = new BlockingHttpConnection(BlockingChannelConnector.this,this,getServer());
180         }
181         
182         /* ------------------------------------------------------------ */
183         /** Get the connection.
184          * @return the connection
185          */
186         public Connection getConnection()
187         {
188             return _connection;
189         }
190         
191         /* ------------------------------------------------------------ */
192         public void setConnection(Connection connection)
193         {
194             _connection=connection;
195         }
196 
197         /* ------------------------------------------------------------ */
198         public void checkIdleTimestamp(long now)
199         {
200             if (_idleTimestamp!=0 && _timeout>0 && now>(_idleTimestamp+_timeout))
201             {
202                 idleExpired();
203             }
204         }
205 
206         /* ------------------------------------------------------------ */
207         protected void idleExpired()
208         {
209             try
210             {
211                 close();
212             }
213             catch (IOException e)
214             {
215                 LOG.ignore(e);
216             }
217         }
218         
219         /* ------------------------------------------------------------ */
220         void dispatch() throws IOException
221         {
222             if (!getThreadPool().dispatch(this))
223             {
224                 LOG.warn("dispatch failed for  {}",_connection);
225                 BlockingChannelEndPoint.this.close();
226             }
227         }
228         
229         /* ------------------------------------------------------------ */
230         /**
231          * @see org.eclipse.jetty.io.nio.ChannelEndPoint#fill(org.eclipse.jetty.io.Buffer)
232          */
233         @Override
234         public int fill(Buffer buffer) throws IOException
235         {
236             _idleTimestamp=System.currentTimeMillis();
237             return super.fill(buffer);
238         }
239 
240         /* ------------------------------------------------------------ */
241         /**
242          * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer)
243          */
244         @Override
245         public int flush(Buffer buffer) throws IOException
246         {
247             _idleTimestamp=System.currentTimeMillis();
248             return super.flush(buffer);
249         }
250 
251         /* ------------------------------------------------------------ */
252         /**
253          * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer)
254          */
255         @Override
256         public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
257         {
258             _idleTimestamp=System.currentTimeMillis();
259             return super.flush(header,buffer,trailer);
260         }
261 
262         /* ------------------------------------------------------------ */
263         public void run()
264         {
265             try
266             {
267                 _timeout=getMaxIdleTime();
268                 connectionOpened(_connection);
269                 _endpoints.add(this);
270 
271                 while (isOpen())
272                 {
273                     _idleTimestamp=System.currentTimeMillis();
274                     if (_connection.isIdle())
275                     {
276                         if (getServer().getThreadPool().isLowOnThreads())
277                         {
278                             int lrmit = getLowResourcesMaxIdleTime();
279                             if (lrmit>=0 && _timeout!= lrmit)
280                             {
281                                 _timeout=lrmit;
282                             }
283                         }
284                     }
285                     else
286                     {
287                         if (_timeout!=getMaxIdleTime())
288                         {
289                             _timeout=getMaxIdleTime();
290                         }
291                     }
292                     
293                     _connection = _connection.handle();
294                     
295                 }
296             }
297             catch (EofException e)
298             {
299                 LOG.debug("EOF", e);
300                 try{BlockingChannelEndPoint.this.close();}
301                 catch(IOException e2){LOG.ignore(e2);}
302             }
303             catch (HttpException e)
304             {
305                 LOG.debug("BAD", e);
306                 try{BlockingChannelEndPoint.this.close();}
307                 catch(IOException e2){LOG.ignore(e2);}
308             }
309             catch(Throwable e)
310             {
311                 LOG.warn("handle failed",e);
312                 try{BlockingChannelEndPoint.this.close();}
313                 catch(IOException e2){LOG.ignore(e2);}
314             }
315             finally
316             {
317                 connectionClosed(_connection);
318                 _endpoints.remove(this);
319                 
320                 // wait for client to close, but if not, close ourselves.
321                 try
322                 {
323                     if (!_socket.isClosed())
324                     {
325                         long timestamp=System.currentTimeMillis();
326                         int max_idle=getMaxIdleTime(); 
327 
328                         _socket.setSoTimeout(getMaxIdleTime());
329                         int c=0;
330                         do
331                         {
332                             c = _socket.getInputStream().read();
333                         }
334                         while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle);
335                         if (!_socket.isClosed())
336                             _socket.close();
337                     }
338                 }
339                 catch(IOException e)
340                 {
341                     LOG.ignore(e);
342                 }
343             }
344         }
345     }
346 }