View Javadoc

1   // ========================================================================
2   // Copyright (c) 2003-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13   
14  package org.eclipse.jetty.server.nio;
15  
16  import java.io.IOException;
17  import java.net.InetSocketAddress;
18  import java.net.Socket;
19  import java.nio.channels.ByteChannel;
20  import java.nio.channels.ServerSocketChannel;
21  import java.nio.channels.SocketChannel;
22  import java.util.Collections;
23  import java.util.Set;
24  import java.util.concurrent.ConcurrentHashMap;
25  
26  import org.eclipse.jetty.http.HttpException;
27  import org.eclipse.jetty.io.Buffer;
28  import org.eclipse.jetty.io.ConnectedEndPoint;
29  import org.eclipse.jetty.io.Connection;
30  import org.eclipse.jetty.io.EndPoint;
31  import org.eclipse.jetty.io.EofException;
32  import org.eclipse.jetty.io.nio.ChannelEndPoint;
33  import org.eclipse.jetty.server.HttpConnection;
34  import org.eclipse.jetty.server.Request;
35  import org.eclipse.jetty.util.ConcurrentHashSet;
36  import org.eclipse.jetty.util.log.Log;
37  
38  
39  /* ------------------------------------------------------------------------------- */
40  /**  Blocking NIO connector.
41   * This connector uses efficient NIO buffers with a traditional blocking thread model.
42   * Direct NIO buffers are used and a thread is allocated per connections.
43   * 
44   * This connector is best used when there are a few very active connections.
45   * 
46   * @org.apache.xbean.XBean element="blockingNioConnector" description="Creates a blocking NIO based socket connector"
47   * 
48   * 
49   *
50   */
51  public class BlockingChannelConnector extends AbstractNIOConnector 
52  {
53      private transient ServerSocketChannel _acceptChannel;
54      private final Set<BlockingChannelEndPoint> _endpoints = new ConcurrentHashSet<BlockingChannelEndPoint>();
55      
56      
57      /* ------------------------------------------------------------ */
58      /** Constructor.
59       * 
60       */
61      public BlockingChannelConnector()
62      {
63      }
64  
65      /* ------------------------------------------------------------ */
66      public Object getConnection()
67      {
68          return _acceptChannel;
69      }
70  
71      /* ------------------------------------------------------------ */
72      /**
73       * @see org.eclipse.jetty.server.AbstractConnector#doStart()
74       */
75      @Override
76      protected void doStart() throws Exception
77      {
78          super.doStart();
79          getThreadPool().dispatch(new Runnable()
80          {
81  
82              public void run()
83              {
84                  while (isRunning())
85                  {
86                      try
87                      {
88                          Thread.sleep(400);
89                          long now=System.currentTimeMillis();
90                          for (BlockingChannelEndPoint endp : _endpoints)
91                          {
92                              endp.checkIdleTimestamp(now);
93                          }
94                      }
95                      catch(InterruptedException e)
96                      {
97                          Log.ignore(e);
98                      }
99                      catch(Exception e)
100                     {
101                         Log.warn(e);
102                     }
103                 }
104             }
105             
106         });
107         
108     }
109 
110     
111     /* ------------------------------------------------------------ */
112     public void open() throws IOException
113     {
114         // Create a new server socket and set to non blocking mode
115         _acceptChannel= ServerSocketChannel.open();
116         _acceptChannel.configureBlocking(true);
117 
118         // Bind the server socket to the local host and port
119         InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
120         _acceptChannel.socket().bind(addr,getAcceptQueueSize());
121     }
122 
123     /* ------------------------------------------------------------ */
124     public void close() throws IOException
125     {
126         if (_acceptChannel != null)
127             _acceptChannel.close();
128         _acceptChannel=null;
129     }
130     
131     /* ------------------------------------------------------------ */
132     @Override
133     public void accept(int acceptorID)
134     	throws IOException, InterruptedException
135     {   
136         SocketChannel channel = _acceptChannel.accept();
137         channel.configureBlocking(true);
138         Socket socket=channel.socket();
139         configure(socket);
140 
141         BlockingChannelEndPoint connection=new BlockingChannelEndPoint(channel);
142         connection.dispatch();
143     }
144     
145     /* ------------------------------------------------------------------------------- */
146     @Override
147     public void customize(EndPoint endpoint, Request request)
148         throws IOException
149     {
150         super.customize(endpoint, request);
151         endpoint.setMaxIdleTime(_maxIdleTime);
152         configure(((SocketChannel)endpoint.getTransport()).socket());
153     }
154 
155 
156     /* ------------------------------------------------------------------------------- */
157     public int getLocalPort()
158     {
159         if (_acceptChannel==null || !_acceptChannel.isOpen())
160             return -1;
161         return _acceptChannel.socket().getLocalPort();
162     }
163     
164     /* ------------------------------------------------------------------------------- */
165     /* ------------------------------------------------------------------------------- */
166     /* ------------------------------------------------------------------------------- */
167     private class BlockingChannelEndPoint extends ChannelEndPoint implements Runnable, ConnectedEndPoint
168     {
169         private Connection _connection;
170         private int _timeout;
171         private volatile long _idleTimestamp;
172         
173         BlockingChannelEndPoint(ByteChannel channel) 
174             throws IOException
175         {
176             super(channel,BlockingChannelConnector.this._maxIdleTime);
177             _connection = new HttpConnection(BlockingChannelConnector.this,this,getServer());
178         }
179         
180         /* ------------------------------------------------------------ */
181         /** Get the connection.
182          * @return the connection
183          */
184         public Connection getConnection()
185         {
186             return _connection;
187         }
188         
189         /* ------------------------------------------------------------ */
190         public void setConnection(Connection connection)
191         {
192             _connection=connection;
193         }
194 
195         /* ------------------------------------------------------------ */
196         public void checkIdleTimestamp(long now)
197         {
198             if (_idleTimestamp!=0 && _timeout>0 && now>(_idleTimestamp+_timeout))
199             {
200                 idleExpired();
201             }
202         }
203 
204         /* ------------------------------------------------------------ */
205         protected void idleExpired()
206         {
207             try
208             {
209                 close();
210             }
211             catch (IOException e)
212             {
213                 Log.ignore(e);
214             }
215         }
216         
217         /* ------------------------------------------------------------ */
218         void dispatch() throws IOException
219         {
220             if (!getThreadPool().dispatch(this))
221             {
222                 Log.warn("dispatch failed for  {}",_connection);
223                 BlockingChannelEndPoint.this.close();
224             }
225         }
226         
227         /* ------------------------------------------------------------ */
228         /**
229          * @see org.eclipse.jetty.io.nio.ChannelEndPoint#fill(org.eclipse.jetty.io.Buffer)
230          */
231         @Override
232         public int fill(Buffer buffer) throws IOException
233         {
234             _idleTimestamp=System.currentTimeMillis();
235             return super.fill(buffer);
236         }
237 
238         /* ------------------------------------------------------------ */
239         /**
240          * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer)
241          */
242         @Override
243         public int flush(Buffer buffer) throws IOException
244         {
245             _idleTimestamp=System.currentTimeMillis();
246             return super.flush(buffer);
247         }
248 
249         /* ------------------------------------------------------------ */
250         /**
251          * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer)
252          */
253         @Override
254         public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
255         {
256             _idleTimestamp=System.currentTimeMillis();
257             return super.flush(header,buffer,trailer);
258         }
259 
260         /* ------------------------------------------------------------ */
261         public void run()
262         {
263             try
264             {
265                 _timeout=getMaxIdleTime();
266                 connectionOpened(_connection);
267                 _endpoints.add(this);
268 
269                 while (isOpen())
270                 {
271                     _idleTimestamp=System.currentTimeMillis();
272                     if (_connection.isIdle())
273                     {
274                         if (getServer().getThreadPool().isLowOnThreads())
275                         {
276                             int lrmit = getLowResourcesMaxIdleTime();
277                             if (lrmit>=0 && _timeout!= lrmit)
278                             {
279                                 _timeout=lrmit;
280                             }
281                         }
282                     }
283                     else
284                     {
285                         if (_timeout!=getMaxIdleTime())
286                         {
287                             _timeout=getMaxIdleTime();
288                         }
289                     }
290                     
291                     _connection = _connection.handle();
292                     
293                 }
294             }
295             catch (EofException e)
296             {
297                 Log.debug("EOF", e);
298                 try{BlockingChannelEndPoint.this.close();}
299                 catch(IOException e2){Log.ignore(e2);}
300             }
301             catch (HttpException e)
302             {
303                 Log.debug("BAD", e);
304                 try{BlockingChannelEndPoint.this.close();}
305                 catch(IOException e2){Log.ignore(e2);}
306             }
307             catch(Throwable e)
308             {
309                 Log.warn("handle failed",e);
310                 try{BlockingChannelEndPoint.this.close();}
311                 catch(IOException e2){Log.ignore(e2);}
312             }
313             finally
314             {
315                 connectionClosed(_connection);
316                 _endpoints.remove(this);
317                 
318                 // wait for client to close, but if not, close ourselves.
319                 try
320                 {
321                     if (!_socket.isClosed())
322                     {
323                         long timestamp=System.currentTimeMillis();
324                         int max_idle=getMaxIdleTime(); 
325 
326                         _socket.setSoTimeout(getMaxIdleTime());
327                         int c=0;
328                         do
329                         {
330                             c = _socket.getInputStream().read();
331                         }
332                         while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle);
333                         if (!_socket.isClosed())
334                             _socket.close();
335                     }
336                 }
337                 catch(IOException e)
338                 {
339                     Log.ignore(e);
340                 }
341             }
342         }
343     }
344 }