View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server.nio;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.net.Socket;
24  import java.nio.channels.ByteChannel;
25  import java.nio.channels.SelectionKey;
26  import java.nio.channels.ServerSocketChannel;
27  import java.nio.channels.SocketChannel;
28  import java.util.Set;
29  
30  import org.eclipse.jetty.http.HttpException;
31  import org.eclipse.jetty.io.Buffer;
32  import org.eclipse.jetty.io.ConnectedEndPoint;
33  import org.eclipse.jetty.io.Connection;
34  import org.eclipse.jetty.io.EndPoint;
35  import org.eclipse.jetty.io.EofException;
36  import org.eclipse.jetty.io.nio.ChannelEndPoint;
37  import org.eclipse.jetty.server.BlockingHttpConnection;
38  import org.eclipse.jetty.server.Request;
39  import org.eclipse.jetty.util.ConcurrentHashSet;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.log.Logger;
42  
43  
44  /* ------------------------------------------------------------------------------- */
45  /**  Blocking NIO connector.
46   * This connector uses efficient NIO buffers with a traditional blocking thread model.
47   * Direct NIO buffers are used and a thread is allocated per connection.
48   *
49   * This connector is best used when there are a few very active connections.
50   *
51   * @org.apache.xbean.XBean element="blockingNioConnector" description="Creates a blocking NIO based socket connector"
52   *
53   *
54   *
55   */
56  public class BlockingChannelConnector extends AbstractNIOConnector
57  {
58      private static final Logger LOG = Log.getLogger(BlockingChannelConnector.class);
59  
60      private transient ServerSocketChannel _acceptChannel;
61      private final Set<BlockingChannelEndPoint> _endpoints = new ConcurrentHashSet<BlockingChannelEndPoint>();
62  
63  
64      /* ------------------------------------------------------------ */
65      /** Constructor.
66       *
67       */
68      public BlockingChannelConnector()
69      {
70      }
71  
72      /* ------------------------------------------------------------ */
73      public Object getConnection()
74      {
75          return _acceptChannel;
76      }
77  
78      /* ------------------------------------------------------------ */
79      /**
80       * @see org.eclipse.jetty.server.AbstractConnector#doStart()
81       */
82      @Override
83      protected void doStart() throws Exception
84      {
85          super.doStart();
86          getThreadPool().dispatch(new Runnable()
87          {
88  
89              public void run()
90              {
91                  while (isRunning())
92                  {
93                      try
94                      {
95                          Thread.sleep(400);
96                          long now=System.currentTimeMillis();
97                          for (BlockingChannelEndPoint endp : _endpoints)
98                          {
99                              endp.checkIdleTimestamp(now);
100                         }
101                     }
102                     catch(InterruptedException e)
103                     {
104                         LOG.ignore(e);
105                     }
106                     catch(Exception e)
107                     {
108                         LOG.warn(e);
109                     }
110                 }
111             }
112 
113         });
114 
115     }
116 
117 
118     /* ------------------------------------------------------------ */
119     public void open() throws IOException
120     {
121         // Create a new server socket and set to non blocking mode
122         _acceptChannel= ServerSocketChannel.open();
123         _acceptChannel.configureBlocking(true);
124 
125         // Bind the server socket to the local host and port
126         InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
127         _acceptChannel.socket().bind(addr,getAcceptQueueSize());
128     }
129 
130     /* ------------------------------------------------------------ */
131     public void close() throws IOException
132     {
133         if (_acceptChannel != null)
134             _acceptChannel.close();
135         _acceptChannel=null;
136     }
137 
138     /* ------------------------------------------------------------ */
139     @Override
140     public void accept(int acceptorID)
141     	throws IOException, InterruptedException
142     {
143         SocketChannel channel = _acceptChannel.accept();
144         channel.configureBlocking(true);
145         Socket socket=channel.socket();
146         configure(socket);
147 
148         BlockingChannelEndPoint connection=new BlockingChannelEndPoint(channel);
149         connection.dispatch();
150     }
151 
152     /* ------------------------------------------------------------------------------- */
153     @Override
154     public void customize(EndPoint endpoint, Request request)
155         throws IOException
156     {
157         super.customize(endpoint, request);
158         endpoint.setMaxIdleTime(_maxIdleTime);
159         configure(((SocketChannel)endpoint.getTransport()).socket());
160     }
161 
162 
163     /* ------------------------------------------------------------------------------- */
164     public int getLocalPort()
165     {
166         if (_acceptChannel==null || !_acceptChannel.isOpen())
167             return -1;
168         return _acceptChannel.socket().getLocalPort();
169     }
170 
171     /* ------------------------------------------------------------------------------- */
172     /* ------------------------------------------------------------------------------- */
173     /* ------------------------------------------------------------------------------- */
174     private class BlockingChannelEndPoint extends ChannelEndPoint implements Runnable, ConnectedEndPoint
175     {
176         private Connection _connection;
177         private int _timeout;
178         private volatile long _idleTimestamp;
179 
180         BlockingChannelEndPoint(ByteChannel channel)
181             throws IOException
182         {
183             super(channel,BlockingChannelConnector.this._maxIdleTime);
184             _connection = new BlockingHttpConnection(BlockingChannelConnector.this,this,getServer());
185         }
186 
187         /* ------------------------------------------------------------ */
188         /** Get the connection.
189          * @return the connection
190          */
191         public Connection getConnection()
192         {
193             return _connection;
194         }
195 
196         /* ------------------------------------------------------------ */
197         public void setConnection(Connection connection)
198         {
199             _connection=connection;
200         }
201 
202         /* ------------------------------------------------------------ */
203         public void checkIdleTimestamp(long now)
204         {
205             if (_idleTimestamp!=0 && _timeout>0 && now>(_idleTimestamp+_timeout))
206             {
207                 idleExpired();
208             }
209         }
210 
211         /* ------------------------------------------------------------ */
212         protected void idleExpired()
213         {
214             try
215             {
216                 super.close();
217             }
218             catch (IOException e)
219             {
220                 LOG.ignore(e);
221             }
222         }
223 
224         /* ------------------------------------------------------------ */
225         void dispatch() throws IOException
226         {
227             if (!getThreadPool().dispatch(this))
228             {
229                 LOG.warn("dispatch failed for  {}",_connection);
230                 super.close();
231             }
232         }
233 
234         /* ------------------------------------------------------------ */
235         /**
236          * @see org.eclipse.jetty.io.nio.ChannelEndPoint#fill(org.eclipse.jetty.io.Buffer)
237          */
238         @Override
239         public int fill(Buffer buffer) throws IOException
240         {
241             _idleTimestamp=System.currentTimeMillis();
242             return super.fill(buffer);
243         }
244 
245         /* ------------------------------------------------------------ */
246         /**
247          * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer)
248          */
249         @Override
250         public int flush(Buffer buffer) throws IOException
251         {
252             _idleTimestamp=System.currentTimeMillis();
253             return super.flush(buffer);
254         }
255 
256         /* ------------------------------------------------------------ */
257         /**
258          * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer)
259          */
260         @Override
261         public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
262         {
263             _idleTimestamp=System.currentTimeMillis();
264             return super.flush(header,buffer,trailer);
265         }
266 
267         /* ------------------------------------------------------------ */
268         public void run()
269         {
270             try
271             {
272                 _timeout=getMaxIdleTime();
273                 connectionOpened(_connection);
274                 _endpoints.add(this);
275 
276                 while (isOpen())
277                 {
278                     _idleTimestamp=System.currentTimeMillis();
279                     if (_connection.isIdle())
280                     {
281                         if (getServer().getThreadPool().isLowOnThreads())
282                         {
283                             int lrmit = getLowResourcesMaxIdleTime();
284                             if (lrmit>=0 && _timeout!= lrmit)
285                             {
286                                 _timeout=lrmit;
287                             }
288                         }
289                     }
290                     else
291                     {
292                         if (_timeout!=getMaxIdleTime())
293                         {
294                             _timeout=getMaxIdleTime();
295                         }
296                     }
297 
298                     _connection = _connection.handle();
299 
300                 }
301             }
302             catch (EofException e)
303             {
304                 LOG.debug("EOF", e);
305                 try{BlockingChannelEndPoint.this.close();}
306                 catch(IOException e2){LOG.ignore(e2);}
307             }
308             catch (HttpException e)
309             {
310                 LOG.debug("BAD", e);
311                 try{super.close();}
312                 catch(IOException e2){LOG.ignore(e2);}
313             }
314             catch(Throwable e)
315             {
316                 LOG.warn("handle failed",e);
317                 try{super.close();}
318                 catch(IOException e2){LOG.ignore(e2);}
319             }
320             finally
321             {
322                 connectionClosed(_connection);
323                 _endpoints.remove(this);
324 
325                 // wait for client to close, but if not, close ourselves.
326                 try
327                 {
328                     if (!_socket.isClosed())
329                     {
330                         long timestamp=System.currentTimeMillis();
331                         int max_idle=getMaxIdleTime();
332 
333                         _socket.setSoTimeout(getMaxIdleTime());
334                         int c=0;
335                         do
336                         {
337                             c = _socket.getInputStream().read();
338                         }
339                         while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle);
340                         if (!_socket.isClosed())
341                             _socket.close();
342                     }
343                 }
344                 catch(IOException e)
345                 {
346                     LOG.ignore(e);
347                 }
348             }
349         }
350 
351         /* ------------------------------------------------------------ */
352         @Override
353         public String toString()
354         {
355             return String.format("BCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b}-{%s}",
356                     hashCode(),
357                     _socket.getRemoteSocketAddress(),
358                     _socket.getLocalSocketAddress(),
359                     isOpen(),
360                     isInputShutdown(),
361                     isOutputShutdown(),
362                     _connection);
363         }
364 
365     }
366 }