View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  import java.net.SocketTimeoutException;
18  import java.nio.channels.SelectionKey;
19  import java.nio.channels.SocketChannel;
20  import java.util.Map;
21  import java.util.concurrent.ConcurrentHashMap;
22  import javax.net.ssl.SSLContext;
23  import javax.net.ssl.SSLEngine;
24  import javax.net.ssl.SSLSession;
25  
26  import org.eclipse.jetty.http.HttpGenerator;
27  import org.eclipse.jetty.http.HttpParser;
28  import org.eclipse.jetty.http.ssl.SslContextFactory;
29  import org.eclipse.jetty.io.Buffer;
30  import org.eclipse.jetty.io.Buffers;
31  import org.eclipse.jetty.io.Buffers.Type;
32  import org.eclipse.jetty.io.BuffersFactory;
33  import org.eclipse.jetty.io.ConnectedEndPoint;
34  import org.eclipse.jetty.io.Connection;
35  import org.eclipse.jetty.io.EndPoint;
36  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
37  import org.eclipse.jetty.io.nio.SelectorManager;
38  import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
39  import org.eclipse.jetty.util.component.AbstractLifeCycle;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.thread.Timeout;
42  
43  class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, Runnable
44  {
45      private final HttpClient _httpClient;
46      private final Manager _selectorManager=new Manager();
47      private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>();
48      private SSLContext _sslContext;
49      private Buffers _sslBuffers;
50  
51      /**
52       * @param httpClient the HttpClient this connector is associated to
53       */
54      SelectConnector(HttpClient httpClient)
55      {
56          _httpClient = httpClient;
57      }
58  
59      /* ------------------------------------------------------------ */
60      @Override
61      protected void doStart() throws Exception
62      {
63          super.doStart();
64  
65          _selectorManager.start();
66  
67          final boolean direct=_httpClient.getUseDirectBuffers();
68  
69          SSLEngine sslEngine=_selectorManager.newSslEngine(null);
70          final SSLSession ssl_session=sslEngine.getSession();
71          _sslBuffers = BuffersFactory.newBuffers(
72                  direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
73                  direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
74                  direct?Type.DIRECT:Type.INDIRECT,1024);
75  
76          _httpClient._threadPool.dispatch(this);
77      }
78  
79      /* ------------------------------------------------------------ */
80      @Override
81      protected void doStop() throws Exception
82      {
83          _selectorManager.stop();
84      }
85  
86      /* ------------------------------------------------------------ */
87      public void startConnection( HttpDestination destination )
88          throws IOException
89      {
90          try
91          {
92              SocketChannel channel = SocketChannel.open();
93              Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
94              channel.socket().setTcpNoDelay(true);
95  
96              if (_httpClient.isConnectBlocking())
97              {
98                  channel.socket().connect(address.toSocketAddress(), _httpClient.getConnectTimeout());
99                  channel.configureBlocking(false);
100                 _selectorManager.register( channel, destination );
101             }
102             else
103             {
104                 channel.configureBlocking( false );
105                 channel.connect(address.toSocketAddress());
106                 _selectorManager.register( channel, destination );
107                 ConnectTimeout connectTimeout = new ConnectTimeout(channel, destination);
108                 _httpClient.schedule(connectTimeout,_httpClient.getConnectTimeout());
109                 _connectingChannels.put(channel, connectTimeout);
110             }
111 
112         }
113         catch(IOException ex)
114         {
115             destination.onConnectionFailed(ex);
116         }
117 
118     }
119 
120     /* ------------------------------------------------------------ */
121     public void run()
122     {
123         while (_httpClient.isRunning())
124         {
125             try
126             {
127                 _selectorManager.doSelect(0);
128             }
129             catch (Exception e)
130             {
131                 Log.warn(e.toString());
132                 Log.debug(e);
133                 Thread.yield();
134             }
135         }
136     }
137 
138     /* ------------------------------------------------------------ */
139     class Manager extends SelectorManager
140     {
141         @Override
142         public boolean dispatch(Runnable task)
143         {
144             return _httpClient._threadPool.dispatch(task);
145         }
146 
147         @Override
148         protected void endPointOpened(SelectChannelEndPoint endpoint)
149         {
150         }
151 
152         @Override
153         protected void endPointClosed(SelectChannelEndPoint endpoint)
154         {
155         }
156 
157         @Override
158         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
159         {
160         }
161 
162         @Override
163         protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
164         {
165             if (endpoint instanceof SslSelectChannelEndPoint)
166                 return new HttpConnection(_sslBuffers,_sslBuffers,endpoint);
167 
168             return new HttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
169         }
170 
171         @Override
172         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
173         {
174             // We're connected, cancel the connect timeout
175             Timeout.Task connectTimeout = _connectingChannels.remove(channel);
176             if (connectTimeout != null)
177                 connectTimeout.cancel();
178             Log.debug("Channels with connection pending: {}", _connectingChannels.size());
179 
180             // key should have destination at this point (will be replaced by endpoint after this call)
181             HttpDestination dest=(HttpDestination)key.attachment();
182 
183             SelectChannelEndPoint ep=null;
184 
185             if (dest.isSecure())
186             {
187                 if (dest.isProxied())
188                 {
189                     SSLEngine engine=newSslEngine(channel);
190                     ep = new ProxySelectChannelEndPoint(channel, selectSet, key, _sslBuffers, engine, (int)_httpClient.getIdleTimeout());
191                 }
192                 else
193                 {
194                     SSLEngine engine=newSslEngine(channel);
195                     SslSelectChannelEndPoint sslEp = new SslSelectChannelEndPoint(_sslBuffers, channel, selectSet, key, engine, (int)_httpClient.getIdleTimeout());
196                     sslEp.setAllowRenegotiate(_httpClient.getSslContextFactory().isAllowRenegotiate());
197                     ep = sslEp;
198                 }
199             }
200             else
201             {
202                 ep = new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout());
203             }
204 
205             HttpConnection connection=(HttpConnection)ep.getConnection();
206             connection.setDestination(dest);
207             dest.onNewConnection(connection);
208             return ep;
209         }
210 
211         private synchronized SSLEngine newSslEngine(SocketChannel channel) throws IOException
212         {
213             SslContextFactory sslContextFactory = _httpClient.getSslContextFactory();
214             if (_sslContext == null)
215                 _sslContext = sslContextFactory.getSslContext();
216 
217             SSLEngine sslEngine;
218             if (channel != null && sslContextFactory.isSessionCachingEnabled())
219             {
220                 String peerHost = channel.socket().getInetAddress().getHostAddress();
221                 int peerPort = channel.socket().getPort();
222                 sslEngine = _sslContext.createSSLEngine(peerHost, peerPort);
223             }
224             else
225             {
226                 sslEngine = _sslContext.createSSLEngine();
227             }
228             sslEngine.setUseClientMode(true);
229             sslEngine.beginHandshake();
230 
231             return sslEngine;
232         }
233 
234         /* ------------------------------------------------------------ */
235         /* (non-Javadoc)
236          * @see org.eclipse.io.nio.SelectorManager#connectionFailed(java.nio.channels.SocketChannel, java.lang.Throwable, java.lang.Object)
237          */
238         @Override
239         protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
240         {
241             if (attachment instanceof HttpDestination)
242                 ((HttpDestination)attachment).onConnectionFailed(ex);
243             else
244                 super.connectionFailed(channel,ex,attachment);
245         }
246     }
247 
248     private class ConnectTimeout extends Timeout.Task
249     {
250         private final SocketChannel channel;
251         private final HttpDestination destination;
252 
253         public ConnectTimeout(SocketChannel channel, HttpDestination destination)
254         {
255             this.channel = channel;
256             this.destination = destination;
257         }
258 
259         @Override
260         public void expired()
261         {
262             if (channel.isConnectionPending())
263             {
264                 Log.debug("Channel {} timed out while connecting, closing it", channel);
265                 try
266                 {
267                     // This will unregister the channel from the selector
268                     channel.close();
269                 }
270                 catch (IOException x)
271                 {
272                     Log.ignore(x);
273                 }
274                 destination.onConnectionFailed(new SocketTimeoutException());
275             }
276         }
277     }
278 
279     /**
280      * An endpoint that is able to "upgrade" from a normal endpoint to a SSL endpoint.
281      * Since {@link HttpParser} and {@link HttpGenerator} only depend on the {@link EndPoint}
282      * interface, this class overrides all methods of {@link EndPoint} to provide the right
283      * behavior depending on the fact that it has been upgraded or not.
284      */
285     public static class ProxySelectChannelEndPoint extends SslSelectChannelEndPoint
286     {
287         private final SelectChannelEndPoint plainEndPoint;
288         private volatile boolean upgraded = false;
289 
290         public ProxySelectChannelEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key, Buffers sslBuffers, SSLEngine engine, int maxIdleTimeout) throws IOException
291         {
292             super(sslBuffers, channel, selectSet, key, engine, maxIdleTimeout);
293             this.plainEndPoint = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTimeout);
294         }
295 
296         public void upgrade()
297         {
298             upgraded = true;
299         }
300 
301         public void shutdownOutput() throws IOException
302         {
303             if (upgraded)
304                 super.shutdownOutput();
305             else
306                 plainEndPoint.shutdownOutput();
307         }
308 
309         public void close() throws IOException
310         {
311             if (upgraded)
312                 super.close();
313             else
314                 plainEndPoint.close();
315         }
316 
317         public int fill(Buffer buffer) throws IOException
318         {
319             if (upgraded)
320                 return super.fill(buffer);
321             else
322                 return plainEndPoint.fill(buffer);
323         }
324 
325         public int flush(Buffer buffer) throws IOException
326         {
327             if (upgraded)
328                 return super.flush(buffer);
329             else
330                 return plainEndPoint.flush(buffer);
331         }
332 
333         public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
334         {
335             if (upgraded)
336                 return super.flush(header, buffer, trailer);
337             else
338                 return plainEndPoint.flush(header, buffer, trailer);
339         }
340 
341         public String getLocalAddr()
342         {
343             if (upgraded)
344                 return super.getLocalAddr();
345             else
346                 return plainEndPoint.getLocalAddr();
347         }
348 
349         public String getLocalHost()
350         {
351             if (upgraded)
352                 return super.getLocalHost();
353             else
354                 return plainEndPoint.getLocalHost();
355         }
356 
357         public int getLocalPort()
358         {
359             if (upgraded)
360                 return super.getLocalPort();
361             else
362                 return plainEndPoint.getLocalPort();
363         }
364 
365         public String getRemoteAddr()
366         {
367             if (upgraded)
368                 return super.getRemoteAddr();
369             else
370                 return plainEndPoint.getRemoteAddr();
371         }
372 
373         public String getRemoteHost()
374         {
375             if (upgraded)
376                 return super.getRemoteHost();
377             else
378                 return plainEndPoint.getRemoteHost();
379         }
380 
381         public int getRemotePort()
382         {
383             if (upgraded)
384                 return super.getRemotePort();
385             else
386                 return plainEndPoint.getRemotePort();
387         }
388 
389         public boolean isBlocking()
390         {
391             if (upgraded)
392                 return super.isBlocking();
393             else
394                 return plainEndPoint.isBlocking();
395         }
396 
397         public boolean isBufferred()
398         {
399             if (upgraded)
400                 return super.isBufferred();
401             else
402                 return plainEndPoint.isBufferred();
403         }
404 
405         public boolean blockReadable(long millisecs) throws IOException
406         {
407             if (upgraded)
408                 return super.blockReadable(millisecs);
409             else
410                 return plainEndPoint.blockReadable(millisecs);
411         }
412 
413         public boolean blockWritable(long millisecs) throws IOException
414         {
415             if (upgraded)
416                 return super.blockWritable(millisecs);
417             else
418                 return plainEndPoint.blockWritable(millisecs);
419         }
420 
421         public boolean isOpen()
422         {
423             if (upgraded)
424                 return super.isOpen();
425             else
426                 return plainEndPoint.isOpen();
427         }
428 
429         public Object getTransport()
430         {
431             if (upgraded)
432                 return super.getTransport();
433             else
434                 return plainEndPoint.getTransport();
435         }
436 
437         public boolean isBufferingInput()
438         {
439             if (upgraded)
440                 return super.isBufferingInput();
441             else
442                 return plainEndPoint.isBufferingInput();
443         }
444 
445         public boolean isBufferingOutput()
446         {
447             if (upgraded)
448                 return super.isBufferingOutput();
449             else
450                 return plainEndPoint.isBufferingOutput();
451         }
452 
453         public void flush() throws IOException
454         {
455             if (upgraded)
456                 super.flush();
457             else
458                 plainEndPoint.flush();
459 
460         }
461 
462         public int getMaxIdleTime()
463         {
464             if (upgraded)
465                 return super.getMaxIdleTime();
466             else
467                 return plainEndPoint.getMaxIdleTime();
468         }
469 
470         public void setMaxIdleTime(int timeMs) throws IOException
471         {
472             if (upgraded)
473                 super.setMaxIdleTime(timeMs);
474             else
475                 plainEndPoint.setMaxIdleTime(timeMs);
476         }
477     }
478 }