View Javadoc

1   // ========================================================================
2   // Copyright (c) 2003-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13   
14  package org.eclipse.jetty.server.nio;
15  
16  import java.io.IOException;
17  import java.net.InetSocketAddress;
18  import java.net.Socket;
19  import java.nio.channels.ByteChannel;
20  import java.nio.channels.ServerSocketChannel;
21  import java.nio.channels.SocketChannel;
22  import java.util.Set;
23  
24  import org.eclipse.jetty.http.HttpException;
25  import org.eclipse.jetty.io.Buffer;
26  import org.eclipse.jetty.io.ConnectedEndPoint;
27  import org.eclipse.jetty.io.Connection;
28  import org.eclipse.jetty.io.EndPoint;
29  import org.eclipse.jetty.io.EofException;
30  import org.eclipse.jetty.io.nio.ChannelEndPoint;
31  import org.eclipse.jetty.server.BlockingHttpConnection;
32  import org.eclipse.jetty.server.HttpConnection;
33  import org.eclipse.jetty.server.Request;
34  import org.eclipse.jetty.util.ConcurrentHashSet;
35  import org.eclipse.jetty.util.log.Log;
36  
37  
38  /* ------------------------------------------------------------------------------- */
39  /**  Blocking NIO connector.
40   * This connector uses efficient NIO buffers with a traditional blocking thread model.
41   * Direct NIO buffers are used and a thread is allocated per connections.
42   * 
43   * This connector is best used when there are a few very active connections.
44   * 
45   * @org.apache.xbean.XBean element="blockingNioConnector" description="Creates a blocking NIO based socket connector"
46   * 
47   * 
48   *
49   */
50  public class BlockingChannelConnector extends AbstractNIOConnector 
51  {
52      private transient ServerSocketChannel _acceptChannel;
53      private final Set<BlockingChannelEndPoint> _endpoints = new ConcurrentHashSet<BlockingChannelEndPoint>();
54      
55      
56      /* ------------------------------------------------------------ */
57      /** Constructor.
58       * 
59       */
60      public BlockingChannelConnector()
61      {
62      }
63  
64      /* ------------------------------------------------------------ */
65      public Object getConnection()
66      {
67          return _acceptChannel;
68      }
69  
70      /* ------------------------------------------------------------ */
71      /**
72       * @see org.eclipse.jetty.server.AbstractConnector#doStart()
73       */
74      @Override
75      protected void doStart() throws Exception
76      {
77          super.doStart();
78          getThreadPool().dispatch(new Runnable()
79          {
80  
81              public void run()
82              {
83                  while (isRunning())
84                  {
85                      try
86                      {
87                          Thread.sleep(400);
88                          long now=System.currentTimeMillis();
89                          for (BlockingChannelEndPoint endp : _endpoints)
90                          {
91                              endp.checkIdleTimestamp(now);
92                          }
93                      }
94                      catch(InterruptedException e)
95                      {
96                          Log.ignore(e);
97                      }
98                      catch(Exception e)
99                      {
100                         Log.warn(e);
101                     }
102                 }
103             }
104             
105         });
106         
107     }
108 
109     
110     /* ------------------------------------------------------------ */
111     public void open() throws IOException
112     {
113         // Create a new server socket and set to non blocking mode
114         _acceptChannel= ServerSocketChannel.open();
115         _acceptChannel.configureBlocking(true);
116 
117         // Bind the server socket to the local host and port
118         InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
119         _acceptChannel.socket().bind(addr,getAcceptQueueSize());
120     }
121 
122     /* ------------------------------------------------------------ */
123     public void close() throws IOException
124     {
125         if (_acceptChannel != null)
126             _acceptChannel.close();
127         _acceptChannel=null;
128     }
129     
130     /* ------------------------------------------------------------ */
131     @Override
132     public void accept(int acceptorID)
133     	throws IOException, InterruptedException
134     {   
135         SocketChannel channel = _acceptChannel.accept();
136         channel.configureBlocking(true);
137         Socket socket=channel.socket();
138         configure(socket);
139 
140         BlockingChannelEndPoint connection=new BlockingChannelEndPoint(channel);
141         connection.dispatch();
142     }
143     
144     /* ------------------------------------------------------------------------------- */
145     @Override
146     public void customize(EndPoint endpoint, Request request)
147         throws IOException
148     {
149         super.customize(endpoint, request);
150         endpoint.setMaxIdleTime(_maxIdleTime);
151         configure(((SocketChannel)endpoint.getTransport()).socket());
152     }
153 
154 
155     /* ------------------------------------------------------------------------------- */
156     public int getLocalPort()
157     {
158         if (_acceptChannel==null || !_acceptChannel.isOpen())
159             return -1;
160         return _acceptChannel.socket().getLocalPort();
161     }
162     
163     /* ------------------------------------------------------------------------------- */
164     /* ------------------------------------------------------------------------------- */
165     /* ------------------------------------------------------------------------------- */
166     private class BlockingChannelEndPoint extends ChannelEndPoint implements Runnable, ConnectedEndPoint
167     {
168         private Connection _connection;
169         private int _timeout;
170         private volatile long _idleTimestamp;
171         
172         BlockingChannelEndPoint(ByteChannel channel) 
173             throws IOException
174         {
175             super(channel,BlockingChannelConnector.this._maxIdleTime);
176             _connection = new BlockingHttpConnection(BlockingChannelConnector.this,this,getServer());
177         }
178         
179         /* ------------------------------------------------------------ */
180         /** Get the connection.
181          * @return the connection
182          */
183         public Connection getConnection()
184         {
185             return _connection;
186         }
187         
188         /* ------------------------------------------------------------ */
189         public void setConnection(Connection connection)
190         {
191             _connection=connection;
192         }
193 
194         /* ------------------------------------------------------------ */
195         public void checkIdleTimestamp(long now)
196         {
197             if (_idleTimestamp!=0 && _timeout>0 && now>(_idleTimestamp+_timeout))
198             {
199                 idleExpired();
200             }
201         }
202 
203         /* ------------------------------------------------------------ */
204         protected void idleExpired()
205         {
206             try
207             {
208                 close();
209             }
210             catch (IOException e)
211             {
212                 Log.ignore(e);
213             }
214         }
215         
216         /* ------------------------------------------------------------ */
217         void dispatch() throws IOException
218         {
219             if (!getThreadPool().dispatch(this))
220             {
221                 Log.warn("dispatch failed for  {}",_connection);
222                 BlockingChannelEndPoint.this.close();
223             }
224         }
225         
226         /* ------------------------------------------------------------ */
227         /**
228          * @see org.eclipse.jetty.io.nio.ChannelEndPoint#fill(org.eclipse.jetty.io.Buffer)
229          */
230         @Override
231         public int fill(Buffer buffer) throws IOException
232         {
233             _idleTimestamp=System.currentTimeMillis();
234             return super.fill(buffer);
235         }
236 
237         /* ------------------------------------------------------------ */
238         /**
239          * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer)
240          */
241         @Override
242         public int flush(Buffer buffer) throws IOException
243         {
244             _idleTimestamp=System.currentTimeMillis();
245             return super.flush(buffer);
246         }
247 
248         /* ------------------------------------------------------------ */
249         /**
250          * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer)
251          */
252         @Override
253         public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
254         {
255             _idleTimestamp=System.currentTimeMillis();
256             return super.flush(header,buffer,trailer);
257         }
258 
259         /* ------------------------------------------------------------ */
260         public void run()
261         {
262             try
263             {
264                 _timeout=getMaxIdleTime();
265                 connectionOpened(_connection);
266                 _endpoints.add(this);
267 
268                 while (isOpen())
269                 {
270                     _idleTimestamp=System.currentTimeMillis();
271                     if (_connection.isIdle())
272                     {
273                         if (getServer().getThreadPool().isLowOnThreads())
274                         {
275                             int lrmit = getLowResourcesMaxIdleTime();
276                             if (lrmit>=0 && _timeout!= lrmit)
277                             {
278                                 _timeout=lrmit;
279                             }
280                         }
281                     }
282                     else
283                     {
284                         if (_timeout!=getMaxIdleTime())
285                         {
286                             _timeout=getMaxIdleTime();
287                         }
288                     }
289                     
290                     _connection = _connection.handle();
291                     
292                 }
293             }
294             catch (EofException e)
295             {
296                 Log.debug("EOF", e);
297                 try{BlockingChannelEndPoint.this.close();}
298                 catch(IOException e2){Log.ignore(e2);}
299             }
300             catch (HttpException e)
301             {
302                 Log.debug("BAD", e);
303                 try{BlockingChannelEndPoint.this.close();}
304                 catch(IOException e2){Log.ignore(e2);}
305             }
306             catch(Throwable e)
307             {
308                 Log.warn("handle failed",e);
309                 try{BlockingChannelEndPoint.this.close();}
310                 catch(IOException e2){Log.ignore(e2);}
311             }
312             finally
313             {
314                 connectionClosed(_connection);
315                 _endpoints.remove(this);
316                 
317                 // wait for client to close, but if not, close ourselves.
318                 try
319                 {
320                     if (!_socket.isClosed())
321                     {
322                         long timestamp=System.currentTimeMillis();
323                         int max_idle=getMaxIdleTime(); 
324 
325                         _socket.setSoTimeout(getMaxIdleTime());
326                         int c=0;
327                         do
328                         {
329                             c = _socket.getInputStream().read();
330                         }
331                         while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle);
332                         if (!_socket.isClosed())
333                             _socket.close();
334                     }
335                 }
336                 catch(IOException e)
337                 {
338                     Log.ignore(e);
339                 }
340             }
341         }
342     }
343 }