View Javadoc

1   // ========================================================================
2   // Copyright (c) 2003-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.server.nio;
15  
16  import java.io.IOException;
17  import java.net.InetSocketAddress;
18  import java.net.Socket;
19  import java.nio.channels.ByteChannel;
20  import java.nio.channels.ServerSocketChannel;
21  import java.nio.channels.SocketChannel;
22  import java.util.Set;
23  
24  import org.eclipse.jetty.http.HttpException;
25  import org.eclipse.jetty.io.Buffer;
26  import org.eclipse.jetty.io.ConnectedEndPoint;
27  import org.eclipse.jetty.io.Connection;
28  import org.eclipse.jetty.io.EndPoint;
29  import org.eclipse.jetty.io.EofException;
30  import org.eclipse.jetty.io.nio.ChannelEndPoint;
31  import org.eclipse.jetty.server.BlockingHttpConnection;
32  import org.eclipse.jetty.server.Request;
33  import org.eclipse.jetty.util.ConcurrentHashSet;
34  import org.eclipse.jetty.util.log.Log;
35  import org.eclipse.jetty.util.log.Logger;
36  
37  
38  /* ------------------------------------------------------------------------------- */
39  /**  Blocking NIO connector.
40   * This connector uses efficient NIO buffers with a traditional blocking thread model.
41   * Direct NIO buffers are used and a thread is allocated per connections.
42   *
43   * This connector is best used when there are a few very active connections.
44   *
45   * @org.apache.xbean.XBean element="blockingNioConnector" description="Creates a blocking NIO based socket connector"
46   *
47   *
48   *
49   */
50  public class BlockingChannelConnector extends AbstractNIOConnector
51  {
52      private static final Logger LOG = Log.getLogger(BlockingChannelConnector.class);
53  
54      private transient ServerSocketChannel _acceptChannel;
55      private final Set<BlockingChannelEndPoint> _endpoints = new ConcurrentHashSet<BlockingChannelEndPoint>();
56  
57  
58      /* ------------------------------------------------------------ */
59      /** Constructor.
60       *
61       */
62      public BlockingChannelConnector()
63      {
64      }
65  
66      /* ------------------------------------------------------------ */
67      public Object getConnection()
68      {
69          return _acceptChannel;
70      }
71  
72      /* ------------------------------------------------------------ */
73      /**
74       * @see org.eclipse.jetty.server.AbstractConnector#doStart()
75       */
76      @Override
77      protected void doStart() throws Exception
78      {
79          super.doStart();
80          getThreadPool().dispatch(new Runnable()
81          {
82  
83              public void run()
84              {
85                  while (isRunning())
86                  {
87                      try
88                      {
89                          Thread.sleep(400);
90                          long now=System.currentTimeMillis();
91                          for (BlockingChannelEndPoint endp : _endpoints)
92                          {
93                              endp.checkIdleTimestamp(now);
94                          }
95                      }
96                      catch(InterruptedException e)
97                      {
98                          LOG.ignore(e);
99                      }
100                     catch(Exception e)
101                     {
102                         LOG.warn(e);
103                     }
104                 }
105             }
106 
107         });
108 
109     }
110 
111 
112     /* ------------------------------------------------------------ */
113     public void open() throws IOException
114     {
115         // Create a new server socket and set to non blocking mode
116         _acceptChannel= ServerSocketChannel.open();
117         _acceptChannel.configureBlocking(true);
118 
119         // Bind the server socket to the local host and port
120         InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
121         _acceptChannel.socket().bind(addr,getAcceptQueueSize());
122     }
123 
124     /* ------------------------------------------------------------ */
125     public void close() throws IOException
126     {
127         if (_acceptChannel != null)
128             _acceptChannel.close();
129         _acceptChannel=null;
130     }
131 
132     /* ------------------------------------------------------------ */
133     @Override
134     public void accept(int acceptorID)
135     	throws IOException, InterruptedException
136     {
137         SocketChannel channel = _acceptChannel.accept();
138         channel.configureBlocking(true);
139         Socket socket=channel.socket();
140         configure(socket);
141 
142         BlockingChannelEndPoint connection=new BlockingChannelEndPoint(channel);
143         connection.dispatch();
144     }
145 
146     /* ------------------------------------------------------------------------------- */
147     @Override
148     public void customize(EndPoint endpoint, Request request)
149         throws IOException
150     {
151         super.customize(endpoint, request);
152         endpoint.setMaxIdleTime(_maxIdleTime);
153         configure(((SocketChannel)endpoint.getTransport()).socket());
154     }
155 
156 
157     /* ------------------------------------------------------------------------------- */
158     public int getLocalPort()
159     {
160         if (_acceptChannel==null || !_acceptChannel.isOpen())
161             return -1;
162         return _acceptChannel.socket().getLocalPort();
163     }
164 
165     /* ------------------------------------------------------------------------------- */
166     /* ------------------------------------------------------------------------------- */
167     /* ------------------------------------------------------------------------------- */
168     private class BlockingChannelEndPoint extends ChannelEndPoint implements Runnable, ConnectedEndPoint
169     {
170         private Connection _connection;
171         private int _timeout;
172         private volatile long _idleTimestamp;
173 
174         BlockingChannelEndPoint(ByteChannel channel)
175             throws IOException
176         {
177             super(channel,BlockingChannelConnector.this._maxIdleTime);
178             _connection = new BlockingHttpConnection(BlockingChannelConnector.this,this,getServer());
179         }
180 
181         /* ------------------------------------------------------------ */
182         /** Get the connection.
183          * @return the connection
184          */
185         public Connection getConnection()
186         {
187             return _connection;
188         }
189 
190         /* ------------------------------------------------------------ */
191         public void setConnection(Connection connection)
192         {
193             _connection=connection;
194         }
195 
196         /* ------------------------------------------------------------ */
197         public void checkIdleTimestamp(long now)
198         {
199             if (_idleTimestamp!=0 && _timeout>0 && now>(_idleTimestamp+_timeout))
200             {
201                 idleExpired();
202             }
203         }
204 
205         /* ------------------------------------------------------------ */
206         protected void idleExpired()
207         {
208             try
209             {
210                 super.close();
211             }
212             catch (IOException e)
213             {
214                 LOG.ignore(e);
215             }
216         }
217 
218         /* ------------------------------------------------------------ */
219         void dispatch() throws IOException
220         {
221             if (!getThreadPool().dispatch(this))
222             {
223                 LOG.warn("dispatch failed for  {}",_connection);
224                 super.close();
225             }
226         }
227 
228         /* ------------------------------------------------------------ */
229         /**
230          * @see org.eclipse.jetty.io.nio.ChannelEndPoint#fill(org.eclipse.jetty.io.Buffer)
231          */
232         @Override
233         public int fill(Buffer buffer) throws IOException
234         {
235             _idleTimestamp=System.currentTimeMillis();
236             return super.fill(buffer);
237         }
238 
239         /* ------------------------------------------------------------ */
240         /**
241          * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer)
242          */
243         @Override
244         public int flush(Buffer buffer) throws IOException
245         {
246             _idleTimestamp=System.currentTimeMillis();
247             return super.flush(buffer);
248         }
249 
250         /* ------------------------------------------------------------ */
251         /**
252          * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer)
253          */
254         @Override
255         public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
256         {
257             _idleTimestamp=System.currentTimeMillis();
258             return super.flush(header,buffer,trailer);
259         }
260 
261         /* ------------------------------------------------------------ */
262         public void run()
263         {
264             try
265             {
266                 _timeout=getMaxIdleTime();
267                 connectionOpened(_connection);
268                 _endpoints.add(this);
269 
270                 while (isOpen())
271                 {
272                     _idleTimestamp=System.currentTimeMillis();
273                     if (_connection.isIdle())
274                     {
275                         if (getServer().getThreadPool().isLowOnThreads())
276                         {
277                             int lrmit = getLowResourcesMaxIdleTime();
278                             if (lrmit>=0 && _timeout!= lrmit)
279                             {
280                                 _timeout=lrmit;
281                             }
282                         }
283                     }
284                     else
285                     {
286                         if (_timeout!=getMaxIdleTime())
287                         {
288                             _timeout=getMaxIdleTime();
289                         }
290                     }
291 
292                     _connection = _connection.handle();
293 
294                 }
295             }
296             catch (EofException e)
297             {
298                 LOG.debug("EOF", e);
299                 try{BlockingChannelEndPoint.this.close();}
300                 catch(IOException e2){LOG.ignore(e2);}
301             }
302             catch (HttpException e)
303             {
304                 LOG.debug("BAD", e);
305                 try{super.close();}
306                 catch(IOException e2){LOG.ignore(e2);}
307             }
308             catch(Throwable e)
309             {
310                 LOG.warn("handle failed",e);
311                 try{super.close();}
312                 catch(IOException e2){LOG.ignore(e2);}
313             }
314             finally
315             {
316                 connectionClosed(_connection);
317                 _endpoints.remove(this);
318 
319                 // wait for client to close, but if not, close ourselves.
320                 try
321                 {
322                     if (!_socket.isClosed())
323                     {
324                         long timestamp=System.currentTimeMillis();
325                         int max_idle=getMaxIdleTime();
326 
327                         _socket.setSoTimeout(getMaxIdleTime());
328                         int c=0;
329                         do
330                         {
331                             c = _socket.getInputStream().read();
332                         }
333                         while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle);
334                         if (!_socket.isClosed())
335                             _socket.close();
336                     }
337                 }
338                 catch(IOException e)
339                 {
340                     LOG.ignore(e);
341                 }
342             }
343         }
344     }
345 }