View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  import java.net.SocketTimeoutException;
18  import java.net.UnknownHostException;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.SocketChannel;
21  import java.nio.channels.UnresolvedAddressException;
22  import java.util.Map;
23  import java.util.concurrent.ConcurrentHashMap;
24  
25  import javax.net.ssl.SSLContext;
26  import javax.net.ssl.SSLEngine;
27  import javax.net.ssl.SSLSession;
28  
29  import org.eclipse.jetty.http.HttpGenerator;
30  import org.eclipse.jetty.http.HttpParser;
31  import org.eclipse.jetty.http.ssl.SslContextFactory;
32  import org.eclipse.jetty.io.Buffer;
33  import org.eclipse.jetty.io.Buffers;
34  import org.eclipse.jetty.io.Buffers.Type;
35  import org.eclipse.jetty.io.BuffersFactory;
36  import org.eclipse.jetty.io.ConnectedEndPoint;
37  import org.eclipse.jetty.io.Connection;
38  import org.eclipse.jetty.io.EndPoint;
39  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
40  import org.eclipse.jetty.io.nio.SelectorManager;
41  import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
42  import org.eclipse.jetty.util.component.AbstractLifeCycle;
43  import org.eclipse.jetty.util.log.Log;
44  import org.eclipse.jetty.util.log.Logger;
45  import org.eclipse.jetty.util.thread.Timeout;
46  
47  class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
48  {
49      private static final Logger LOG = Log.getLogger(SelectConnector.class);
50  
51      private final HttpClient _httpClient;
52      private final Manager _selectorManager=new Manager();
53      private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>();
54      private Buffers _sslBuffers;
55  
56      /**
57       * @param httpClient the HttpClient this connector is associated to
58       */
59      SelectConnector(HttpClient httpClient)
60      {
61          _httpClient = httpClient;
62      }
63  
64      /* ------------------------------------------------------------ */
65      @Override
66      protected void doStart() throws Exception
67      {
68          super.doStart();
69  
70  
71          final boolean direct=_httpClient.getUseDirectBuffers();
72  
73          SSLEngine sslEngine=_selectorManager.newSslEngine(null);
74          final SSLSession ssl_session=sslEngine.getSession();
75          _sslBuffers = BuffersFactory.newBuffers(
76                  direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
77                  direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
78                  direct?Type.DIRECT:Type.INDIRECT,1024);
79  
80          _selectorManager.start();
81      }
82  
83      /* ------------------------------------------------------------ */
84      @Override
85      protected void doStop() throws Exception
86      {
87          _selectorManager.stop();
88      }
89  
90      /* ------------------------------------------------------------ */
91      public void startConnection( HttpDestination destination )
92          throws IOException
93      {
94          SocketChannel channel = null;
95          try
96          {
97              channel = SocketChannel.open();
98              Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
99              channel.socket().setTcpNoDelay(true);
100 
101             if (_httpClient.isConnectBlocking())
102             {
103                     channel.socket().connect(address.toSocketAddress(), _httpClient.getConnectTimeout());
104                     channel.configureBlocking(false);
105                     _selectorManager.register( channel, destination );
106             }
107             else
108             {
109                 channel.configureBlocking(false);
110                 channel.connect(address.toSocketAddress());            
111                 _selectorManager.register(channel,destination);
112                 ConnectTimeout connectTimeout = new ConnectTimeout(channel,destination);
113                 _httpClient.schedule(connectTimeout,_httpClient.getConnectTimeout());
114                 _connectingChannels.put(channel,connectTimeout);
115             }
116         }
117         catch (UnresolvedAddressException ex)
118         {
119             if (channel != null)
120                 channel.close();
121             destination.onConnectionFailed(ex);
122         }
123         catch(IOException ex)
124         {
125             if (channel != null)
126                 channel.close();
127             destination.onConnectionFailed(ex);
128         }
129     }
130 
131     /* ------------------------------------------------------------ */
132     class Manager extends SelectorManager
133     {
134         @Override
135         public boolean dispatch(Runnable task)
136         {
137             return _httpClient._threadPool.dispatch(task);
138         }
139 
140         @Override
141         protected void endPointOpened(SelectChannelEndPoint endpoint)
142         {
143         }
144 
145         @Override
146         protected void endPointClosed(SelectChannelEndPoint endpoint)
147         {
148         }
149 
150         @Override
151         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
152         {
153         }
154 
155         @Override
156         protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
157         {
158             if (endpoint instanceof SslSelectChannelEndPoint)
159                 return new HttpConnection(_sslBuffers,_sslBuffers,endpoint);
160 
161             return new HttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
162         }
163 
164         @Override
165         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
166         {
167             // We're connected, cancel the connect timeout
168             Timeout.Task connectTimeout = _connectingChannels.remove(channel);
169             if (connectTimeout != null)
170                 connectTimeout.cancel();
171             LOG.debug("Channels with connection pending: {}", _connectingChannels.size());
172 
173             // key should have destination at this point (will be replaced by endpoint after this call)
174             HttpDestination dest=(HttpDestination)key.attachment();
175 
176             SelectChannelEndPoint ep=null;
177 
178             if (dest.isSecure())
179             {
180                 if (dest.isProxied())
181                 {
182                     SSLEngine engine=newSslEngine(channel);
183                     ep = new ProxySelectChannelEndPoint(channel, selectSet, key, _sslBuffers, engine, (int)_httpClient.getIdleTimeout());
184                 }
185                 else
186                 {
187                     SSLEngine engine=newSslEngine(channel);
188                     SslSelectChannelEndPoint sslEp = new SslSelectChannelEndPoint(_sslBuffers, channel, selectSet, key, engine, (int)_httpClient.getIdleTimeout());
189                     sslEp.setAllowRenegotiate(_httpClient.getSslContextFactory().isAllowRenegotiate());
190                     ep = sslEp;
191                 }
192             }
193             else
194             {
195                 ep = new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout());
196             }
197 
198             HttpConnection connection=(HttpConnection)ep.getConnection();
199             connection.setDestination(dest);
200             dest.onNewConnection(connection);
201             return ep;
202         }
203 
204         private synchronized SSLEngine newSslEngine(SocketChannel channel) throws IOException
205         {
206             SslContextFactory sslContextFactory = _httpClient.getSslContextFactory();
207             SSLEngine sslEngine;
208             if (channel != null)
209             {
210                 String peerHost = channel.socket().getInetAddress().getHostAddress();
211                 int peerPort = channel.socket().getPort();
212                 sslEngine = sslContextFactory.newSslEngine(peerHost, peerPort);
213             }
214             else
215             {
216                 sslEngine = sslContextFactory.newSslEngine();
217             }
218             sslEngine.setUseClientMode(true);
219             sslEngine.beginHandshake();
220 
221             return sslEngine;
222         }
223 
224         /* ------------------------------------------------------------ */
225         /* (non-Javadoc)
226          * @see org.eclipse.io.nio.SelectorManager#connectionFailed(java.nio.channels.SocketChannel, java.lang.Throwable, java.lang.Object)
227          */
228         @Override
229         protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
230         {
231             if (attachment instanceof HttpDestination)
232                 ((HttpDestination)attachment).onConnectionFailed(ex);
233             else
234                 super.connectionFailed(channel,ex,attachment);
235         }
236     }
237 
238     private class ConnectTimeout extends Timeout.Task
239     {
240         private final SocketChannel channel;
241         private final HttpDestination destination;
242 
243         public ConnectTimeout(SocketChannel channel, HttpDestination destination)
244         {
245             this.channel = channel;
246             this.destination = destination;
247         }
248 
249         @Override
250         public void expired()
251         {
252             if (channel.isConnectionPending())
253             {
254                 LOG.debug("Channel {} timed out while connecting, closing it", channel);
255                 try
256                 {
257                     // This will unregister the channel from the selector
258                     channel.close();
259                 }
260                 catch (IOException x)
261                 {
262                     LOG.ignore(x);
263                 }
264                 destination.onConnectionFailed(new SocketTimeoutException());
265             }
266         }
267     }
268 
269     /**
270      * An endpoint that is able to "upgrade" from a normal endpoint to a SSL endpoint.
271      * Since {@link HttpParser} and {@link HttpGenerator} only depend on the {@link EndPoint}
272      * interface, this class overrides all methods of {@link EndPoint} to provide the right
273      * behavior depending on the fact that it has been upgraded or not.
274      */
275     public static class ProxySelectChannelEndPoint extends SslSelectChannelEndPoint
276     {
277         private final SelectChannelEndPoint plainEndPoint;
278         private volatile boolean upgraded = false;
279 
280         public ProxySelectChannelEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key, Buffers sslBuffers, SSLEngine engine, int maxIdleTimeout) throws IOException
281         {
282             super(sslBuffers, channel, selectSet, key, engine, maxIdleTimeout);
283             this.plainEndPoint = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTimeout);
284         }
285 
286         public void upgrade()
287         {
288             upgraded = true;
289         }
290 
291         public void shutdownOutput() throws IOException
292         {
293             if (upgraded)
294                 super.shutdownOutput();
295             else
296                 plainEndPoint.shutdownOutput();
297         }
298 
299         public void close() throws IOException
300         {
301             if (upgraded)
302                 super.close();
303             else
304                 plainEndPoint.close();
305         }
306 
307         public int fill(Buffer buffer) throws IOException
308         {
309             if (upgraded)
310                 return super.fill(buffer);
311             else
312                 return plainEndPoint.fill(buffer);
313         }
314 
315         public int flush(Buffer buffer) throws IOException
316         {
317             if (upgraded)
318                 return super.flush(buffer);
319             else
320                 return plainEndPoint.flush(buffer);
321         }
322 
323         public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
324         {
325             if (upgraded)
326                 return super.flush(header, buffer, trailer);
327             else
328                 return plainEndPoint.flush(header, buffer, trailer);
329         }
330 
331         public String getLocalAddr()
332         {
333             if (upgraded)
334                 return super.getLocalAddr();
335             else
336                 return plainEndPoint.getLocalAddr();
337         }
338 
339         public String getLocalHost()
340         {
341             if (upgraded)
342                 return super.getLocalHost();
343             else
344                 return plainEndPoint.getLocalHost();
345         }
346 
347         public int getLocalPort()
348         {
349             if (upgraded)
350                 return super.getLocalPort();
351             else
352                 return plainEndPoint.getLocalPort();
353         }
354 
355         public String getRemoteAddr()
356         {
357             if (upgraded)
358                 return super.getRemoteAddr();
359             else
360                 return plainEndPoint.getRemoteAddr();
361         }
362 
363         public String getRemoteHost()
364         {
365             if (upgraded)
366                 return super.getRemoteHost();
367             else
368                 return plainEndPoint.getRemoteHost();
369         }
370 
371         public int getRemotePort()
372         {
373             if (upgraded)
374                 return super.getRemotePort();
375             else
376                 return plainEndPoint.getRemotePort();
377         }
378 
379         public boolean isBlocking()
380         {
381             if (upgraded)
382                 return super.isBlocking();
383             else
384                 return plainEndPoint.isBlocking();
385         }
386 
387         public boolean isBufferred()
388         {
389             if (upgraded)
390                 return super.isBufferred();
391             else
392                 return plainEndPoint.isBufferred();
393         }
394 
395         public boolean blockReadable(long millisecs) throws IOException
396         {
397             if (upgraded)
398                 return super.blockReadable(millisecs);
399             else
400                 return plainEndPoint.blockReadable(millisecs);
401         }
402 
403         public boolean blockWritable(long millisecs) throws IOException
404         {
405             if (upgraded)
406                 return super.blockWritable(millisecs);
407             else
408                 return plainEndPoint.blockWritable(millisecs);
409         }
410 
411         public boolean isOpen()
412         {
413             if (upgraded)
414                 return super.isOpen();
415             else
416                 return plainEndPoint.isOpen();
417         }
418 
419         public Object getTransport()
420         {
421             if (upgraded)
422                 return super.getTransport();
423             else
424                 return plainEndPoint.getTransport();
425         }
426 
427         public boolean isBufferingInput()
428         {
429             if (upgraded)
430                 return super.isBufferingInput();
431             else
432                 return plainEndPoint.isBufferingInput();
433         }
434 
435         public boolean isBufferingOutput()
436         {
437             if (upgraded)
438                 return super.isBufferingOutput();
439             else
440                 return plainEndPoint.isBufferingOutput();
441         }
442 
443         public void flush() throws IOException
444         {
445             if (upgraded)
446                 super.flush();
447             else
448                 plainEndPoint.flush();
449 
450         }
451 
452         public int getMaxIdleTime()
453         {
454             if (upgraded)
455                 return super.getMaxIdleTime();
456             else
457                 return plainEndPoint.getMaxIdleTime();
458         }
459 
460         public void setMaxIdleTime(int timeMs) throws IOException
461         {
462             if (upgraded)
463                 super.setMaxIdleTime(timeMs);
464             else
465                 plainEndPoint.setMaxIdleTime(timeMs);
466         }
467     }
468 }