View Javadoc

1   // ========================================================================
2   // Copyright (c) 2003-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13   
14  package org.eclipse.jetty.server.nio;
15  
16  import java.io.IOException;
17  import java.net.InetSocketAddress;
18  import java.net.Socket;
19  import java.nio.channels.ByteChannel;
20  import java.nio.channels.ServerSocketChannel;
21  import java.nio.channels.SocketChannel;
22  import java.util.Set;
23  
24  import org.eclipse.jetty.http.HttpException;
25  import org.eclipse.jetty.io.Buffer;
26  import org.eclipse.jetty.io.ConnectedEndPoint;
27  import org.eclipse.jetty.io.Connection;
28  import org.eclipse.jetty.io.EndPoint;
29  import org.eclipse.jetty.io.EofException;
30  import org.eclipse.jetty.io.nio.ChannelEndPoint;
31  import org.eclipse.jetty.server.HttpConnection;
32  import org.eclipse.jetty.server.Request;
33  import org.eclipse.jetty.util.ConcurrentHashSet;
34  import org.eclipse.jetty.util.log.Log;
35  
36  
37  /* ------------------------------------------------------------------------------- */
38  /**  Blocking NIO connector.
39   * This connector uses efficient NIO buffers with a traditional blocking thread model.
40   * Direct NIO buffers are used and a thread is allocated per connections.
41   * 
42   * This connector is best used when there are a few very active connections.
43   * 
44   * @org.apache.xbean.XBean element="blockingNioConnector" description="Creates a blocking NIO based socket connector"
45   * 
46   * 
47   *
48   */
49  public class BlockingChannelConnector extends AbstractNIOConnector 
50  {
51      private transient ServerSocketChannel _acceptChannel;
52      private final Set<BlockingChannelEndPoint> _endpoints = new ConcurrentHashSet<BlockingChannelEndPoint>();
53      
54      
55      /* ------------------------------------------------------------ */
56      /** Constructor.
57       * 
58       */
59      public BlockingChannelConnector()
60      {
61      }
62  
63      /* ------------------------------------------------------------ */
64      public Object getConnection()
65      {
66          return _acceptChannel;
67      }
68  
69      /* ------------------------------------------------------------ */
70      /**
71       * @see org.eclipse.jetty.server.AbstractConnector#doStart()
72       */
73      @Override
74      protected void doStart() throws Exception
75      {
76          super.doStart();
77          getThreadPool().dispatch(new Runnable()
78          {
79  
80              public void run()
81              {
82                  while (isRunning())
83                  {
84                      try
85                      {
86                          Thread.sleep(400);
87                          long now=System.currentTimeMillis();
88                          for (BlockingChannelEndPoint endp : _endpoints)
89                          {
90                              endp.checkIdleTimestamp(now);
91                          }
92                      }
93                      catch(InterruptedException e)
94                      {
95                          Log.ignore(e);
96                      }
97                      catch(Exception e)
98                      {
99                          Log.warn(e);
100                     }
101                 }
102             }
103             
104         });
105         
106     }
107 
108     
109     /* ------------------------------------------------------------ */
110     public void open() throws IOException
111     {
112         // Create a new server socket and set to non blocking mode
113         _acceptChannel= ServerSocketChannel.open();
114         _acceptChannel.configureBlocking(true);
115 
116         // Bind the server socket to the local host and port
117         InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
118         _acceptChannel.socket().bind(addr,getAcceptQueueSize());
119     }
120 
121     /* ------------------------------------------------------------ */
122     public void close() throws IOException
123     {
124         if (_acceptChannel != null)
125             _acceptChannel.close();
126         _acceptChannel=null;
127     }
128     
129     /* ------------------------------------------------------------ */
130     @Override
131     public void accept(int acceptorID)
132     	throws IOException, InterruptedException
133     {   
134         SocketChannel channel = _acceptChannel.accept();
135         channel.configureBlocking(true);
136         Socket socket=channel.socket();
137         configure(socket);
138 
139         BlockingChannelEndPoint connection=new BlockingChannelEndPoint(channel);
140         connection.dispatch();
141     }
142     
143     /* ------------------------------------------------------------------------------- */
144     @Override
145     public void customize(EndPoint endpoint, Request request)
146         throws IOException
147     {
148         super.customize(endpoint, request);
149         endpoint.setMaxIdleTime(_maxIdleTime);
150         configure(((SocketChannel)endpoint.getTransport()).socket());
151     }
152 
153 
154     /* ------------------------------------------------------------------------------- */
155     public int getLocalPort()
156     {
157         if (_acceptChannel==null || !_acceptChannel.isOpen())
158             return -1;
159         return _acceptChannel.socket().getLocalPort();
160     }
161     
162     /* ------------------------------------------------------------------------------- */
163     /* ------------------------------------------------------------------------------- */
164     /* ------------------------------------------------------------------------------- */
165     private class BlockingChannelEndPoint extends ChannelEndPoint implements Runnable, ConnectedEndPoint
166     {
167         private Connection _connection;
168         private int _timeout;
169         private volatile long _idleTimestamp;
170         
171         BlockingChannelEndPoint(ByteChannel channel) 
172             throws IOException
173         {
174             super(channel,BlockingChannelConnector.this._maxIdleTime);
175             _connection = new HttpConnection(BlockingChannelConnector.this,this,getServer());
176         }
177         
178         /* ------------------------------------------------------------ */
179         /** Get the connection.
180          * @return the connection
181          */
182         public Connection getConnection()
183         {
184             return _connection;
185         }
186         
187         /* ------------------------------------------------------------ */
188         public void setConnection(Connection connection)
189         {
190             _connection=connection;
191         }
192 
193         /* ------------------------------------------------------------ */
194         public void checkIdleTimestamp(long now)
195         {
196             if (_idleTimestamp!=0 && _timeout>0 && now>(_idleTimestamp+_timeout))
197             {
198                 idleExpired();
199             }
200         }
201 
202         /* ------------------------------------------------------------ */
203         protected void idleExpired()
204         {
205             try
206             {
207                 close();
208             }
209             catch (IOException e)
210             {
211                 Log.ignore(e);
212             }
213         }
214         
215         /* ------------------------------------------------------------ */
216         void dispatch() throws IOException
217         {
218             if (!getThreadPool().dispatch(this))
219             {
220                 Log.warn("dispatch failed for  {}",_connection);
221                 BlockingChannelEndPoint.this.close();
222             }
223         }
224         
225         /* ------------------------------------------------------------ */
226         /**
227          * @see org.eclipse.jetty.io.nio.ChannelEndPoint#fill(org.eclipse.jetty.io.Buffer)
228          */
229         @Override
230         public int fill(Buffer buffer) throws IOException
231         {
232             _idleTimestamp=System.currentTimeMillis();
233             return super.fill(buffer);
234         }
235 
236         /* ------------------------------------------------------------ */
237         /**
238          * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer)
239          */
240         @Override
241         public int flush(Buffer buffer) throws IOException
242         {
243             _idleTimestamp=System.currentTimeMillis();
244             return super.flush(buffer);
245         }
246 
247         /* ------------------------------------------------------------ */
248         /**
249          * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer)
250          */
251         @Override
252         public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
253         {
254             _idleTimestamp=System.currentTimeMillis();
255             return super.flush(header,buffer,trailer);
256         }
257 
258         /* ------------------------------------------------------------ */
259         public void run()
260         {
261             try
262             {
263                 _timeout=getMaxIdleTime();
264                 connectionOpened(_connection);
265                 _endpoints.add(this);
266 
267                 while (isOpen())
268                 {
269                     _idleTimestamp=System.currentTimeMillis();
270                     if (_connection.isIdle())
271                     {
272                         if (getServer().getThreadPool().isLowOnThreads())
273                         {
274                             int lrmit = getLowResourcesMaxIdleTime();
275                             if (lrmit>=0 && _timeout!= lrmit)
276                             {
277                                 _timeout=lrmit;
278                             }
279                         }
280                     }
281                     else
282                     {
283                         if (_timeout!=getMaxIdleTime())
284                         {
285                             _timeout=getMaxIdleTime();
286                         }
287                     }
288                     
289                     _connection = _connection.handle();
290                     
291                 }
292             }
293             catch (EofException e)
294             {
295                 Log.debug("EOF", e);
296                 try{BlockingChannelEndPoint.this.close();}
297                 catch(IOException e2){Log.ignore(e2);}
298             }
299             catch (HttpException e)
300             {
301                 Log.debug("BAD", e);
302                 try{BlockingChannelEndPoint.this.close();}
303                 catch(IOException e2){Log.ignore(e2);}
304             }
305             catch(Throwable e)
306             {
307                 Log.warn("handle failed",e);
308                 try{BlockingChannelEndPoint.this.close();}
309                 catch(IOException e2){Log.ignore(e2);}
310             }
311             finally
312             {
313                 connectionClosed(_connection);
314                 _endpoints.remove(this);
315                 
316                 // wait for client to close, but if not, close ourselves.
317                 try
318                 {
319                     if (!_socket.isClosed())
320                     {
321                         long timestamp=System.currentTimeMillis();
322                         int max_idle=getMaxIdleTime(); 
323 
324                         _socket.setSoTimeout(getMaxIdleTime());
325                         int c=0;
326                         do
327                         {
328                             c = _socket.getInputStream().read();
329                         }
330                         while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle);
331                         if (!_socket.isClosed())
332                             _socket.close();
333                     }
334                 }
335                 catch(IOException e)
336                 {
337                     Log.ignore(e);
338                 }
339             }
340         }
341     }
342 }