View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server.bio;
20  
21  import java.io.IOException;
22  import java.net.InetAddress;
23  import java.net.ServerSocket;
24  import java.net.Socket;
25  import java.net.SocketException;
26  import java.util.HashSet;
27  import java.util.Set;
28  
29  import org.eclipse.jetty.http.HttpException;
30  import org.eclipse.jetty.io.Buffer;
31  import org.eclipse.jetty.io.ConnectedEndPoint;
32  import org.eclipse.jetty.io.Connection;
33  import org.eclipse.jetty.io.EndPoint;
34  import org.eclipse.jetty.io.EofException;
35  import org.eclipse.jetty.io.bio.SocketEndPoint;
36  import org.eclipse.jetty.server.AbstractConnector;
37  import org.eclipse.jetty.server.AbstractHttpConnection;
38  import org.eclipse.jetty.server.BlockingHttpConnection;
39  import org.eclipse.jetty.server.Request;
40  import org.eclipse.jetty.util.component.AggregateLifeCycle;
41  import org.eclipse.jetty.util.log.Log;
42  import org.eclipse.jetty.util.log.Logger;
43  
44  
45  /* ------------------------------------------------------------------------------- */
46  /**  Socket Connector.
47   * This connector implements a traditional blocking IO and threading model.
48   * Normal JRE sockets are used and a thread is allocated per connection.
49   * Buffers are managed so that large buffers are only allocated to active connections.
50   *
51   * This Connector should only be used if NIO is not available.
52   *
53   * @org.apache.xbean.XBean element="bioConnector" description="Creates a BIO based socket connector"
54   *
55   *
56   */
57  public class SocketConnector extends AbstractConnector
58  {
59      private static final Logger LOG = Log.getLogger(SocketConnector.class);
60  
61      protected ServerSocket _serverSocket;
62      protected final Set<EndPoint> _connections;
63      protected volatile int _localPort=-1;
64  
65      /* ------------------------------------------------------------ */
66      /** Constructor.
67       *
68       */
69      public SocketConnector()
70      {
71          _connections=new HashSet<EndPoint>();
72      }
73  
74      /* ------------------------------------------------------------ */
75      public Object getConnection()
76      {
77          return _serverSocket;
78      }
79  
80      /* ------------------------------------------------------------ */
81      public void open() throws IOException
82      {
83          // Create a new server socket and set to non blocking mode
84          if (_serverSocket==null || _serverSocket.isClosed())
85          _serverSocket= newServerSocket(getHost(),getPort(),getAcceptQueueSize());
86          _serverSocket.setReuseAddress(getReuseAddress());
87          _localPort=_serverSocket.getLocalPort();
88          if (_localPort<=0)
89              throw new IllegalStateException("port not allocated for "+this);
90  
91      }
92  
93      /* ------------------------------------------------------------ */
94      protected ServerSocket newServerSocket(String host, int port,int backlog) throws IOException
95      {
96          ServerSocket ss= host==null?
97              new ServerSocket(port,backlog):
98              new ServerSocket(port,backlog,InetAddress.getByName(host));
99  
100         return ss;
101     }
102 
103     /* ------------------------------------------------------------ */
104     public void close() throws IOException
105     {
106         if (_serverSocket!=null)
107             _serverSocket.close();
108         _serverSocket=null;
109         _localPort=-2;
110     }
111 
112     /* ------------------------------------------------------------ */
113     @Override
114     public void accept(int acceptorID)
115     	throws IOException, InterruptedException
116     {
117         Socket socket = _serverSocket.accept();
118         configure(socket);
119 
120         ConnectorEndPoint connection=new ConnectorEndPoint(socket);
121         connection.dispatch();
122     }
123 
124     /* ------------------------------------------------------------------------------- */
125     /**
126      * Allows subclass to override Conection if required.
127      */
128     protected Connection newConnection(EndPoint endpoint)
129     {
130         return new BlockingHttpConnection(this, endpoint, getServer());
131     }
132 
133     /* ------------------------------------------------------------------------------- */
134     @Override
135     public void customize(EndPoint endpoint, Request request)
136         throws IOException
137     {
138         ConnectorEndPoint connection = (ConnectorEndPoint)endpoint;
139         int lrmit = isLowResources()?_lowResourceMaxIdleTime:_maxIdleTime;
140         connection.setMaxIdleTime(lrmit);
141 
142         super.customize(endpoint, request);
143     }
144 
145     /* ------------------------------------------------------------------------------- */
146     public int getLocalPort()
147     {
148         return _localPort;
149     }
150 
151     /* ------------------------------------------------------------------------------- */
152     @Override
153     protected void doStart() throws Exception
154     {
155         _connections.clear();
156         super.doStart();
157     }
158 
159     /* ------------------------------------------------------------------------------- */
160     @Override
161     protected void doStop() throws Exception
162     {
163         super.doStop();
164         Set<EndPoint> set = new HashSet<EndPoint>();
165         synchronized(_connections)
166         {
167             set.addAll(_connections);
168         }
169         for (EndPoint endPoint : set)
170         {
171             ConnectorEndPoint connection = (ConnectorEndPoint)endPoint;
172             connection.close();
173         }
174     }
175 
176     @Override
177     public void dump(Appendable out, String indent) throws IOException
178     {
179         super.dump(out, indent);
180         Set<EndPoint> connections = new HashSet<EndPoint>();
181         synchronized (_connections)
182         {
183             connections.addAll(_connections);
184         }
185         AggregateLifeCycle.dump(out, indent, connections);
186     }
187 
188     /* ------------------------------------------------------------------------------- */
189     /* ------------------------------------------------------------------------------- */
190     /* ------------------------------------------------------------------------------- */
191     protected class ConnectorEndPoint extends SocketEndPoint implements Runnable, ConnectedEndPoint
192     {
193         volatile Connection _connection;
194         protected final Socket _socket;
195 
196         public ConnectorEndPoint(Socket socket) throws IOException
197         {
198             super(socket,_maxIdleTime);
199             _connection = newConnection(this);
200             _socket=socket;
201         }
202 
203         public Connection getConnection()
204         {
205             return _connection;
206         }
207 
208         public void setConnection(Connection connection)
209         {
210             if (_connection!=connection && _connection!=null)
211                 connectionUpgraded(_connection,connection);
212             _connection=connection;
213         }
214 
215         public void dispatch() throws IOException
216         {
217             if (getThreadPool()==null || !getThreadPool().dispatch(this))
218             {
219                 LOG.warn("dispatch failed for {}",_connection);
220                 close();
221             }
222         }
223 
224         @Override
225         public int fill(Buffer buffer) throws IOException
226         {
227             int l = super.fill(buffer);
228             if (l<0)
229             {
230                 if (!isInputShutdown())
231                     shutdownInput();
232                 if (isOutputShutdown())
233                     close();
234             }
235             return l;
236         }
237 
238         @Override
239         public void close() throws IOException
240         {
241             if (_connection instanceof AbstractHttpConnection)
242                 ((AbstractHttpConnection)_connection).getRequest().getAsyncContinuation().cancel();
243             super.close();
244         }
245 
246         public void run()
247         {
248             try
249             {
250                 connectionOpened(_connection);
251                 synchronized(_connections)
252                 {
253                     _connections.add(this);
254                 }
255 
256                 while (isStarted() && !isClosed())
257                 {
258                     if (_connection.isIdle())
259                     {
260                         if (isLowResources())
261                             setMaxIdleTime(getLowResourcesMaxIdleTime());
262                     }
263 
264                     _connection=_connection.handle();
265                 }
266             }
267             catch (EofException e)
268             {
269                 LOG.debug("EOF", e);
270                 try{close();}
271                 catch(IOException e2){LOG.ignore(e2);}
272             }
273             catch (SocketException e)
274             {
275                 LOG.debug("EOF", e);
276                 try{close();}
277                 catch(IOException e2){LOG.ignore(e2);}
278             }
279             catch (HttpException e)
280             {
281                 LOG.debug("BAD", e);
282                 try{close();}
283                 catch(IOException e2){LOG.ignore(e2);}
284             }
285             catch(Exception e)
286             {
287                 LOG.warn("handle failed?",e);
288                 try{close();}
289                 catch(IOException e2){LOG.ignore(e2);}
290             }
291             finally
292             {
293                 connectionClosed(_connection);
294                 synchronized(_connections)
295                 {
296                     _connections.remove(this);
297                 }
298 
299                 // wait for client to close, but if not, close ourselves.
300                 try
301                 {
302                     if (!_socket.isClosed())
303                     {
304                         long timestamp=System.currentTimeMillis();
305                         int max_idle=getMaxIdleTime();
306 
307                         _socket.setSoTimeout(getMaxIdleTime());
308                         int c=0;
309                         do
310                         {
311                             c = _socket.getInputStream().read();
312                         }
313                         while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle);
314                         if (!_socket.isClosed())
315                             _socket.close();
316                     }
317                 }
318                 catch(IOException e)
319                 {
320                     LOG.ignore(e);
321                 }
322             }
323         }
324     }
325 }