View Javadoc

1   // ========================================================================
2   // Copyright (c) 2003-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.server.bio;
15  
16  import java.io.IOException;
17  import java.net.InetAddress;
18  import java.net.ServerSocket;
19  import java.net.Socket;
20  import java.net.SocketException;
21  import java.util.HashSet;
22  import java.util.Set;
23  
24  import org.eclipse.jetty.http.HttpException;
25  import org.eclipse.jetty.io.Buffer;
26  import org.eclipse.jetty.io.ConnectedEndPoint;
27  import org.eclipse.jetty.io.Connection;
28  import org.eclipse.jetty.io.EndPoint;
29  import org.eclipse.jetty.io.EofException;
30  import org.eclipse.jetty.io.bio.SocketEndPoint;
31  import org.eclipse.jetty.server.AbstractConnector;
32  import org.eclipse.jetty.server.AbstractHttpConnection;
33  import org.eclipse.jetty.server.BlockingHttpConnection;
34  import org.eclipse.jetty.server.Request;
35  import org.eclipse.jetty.util.component.AggregateLifeCycle;
36  import org.eclipse.jetty.util.log.Log;
37  import org.eclipse.jetty.util.log.Logger;
38  
39  
40  /* ------------------------------------------------------------------------------- */
41  /**  Socket Connector.
42   * This connector implements a traditional blocking IO and threading model.
43   * Normal JRE sockets are used and a thread is allocated per connection.
44   * Buffers are managed so that large buffers are only allocated to active connections.
45   *
46   * This Connector should only be used if NIO is not available.
47   *
48   * @org.apache.xbean.XBean element="bioConnector" description="Creates a BIO based socket connector"
49   *
50   *
51   */
52  public class SocketConnector extends AbstractConnector
53  {
54      private static final Logger LOG = Log.getLogger(SocketConnector.class);
55  
56      protected ServerSocket _serverSocket;
57      protected final Set<EndPoint> _connections;
58      protected volatile int _localPort=-1;
59  
60      /* ------------------------------------------------------------ */
61      /** Constructor.
62       *
63       */
64      public SocketConnector()
65      {
66          _connections=new HashSet<EndPoint>();
67      }
68  
69      /* ------------------------------------------------------------ */
70      public Object getConnection()
71      {
72          return _serverSocket;
73      }
74  
75      /* ------------------------------------------------------------ */
76      public void open() throws IOException
77      {
78          // Create a new server socket and set to non blocking mode
79          if (_serverSocket==null || _serverSocket.isClosed())
80          _serverSocket= newServerSocket(getHost(),getPort(),getAcceptQueueSize());
81          _serverSocket.setReuseAddress(getReuseAddress());
82          _localPort=_serverSocket.getLocalPort();
83          if (_localPort<=0)
84              throw new IllegalStateException("port not allocated for "+this);
85  
86      }
87  
88      /* ------------------------------------------------------------ */
89      protected ServerSocket newServerSocket(String host, int port,int backlog) throws IOException
90      {
91          ServerSocket ss= host==null?
92              new ServerSocket(port,backlog):
93              new ServerSocket(port,backlog,InetAddress.getByName(host));
94  
95          return ss;
96      }
97  
98      /* ------------------------------------------------------------ */
99      public void close() throws IOException
100     {
101         if (_serverSocket!=null)
102             _serverSocket.close();
103         _serverSocket=null;
104         _localPort=-2;
105     }
106 
107     /* ------------------------------------------------------------ */
108     @Override
109     public void accept(int acceptorID)
110     	throws IOException, InterruptedException
111     {
112         Socket socket = _serverSocket.accept();
113         configure(socket);
114 
115         ConnectorEndPoint connection=new ConnectorEndPoint(socket);
116         connection.dispatch();
117     }
118 
119     /* ------------------------------------------------------------------------------- */
120     /**
121      * Allows subclass to override Conection if required.
122      */
123     protected Connection newConnection(EndPoint endpoint)
124     {
125         return new BlockingHttpConnection(this, endpoint, getServer());
126     }
127 
128     /* ------------------------------------------------------------------------------- */
129     @Override
130     public void customize(EndPoint endpoint, Request request)
131         throws IOException
132     {
133         ConnectorEndPoint connection = (ConnectorEndPoint)endpoint;
134         int lrmit = isLowResources()?_lowResourceMaxIdleTime:_maxIdleTime;
135         connection.setMaxIdleTime(lrmit);
136 
137         super.customize(endpoint, request);
138     }
139 
140     /* ------------------------------------------------------------------------------- */
141     public int getLocalPort()
142     {
143         return _localPort;
144     }
145 
146     /* ------------------------------------------------------------------------------- */
147     @Override
148     protected void doStart() throws Exception
149     {
150         _connections.clear();
151         super.doStart();
152     }
153 
154     /* ------------------------------------------------------------------------------- */
155     @Override
156     protected void doStop() throws Exception
157     {
158         super.doStop();
159         Set<EndPoint> set = new HashSet<EndPoint>();
160         synchronized(_connections)
161         {
162             set.addAll(_connections);
163         }
164         for (EndPoint endPoint : set)
165         {
166             ConnectorEndPoint connection = (ConnectorEndPoint)endPoint;
167             connection.close();
168         }
169     }
170 
171     @Override
172     public void dump(Appendable out, String indent) throws IOException
173     {
174         super.dump(out, indent);
175         Set<EndPoint> connections = new HashSet<EndPoint>();
176         synchronized (_connections)
177         {
178             connections.addAll(_connections);
179         }
180         AggregateLifeCycle.dump(out, indent, connections);
181     }
182 
183     /* ------------------------------------------------------------------------------- */
184     /* ------------------------------------------------------------------------------- */
185     /* ------------------------------------------------------------------------------- */
186     protected class ConnectorEndPoint extends SocketEndPoint implements Runnable, ConnectedEndPoint
187     {
188         volatile Connection _connection;
189         protected final Socket _socket;
190 
191         public ConnectorEndPoint(Socket socket) throws IOException
192         {
193             super(socket,_maxIdleTime);
194             _connection = newConnection(this);
195             _socket=socket;
196         }
197 
198         public Connection getConnection()
199         {
200             return _connection;
201         }
202 
203         public void setConnection(Connection connection)
204         {
205             if (_connection!=connection && _connection!=null)
206                 connectionUpgraded(_connection,connection);
207             _connection=connection;
208         }
209 
210         public void dispatch() throws IOException
211         {
212             if (getThreadPool()==null || !getThreadPool().dispatch(this))
213             {
214                 LOG.warn("dispatch failed for {}",_connection);
215                 close();
216             }
217         }
218 
219         @Override
220         public int fill(Buffer buffer) throws IOException
221         {
222             int l = super.fill(buffer);
223             if (l<0)
224                 close();
225             return l;
226         }
227 
228         @Override
229         public void close() throws IOException
230         {
231             if (_connection instanceof AbstractHttpConnection)
232                 ((AbstractHttpConnection)_connection).getRequest().getAsyncContinuation().cancel();
233             super.close();
234         }
235 
236         public void run()
237         {
238             try
239             {
240                 connectionOpened(_connection);
241                 synchronized(_connections)
242                 {
243                     _connections.add(this);
244                 }
245 
246                 while (isStarted() && !isClosed())
247                 {
248                     if (_connection.isIdle())
249                     {
250                         if (isLowResources())
251                             setMaxIdleTime(getLowResourcesMaxIdleTime());
252                     }
253 
254                     _connection=_connection.handle();
255                 }
256             }
257             catch (EofException e)
258             {
259                 LOG.debug("EOF", e);
260                 try{close();}
261                 catch(IOException e2){LOG.ignore(e2);}
262             }
263             catch (SocketException e)
264             {
265                 LOG.debug("EOF", e);
266                 try{close();}
267                 catch(IOException e2){LOG.ignore(e2);}
268             }
269             catch (HttpException e)
270             {
271                 LOG.debug("BAD", e);
272                 try{close();}
273                 catch(IOException e2){LOG.ignore(e2);}
274             }
275             catch(Exception e)
276             {
277                 LOG.warn("handle failed?",e);
278                 try{close();}
279                 catch(IOException e2){LOG.ignore(e2);}
280             }
281             finally
282             {
283                 connectionClosed(_connection);
284                 synchronized(_connections)
285                 {
286                     _connections.remove(this);
287                 }
288 
289                 // wait for client to close, but if not, close ourselves.
290                 try
291                 {
292                     if (!_socket.isClosed())
293                     {
294                         long timestamp=System.currentTimeMillis();
295                         int max_idle=getMaxIdleTime();
296 
297                         _socket.setSoTimeout(getMaxIdleTime());
298                         int c=0;
299                         do
300                         {
301                             c = _socket.getInputStream().read();
302                         }
303                         while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle);
304                         if (!_socket.isClosed())
305                             _socket.close();
306                     }
307                 }
308                 catch(IOException e)
309                 {
310                     LOG.ignore(e);
311                 }
312             }
313         }
314     }
315 }