View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  import java.net.SocketTimeoutException;
18  import java.nio.channels.SelectionKey;
19  import java.nio.channels.SocketChannel;
20  import java.util.Map;
21  import java.util.concurrent.ConcurrentHashMap;
22  import javax.net.ssl.SSLContext;
23  import javax.net.ssl.SSLEngine;
24  import javax.net.ssl.SSLSession;
25  
26  import org.eclipse.jetty.http.HttpGenerator;
27  import org.eclipse.jetty.http.HttpParser;
28  import org.eclipse.jetty.http.ssl.SslContextFactory;
29  import org.eclipse.jetty.io.Buffer;
30  import org.eclipse.jetty.io.Buffers;
31  import org.eclipse.jetty.io.Buffers.Type;
32  import org.eclipse.jetty.io.BuffersFactory;
33  import org.eclipse.jetty.io.ConnectedEndPoint;
34  import org.eclipse.jetty.io.Connection;
35  import org.eclipse.jetty.io.EndPoint;
36  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
37  import org.eclipse.jetty.io.nio.SelectorManager;
38  import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
39  import org.eclipse.jetty.util.component.AbstractLifeCycle;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.log.Logger;
42  import org.eclipse.jetty.util.thread.Timeout;
43  
44  class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
45  {
46      private static final Logger LOG = Log.getLogger(SelectConnector.class);
47  
48      private final HttpClient _httpClient; // owning client; read below for the thread pool, timeouts, buffers and SSL settings
49      private final Manager _selectorManager=new Manager(); // started in doStart(), stopped in doStop(); startConnection() registers channels with it
50      private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>(); // connect-timeout task per channel for non-blocking connects; Manager.newEndPoint() removes and cancels it
51      private SSLContext _sslContext; // fetched lazily from the client's SslContextFactory in Manager.newSslEngine()
52      private Buffers _sslBuffers; // assigned in doStart(); handed to every SSL endpoint and its HttpConnection
53  
54      /**
55       * @param httpClient the HttpClient this connector is associated to
56       */
57      SelectConnector(HttpClient httpClient)
58      {
59          _httpClient = httpClient;
60      }
61  
62      /* ------------------------------------------------------------ */
63      @Override
64      protected void doStart() throws Exception
65      {
66          super.doStart();
67  
68  
69          final boolean direct=_httpClient.getUseDirectBuffers();
70  
71          SSLEngine sslEngine=_selectorManager.newSslEngine(null);
72          final SSLSession ssl_session=sslEngine.getSession();
73          _sslBuffers = BuffersFactory.newBuffers(
74                  direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
75                  direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
76                  direct?Type.DIRECT:Type.INDIRECT,1024);
77  
78          _selectorManager.start();
79      }
80  
81      /* ------------------------------------------------------------ */
82      @Override
83      protected void doStop() throws Exception
84      {
85          _selectorManager.stop();
86      }
87  
88      /* ------------------------------------------------------------ */
89      public void startConnection( HttpDestination destination )
90          throws IOException
91      {
92          try
93          {
94              SocketChannel channel = SocketChannel.open();
95              Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
96              channel.socket().setTcpNoDelay(true);
97  
98              if (_httpClient.isConnectBlocking())
99              {
100                 channel.socket().connect(address.toSocketAddress(), _httpClient.getConnectTimeout());
101                 channel.configureBlocking(false);
102                 _selectorManager.register( channel, destination );
103             }
104             else
105             {
106                 channel.configureBlocking( false );
107                 channel.connect(address.toSocketAddress());
108                 _selectorManager.register( channel, destination );
109                 ConnectTimeout connectTimeout = new ConnectTimeout(channel, destination);
110                 _httpClient.schedule(connectTimeout,_httpClient.getConnectTimeout());
111                 _connectingChannels.put(channel, connectTimeout);
112             }
113 
114         }
115         catch(IOException ex)
116         {
117             destination.onConnectionFailed(ex);
118         }
119     }
120 
121     /* ------------------------------------------------------------ */
122     class Manager extends SelectorManager
123     {
124         @Override
125         public boolean dispatch(Runnable task)
126         {
127             return _httpClient._threadPool.dispatch(task);
128         }
129 
130         @Override
131         protected void endPointOpened(SelectChannelEndPoint endpoint)
132         {
133         }
134 
135         @Override
136         protected void endPointClosed(SelectChannelEndPoint endpoint)
137         {
138         }
139 
140         @Override
141         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
142         {
143         }
144 
145         @Override
146         protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
147         {
148             if (endpoint instanceof SslSelectChannelEndPoint)
149                 return new HttpConnection(_sslBuffers,_sslBuffers,endpoint);
150 
151             return new HttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
152         }
153 
154         @Override
155         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
156         {
157             // We're connected, cancel the connect timeout
158             Timeout.Task connectTimeout = _connectingChannels.remove(channel);
159             if (connectTimeout != null)
160                 connectTimeout.cancel();
161             LOG.debug("Channels with connection pending: {}", _connectingChannels.size());
162 
163             // key should have destination at this point (will be replaced by endpoint after this call)
164             HttpDestination dest=(HttpDestination)key.attachment();
165 
166             SelectChannelEndPoint ep=null;
167 
168             if (dest.isSecure())
169             {
170                 if (dest.isProxied())
171                 {
172                     SSLEngine engine=newSslEngine(channel);
173                     ep = new ProxySelectChannelEndPoint(channel, selectSet, key, _sslBuffers, engine, (int)_httpClient.getIdleTimeout());
174                 }
175                 else
176                 {
177                     SSLEngine engine=newSslEngine(channel);
178                     SslSelectChannelEndPoint sslEp = new SslSelectChannelEndPoint(_sslBuffers, channel, selectSet, key, engine, (int)_httpClient.getIdleTimeout());
179                     sslEp.setAllowRenegotiate(_httpClient.getSslContextFactory().isAllowRenegotiate());
180                     ep = sslEp;
181                 }
182             }
183             else
184             {
185                 ep = new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout());
186             }
187 
188             HttpConnection connection=(HttpConnection)ep.getConnection();
189             connection.setDestination(dest);
190             dest.onNewConnection(connection);
191             return ep;
192         }
193 
194         private synchronized SSLEngine newSslEngine(SocketChannel channel) throws IOException
195         {
196             SslContextFactory sslContextFactory = _httpClient.getSslContextFactory();
197             if (_sslContext == null)
198                 _sslContext = sslContextFactory.getSslContext();
199 
200             SSLEngine sslEngine;
201             if (channel != null && sslContextFactory.isSessionCachingEnabled())
202             {
203                 String peerHost = channel.socket().getInetAddress().getHostAddress();
204                 int peerPort = channel.socket().getPort();
205                 sslEngine = _sslContext.createSSLEngine(peerHost, peerPort);
206             }
207             else
208             {
209                 sslEngine = _sslContext.createSSLEngine();
210             }
211             sslEngine.setUseClientMode(true);
212             sslEngine.beginHandshake();
213 
214             return sslEngine;
215         }
216 
217         /* ------------------------------------------------------------ */
218         /* (non-Javadoc)
219          * @see org.eclipse.io.nio.SelectorManager#connectionFailed(java.nio.channels.SocketChannel, java.lang.Throwable, java.lang.Object)
220          */
221         @Override
222         protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
223         {
224             if (attachment instanceof HttpDestination)
225                 ((HttpDestination)attachment).onConnectionFailed(ex);
226             else
227                 super.connectionFailed(channel,ex,attachment);
228         }
229     }
230 
231     private class ConnectTimeout extends Timeout.Task
232     {
233         private final SocketChannel channel;
234         private final HttpDestination destination;
235 
236         public ConnectTimeout(SocketChannel channel, HttpDestination destination)
237         {
238             this.channel = channel;
239             this.destination = destination;
240         }
241 
242         @Override
243         public void expired()
244         {
245             if (channel.isConnectionPending())
246             {
247                 LOG.debug("Channel {} timed out while connecting, closing it", channel);
248                 try
249                 {
250                     // This will unregister the channel from the selector
251                     channel.close();
252                 }
253                 catch (IOException x)
254                 {
255                     LOG.ignore(x);
256                 }
257                 destination.onConnectionFailed(new SocketTimeoutException());
258             }
259         }
260     }
261 
262     /**
263      * An endpoint that is able to "upgrade" from a normal endpoint to a SSL endpoint.
264      * Since {@link HttpParser} and {@link HttpGenerator} only depend on the {@link EndPoint}
265      * interface, this class overrides all methods of {@link EndPoint} to provide the right
266      * behavior depending on the fact that it has been upgraded or not.
267      */
268     public static class ProxySelectChannelEndPoint extends SslSelectChannelEndPoint
269     {
270         private final SelectChannelEndPoint plainEndPoint;
271         private volatile boolean upgraded = false;
272 
273         public ProxySelectChannelEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key, Buffers sslBuffers, SSLEngine engine, int maxIdleTimeout) throws IOException
274         {
275             super(sslBuffers, channel, selectSet, key, engine, maxIdleTimeout);
276             this.plainEndPoint = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTimeout);
277         }
278 
279         public void upgrade()
280         {
281             upgraded = true;
282         }
283 
284         public void shutdownOutput() throws IOException
285         {
286             if (upgraded)
287                 super.shutdownOutput();
288             else
289                 plainEndPoint.shutdownOutput();
290         }
291 
292         public void close() throws IOException
293         {
294             if (upgraded)
295                 super.close();
296             else
297                 plainEndPoint.close();
298         }
299 
300         public int fill(Buffer buffer) throws IOException
301         {
302             if (upgraded)
303                 return super.fill(buffer);
304             else
305                 return plainEndPoint.fill(buffer);
306         }
307 
308         public int flush(Buffer buffer) throws IOException
309         {
310             if (upgraded)
311                 return super.flush(buffer);
312             else
313                 return plainEndPoint.flush(buffer);
314         }
315 
316         public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
317         {
318             if (upgraded)
319                 return super.flush(header, buffer, trailer);
320             else
321                 return plainEndPoint.flush(header, buffer, trailer);
322         }
323 
324         public String getLocalAddr()
325         {
326             if (upgraded)
327                 return super.getLocalAddr();
328             else
329                 return plainEndPoint.getLocalAddr();
330         }
331 
332         public String getLocalHost()
333         {
334             if (upgraded)
335                 return super.getLocalHost();
336             else
337                 return plainEndPoint.getLocalHost();
338         }
339 
340         public int getLocalPort()
341         {
342             if (upgraded)
343                 return super.getLocalPort();
344             else
345                 return plainEndPoint.getLocalPort();
346         }
347 
348         public String getRemoteAddr()
349         {
350             if (upgraded)
351                 return super.getRemoteAddr();
352             else
353                 return plainEndPoint.getRemoteAddr();
354         }
355 
356         public String getRemoteHost()
357         {
358             if (upgraded)
359                 return super.getRemoteHost();
360             else
361                 return plainEndPoint.getRemoteHost();
362         }
363 
364         public int getRemotePort()
365         {
366             if (upgraded)
367                 return super.getRemotePort();
368             else
369                 return plainEndPoint.getRemotePort();
370         }
371 
372         public boolean isBlocking()
373         {
374             if (upgraded)
375                 return super.isBlocking();
376             else
377                 return plainEndPoint.isBlocking();
378         }
379 
380         public boolean isBufferred()
381         {
382             if (upgraded)
383                 return super.isBufferred();
384             else
385                 return plainEndPoint.isBufferred();
386         }
387 
388         public boolean blockReadable(long millisecs) throws IOException
389         {
390             if (upgraded)
391                 return super.blockReadable(millisecs);
392             else
393                 return plainEndPoint.blockReadable(millisecs);
394         }
395 
396         public boolean blockWritable(long millisecs) throws IOException
397         {
398             if (upgraded)
399                 return super.blockWritable(millisecs);
400             else
401                 return plainEndPoint.blockWritable(millisecs);
402         }
403 
404         public boolean isOpen()
405         {
406             if (upgraded)
407                 return super.isOpen();
408             else
409                 return plainEndPoint.isOpen();
410         }
411 
412         public Object getTransport()
413         {
414             if (upgraded)
415                 return super.getTransport();
416             else
417                 return plainEndPoint.getTransport();
418         }
419 
420         public boolean isBufferingInput()
421         {
422             if (upgraded)
423                 return super.isBufferingInput();
424             else
425                 return plainEndPoint.isBufferingInput();
426         }
427 
428         public boolean isBufferingOutput()
429         {
430             if (upgraded)
431                 return super.isBufferingOutput();
432             else
433                 return plainEndPoint.isBufferingOutput();
434         }
435 
436         public void flush() throws IOException
437         {
438             if (upgraded)
439                 super.flush();
440             else
441                 plainEndPoint.flush();
442 
443         }
444 
445         public int getMaxIdleTime()
446         {
447             if (upgraded)
448                 return super.getMaxIdleTime();
449             else
450                 return plainEndPoint.getMaxIdleTime();
451         }
452 
453         public void setMaxIdleTime(int timeMs) throws IOException
454         {
455             if (upgraded)
456                 super.setMaxIdleTime(timeMs);
457             else
458                 plainEndPoint.setMaxIdleTime(timeMs);
459         }
460     }
461 }