View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  import java.net.SocketTimeoutException;
18  import java.net.UnknownHostException;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.SocketChannel;
21  import java.nio.channels.UnresolvedAddressException;
22  import java.util.Map;
23  import java.util.concurrent.ConcurrentHashMap;
24  
25  import javax.net.ssl.SSLContext;
26  import javax.net.ssl.SSLEngine;
27  import javax.net.ssl.SSLSession;
28  
29  import org.eclipse.jetty.http.HttpGenerator;
30  import org.eclipse.jetty.http.HttpParser;
31  import org.eclipse.jetty.http.ssl.SslContextFactory;
32  import org.eclipse.jetty.io.Buffer;
33  import org.eclipse.jetty.io.Buffers;
34  import org.eclipse.jetty.io.Buffers.Type;
35  import org.eclipse.jetty.io.BuffersFactory;
36  import org.eclipse.jetty.io.ConnectedEndPoint;
37  import org.eclipse.jetty.io.Connection;
38  import org.eclipse.jetty.io.EndPoint;
39  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
40  import org.eclipse.jetty.io.nio.SelectorManager;
41  import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
42  import org.eclipse.jetty.util.component.AbstractLifeCycle;
43  import org.eclipse.jetty.util.log.Log;
44  import org.eclipse.jetty.util.log.Logger;
45  import org.eclipse.jetty.util.thread.Timeout;
46  
47  class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
48  {
49      private static final Logger LOG = Log.getLogger(SelectConnector.class);
50  
51      private final HttpClient _httpClient;
52      private final Manager _selectorManager=new Manager();
53      private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>();
54      private Buffers _sslBuffers;
55  
56      /**
57       * @param httpClient the HttpClient this connector is associated to
58       */
59      SelectConnector(HttpClient httpClient)
60      {
61          _httpClient = httpClient;
62      }
63  
64      /* ------------------------------------------------------------ */
65      @Override
66      protected void doStart() throws Exception
67      {
68          super.doStart();
69  
70  
71          final boolean direct=_httpClient.getUseDirectBuffers();
72  
73          SSLEngine sslEngine=_selectorManager.newSslEngine(null);
74          final SSLSession ssl_session=sslEngine.getSession();
75          _sslBuffers = BuffersFactory.newBuffers(
76                  direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
77                  direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
78                  direct?Type.DIRECT:Type.INDIRECT,1024);
79  
80          _selectorManager.start();
81      }
82  
83      /* ------------------------------------------------------------ */
84      @Override
85      protected void doStop() throws Exception
86      {
87          _selectorManager.stop();
88      }
89  
90      /* ------------------------------------------------------------ */
91      public void startConnection( HttpDestination destination )
92          throws IOException
93      {
94          SocketChannel channel = null;
95          try
96          {
97              channel = SocketChannel.open();
98              Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
99              channel.socket().setTcpNoDelay(true);
100 
101             if (_httpClient.isConnectBlocking())
102             {
103                     channel.socket().connect(address.toSocketAddress(), _httpClient.getConnectTimeout());
104                     channel.configureBlocking(false);
105                     _selectorManager.register( channel, destination );
106             }
107             else
108             {
109                 channel.configureBlocking(false);
110                 channel.connect(address.toSocketAddress());            
111                 _selectorManager.register(channel,destination);
112                 ConnectTimeout connectTimeout = new ConnectTimeout(channel,destination);
113                 _httpClient.schedule(connectTimeout,_httpClient.getConnectTimeout());
114                 _connectingChannels.put(channel,connectTimeout);
115             }
116         }
117         catch (UnresolvedAddressException ex)
118         {
119             if (channel != null)
120                 channel.close();
121             destination.onConnectionFailed(ex);
122         }
123         catch(IOException ex)
124         {
125             if (channel != null)
126                 channel.close();
127             destination.onConnectionFailed(ex);
128         }
129     }
130 
131     /* ------------------------------------------------------------ */
132     class Manager extends SelectorManager
133     {
134         @Override
135         public boolean dispatch(Runnable task)
136         {
137             return _httpClient._threadPool.dispatch(task);
138         }
139 
140         @Override
141         protected void endPointOpened(SelectChannelEndPoint endpoint)
142         {
143         }
144 
145         @Override
146         protected void endPointClosed(SelectChannelEndPoint endpoint)
147         {
148         }
149 
150         @Override
151         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
152         {
153         }
154 
155         @Override
156         protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
157         {
158             if (endpoint instanceof SslSelectChannelEndPoint)
159                 return new HttpConnection(_sslBuffers,_sslBuffers,endpoint);
160 
161             return new HttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
162         }
163 
164         @Override
165         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
166         {
167             // We're connected, cancel the connect timeout
168             Timeout.Task connectTimeout = _connectingChannels.remove(channel);
169             if (connectTimeout != null)
170                 connectTimeout.cancel();
171             if (LOG.isDebugEnabled())
172                 LOG.debug("Channels with connection pending: {}", _connectingChannels.size());
173 
174             // key should have destination at this point (will be replaced by endpoint after this call)
175             HttpDestination dest=(HttpDestination)key.attachment();
176 
177             SelectChannelEndPoint ep=null;
178 
179             if (dest.isSecure())
180             {
181                 if (dest.isProxied())
182                 {
183                     SSLEngine engine=newSslEngine(channel);
184                     ep = new ProxySelectChannelEndPoint(channel, selectSet, key, _sslBuffers, engine, (int)_httpClient.getIdleTimeout());
185                 }
186                 else
187                 {
188                     SSLEngine engine=newSslEngine(channel);
189                     SslSelectChannelEndPoint sslEp = new SslSelectChannelEndPoint(_sslBuffers, channel, selectSet, key, engine, (int)_httpClient.getIdleTimeout());
190                     sslEp.setAllowRenegotiate(_httpClient.getSslContextFactory().isAllowRenegotiate());
191                     ep = sslEp;
192                 }
193             }
194             else
195             {
196                 ep = new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout());
197             }
198 
199             HttpConnection connection=(HttpConnection)ep.getConnection();
200             connection.setDestination(dest);
201             dest.onNewConnection(connection);
202             return ep;
203         }
204 
205         private synchronized SSLEngine newSslEngine(SocketChannel channel) throws IOException
206         {
207             SslContextFactory sslContextFactory = _httpClient.getSslContextFactory();
208             SSLEngine sslEngine;
209             if (channel != null)
210             {
211                 String peerHost = channel.socket().getInetAddress().getHostAddress();
212                 int peerPort = channel.socket().getPort();
213                 sslEngine = sslContextFactory.newSslEngine(peerHost, peerPort);
214             }
215             else
216             {
217                 sslEngine = sslContextFactory.newSslEngine();
218             }
219             sslEngine.setUseClientMode(true);
220             sslEngine.beginHandshake();
221 
222             return sslEngine;
223         }
224 
225         /* ------------------------------------------------------------ */
226         /* (non-Javadoc)
227          * @see org.eclipse.io.nio.SelectorManager#connectionFailed(java.nio.channels.SocketChannel, java.lang.Throwable, java.lang.Object)
228          */
229         @Override
230         protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
231         {
232             Timeout.Task connectTimeout = _connectingChannels.remove(channel);
233             if (connectTimeout != null)
234                 connectTimeout.cancel();
235             
236             if (attachment instanceof HttpDestination)
237                 ((HttpDestination)attachment).onConnectionFailed(ex);
238             else
239                 super.connectionFailed(channel,ex,attachment);
240         }
241     }
242 
243     private class ConnectTimeout extends Timeout.Task
244     {
245         private final SocketChannel channel;
246         private final HttpDestination destination;
247 
248         public ConnectTimeout(SocketChannel channel, HttpDestination destination)
249         {
250             this.channel = channel;
251             this.destination = destination;
252         }
253 
254         @Override
255         public void expired()
256         {
257             if (channel.isConnectionPending())
258             {
259                 LOG.debug("Channel {} timed out while connecting, closing it", channel);
260                 try
261                 {
262                     // This will unregister the channel from the selector
263                     channel.close();
264                 }
265                 catch (IOException x)
266                 {
267                     LOG.ignore(x);
268                 }
269                 destination.onConnectionFailed(new SocketTimeoutException());
270             }
271         }
272     }
273 
274     /**
275      * An endpoint that is able to "upgrade" from a normal endpoint to a SSL endpoint.
276      * Since {@link HttpParser} and {@link HttpGenerator} only depend on the {@link EndPoint}
277      * interface, this class overrides all methods of {@link EndPoint} to provide the right
278      * behavior depending on the fact that it has been upgraded or not.
279      */
280     public static class ProxySelectChannelEndPoint extends SslSelectChannelEndPoint
281     {
282         private final SelectChannelEndPoint plainEndPoint;
283         private volatile boolean upgraded = false;
284 
285         public ProxySelectChannelEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key, Buffers sslBuffers, SSLEngine engine, int maxIdleTimeout) throws IOException
286         {
287             super(sslBuffers, channel, selectSet, key, engine, maxIdleTimeout);
288             this.plainEndPoint = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTimeout);
289         }
290 
291         public void upgrade()
292         {
293             upgraded = true;
294         }
295 
296         public void shutdownOutput() throws IOException
297         {
298             if (upgraded)
299                 super.shutdownOutput();
300             else
301                 plainEndPoint.shutdownOutput();
302         }
303 
304         public void close() throws IOException
305         {
306             if (upgraded)
307                 super.close();
308             else
309                 plainEndPoint.close();
310         }
311 
312         public int fill(Buffer buffer) throws IOException
313         {
314             if (upgraded)
315                 return super.fill(buffer);
316             else
317                 return plainEndPoint.fill(buffer);
318         }
319 
320         public int flush(Buffer buffer) throws IOException
321         {
322             if (upgraded)
323                 return super.flush(buffer);
324             else
325                 return plainEndPoint.flush(buffer);
326         }
327 
328         public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
329         {
330             if (upgraded)
331                 return super.flush(header, buffer, trailer);
332             else
333                 return plainEndPoint.flush(header, buffer, trailer);
334         }
335 
336         public String getLocalAddr()
337         {
338             if (upgraded)
339                 return super.getLocalAddr();
340             else
341                 return plainEndPoint.getLocalAddr();
342         }
343 
344         public String getLocalHost()
345         {
346             if (upgraded)
347                 return super.getLocalHost();
348             else
349                 return plainEndPoint.getLocalHost();
350         }
351 
352         public int getLocalPort()
353         {
354             if (upgraded)
355                 return super.getLocalPort();
356             else
357                 return plainEndPoint.getLocalPort();
358         }
359 
360         public String getRemoteAddr()
361         {
362             if (upgraded)
363                 return super.getRemoteAddr();
364             else
365                 return plainEndPoint.getRemoteAddr();
366         }
367 
368         public String getRemoteHost()
369         {
370             if (upgraded)
371                 return super.getRemoteHost();
372             else
373                 return plainEndPoint.getRemoteHost();
374         }
375 
376         public int getRemotePort()
377         {
378             if (upgraded)
379                 return super.getRemotePort();
380             else
381                 return plainEndPoint.getRemotePort();
382         }
383 
384         public boolean isBlocking()
385         {
386             if (upgraded)
387                 return super.isBlocking();
388             else
389                 return plainEndPoint.isBlocking();
390         }
391 
392         public boolean isBufferred()
393         {
394             if (upgraded)
395                 return super.isBufferred();
396             else
397                 return plainEndPoint.isBufferred();
398         }
399 
400         public boolean blockReadable(long millisecs) throws IOException
401         {
402             if (upgraded)
403                 return super.blockReadable(millisecs);
404             else
405                 return plainEndPoint.blockReadable(millisecs);
406         }
407 
408         public boolean blockWritable(long millisecs) throws IOException
409         {
410             if (upgraded)
411                 return super.blockWritable(millisecs);
412             else
413                 return plainEndPoint.blockWritable(millisecs);
414         }
415 
416         public boolean isOpen()
417         {
418             if (upgraded)
419                 return super.isOpen();
420             else
421                 return plainEndPoint.isOpen();
422         }
423 
424         public Object getTransport()
425         {
426             if (upgraded)
427                 return super.getTransport();
428             else
429                 return plainEndPoint.getTransport();
430         }
431 
432         public boolean isBufferingInput()
433         {
434             if (upgraded)
435                 return super.isBufferingInput();
436             else
437                 return plainEndPoint.isBufferingInput();
438         }
439 
440         public boolean isBufferingOutput()
441         {
442             if (upgraded)
443                 return super.isBufferingOutput();
444             else
445                 return plainEndPoint.isBufferingOutput();
446         }
447 
448         public void flush() throws IOException
449         {
450             if (upgraded)
451                 super.flush();
452             else
453                 plainEndPoint.flush();
454 
455         }
456 
457         public int getMaxIdleTime()
458         {
459             if (upgraded)
460                 return super.getMaxIdleTime();
461             else
462                 return plainEndPoint.getMaxIdleTime();
463         }
464 
465         public void setMaxIdleTime(int timeMs) throws IOException
466         {
467             if (upgraded)
468                 super.setMaxIdleTime(timeMs);
469             else
470                 plainEndPoint.setMaxIdleTime(timeMs);
471         }
472     }
473 }