View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  import java.net.SocketTimeoutException;
18  import java.nio.channels.SelectionKey;
19  import java.nio.channels.SocketChannel;
20  import java.util.Map;
21  import java.util.concurrent.ConcurrentHashMap;
22  
23  import javax.net.ssl.SSLContext;
24  import javax.net.ssl.SSLEngine;
25  import javax.net.ssl.SSLSession;
26  
27  import org.eclipse.jetty.http.HttpGenerator;
28  import org.eclipse.jetty.http.HttpParser;
29  import org.eclipse.jetty.io.Buffer;
30  import org.eclipse.jetty.io.Buffers;
31  import org.eclipse.jetty.io.Buffers.Type;
32  import org.eclipse.jetty.io.BuffersFactory;
33  import org.eclipse.jetty.io.ConnectedEndPoint;
34  import org.eclipse.jetty.io.Connection;
35  import org.eclipse.jetty.io.EndPoint;
36  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
37  import org.eclipse.jetty.io.nio.SelectorManager;
38  import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
39  import org.eclipse.jetty.util.component.AbstractLifeCycle;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.thread.Timeout;
42  
43  class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, Runnable
44  {
45      private final HttpClient _httpClient;
46      private final Manager _selectorManager=new Manager();
47      private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>();
48      private SSLContext _sslContext;
49      private Buffers _sslBuffers;
50      private int _maxBuffers=1024;
51  
52      /**
53       * @param httpClient
54       */
55      SelectConnector(HttpClient httpClient)
56      {
57          _httpClient = httpClient;
58      }
59  
60      /* ------------------------------------------------------------ */
61      @Override
62      protected void doStart() throws Exception
63      {
64          super.doStart();
65  
66          _selectorManager.start();
67  
68          final boolean direct=_httpClient.getUseDirectBuffers();
69  
70          SSLEngine sslEngine=_selectorManager.newSslEngine();
71          final SSLSession ssl_session=sslEngine.getSession();
72          _sslBuffers = BuffersFactory.newBuffers(
73                  direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
74                  direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
75                  direct?Type.DIRECT:Type.INDIRECT,_maxBuffers); 
76  
77          _httpClient._threadPool.dispatch(this);
78      }
79  
80      /* ------------------------------------------------------------ */
81      @Override
82      protected void doStop() throws Exception
83      {
84          _selectorManager.stop();
85      }
86  
87      /* ------------------------------------------------------------ */
88      public void startConnection( HttpDestination destination )
89          throws IOException
90      {
91          try
92          {
93              SocketChannel channel = SocketChannel.open();
94              Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
95              channel.socket().setTcpNoDelay(true);
96  
97              if (!_httpClient.isConnectBlocking())
98              {
99                  channel.configureBlocking( false );
100                 channel.connect(address.toSocketAddress());
101                 _selectorManager.register( channel, destination );
102                 ConnectTimeout connectTimeout = new ConnectTimeout(channel, destination);
103                 _httpClient.schedule(connectTimeout,_httpClient.getConnectTimeout());
104                 _connectingChannels.put(channel, connectTimeout);
105             }
106             else
107             {
108                 channel.configureBlocking( true );
109                 channel.socket().setSoTimeout(_httpClient.getConnectTimeout());
110                 channel.socket().connect(address.toSocketAddress(),_httpClient.getConnectTimeout());
111                 channel.configureBlocking(false);
112                 channel.socket().setSoTimeout((int)_httpClient.getTimeout());
113 
114                 _selectorManager.register( channel, destination );
115             }
116 
117         }
118         catch(IOException ex)
119         {
120             destination.onConnectionFailed(ex);
121         }
122 
123     }
124 
125     /* ------------------------------------------------------------ */
126     public void run()
127     {
128         while (_httpClient.isRunning())
129         {
130             try
131             {
132                 _selectorManager.doSelect(0);
133             }
134             catch (Exception e)
135             {
136                 Log.warn(e.toString());
137                 Log.debug(e);
138                 Thread.yield();
139             }
140         }
141     }
142 
143     /* ------------------------------------------------------------ */
144     class Manager extends SelectorManager
145     {
146         @Override
147         public boolean dispatch(Runnable task)
148         {
149             return SelectConnector.this._httpClient._threadPool.dispatch(task);
150         }
151 
152         @Override
153         protected void endPointOpened(SelectChannelEndPoint endpoint)
154         {
155         }
156 
157         @Override
158         protected void endPointClosed(SelectChannelEndPoint endpoint)
159         {
160         }
161 
162         @Override
163         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
164         {
165         }
166 
167         @Override
168         protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
169         {
170             if (endpoint instanceof SslSelectChannelEndPoint)
171                 return new HttpConnection(_sslBuffers,_sslBuffers,endpoint);
172 
173             return new HttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
174         }
175 
176         @Override
177         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
178         {
179             // We're connected, cancel the connect timeout
180             Timeout.Task connectTimeout = _connectingChannels.remove(channel);
181             if (connectTimeout != null)
182                 connectTimeout.cancel();
183             Log.debug("Channels with connection pending: {}", _connectingChannels.size());
184 
185             // key should have destination at this point (will be replaced by endpoint after this call)
186             HttpDestination dest=(HttpDestination)key.attachment();
187 
188             SelectChannelEndPoint ep=null;
189 
190             if (dest.isSecure())
191             {
192                 if (dest.isProxied())
193                 {
194                     SSLEngine engine=newSslEngine();
195                     ep = new ProxySelectChannelEndPoint(channel,selectSet,key,_sslBuffers,engine);
196                 }
197                 else
198                 {
199                     SSLEngine engine=newSslEngine();
200                     ep = new SslSelectChannelEndPoint(_sslBuffers,channel,selectSet,key,engine);
201                 }
202             }
203             else
204             {
205                 ep=new SelectChannelEndPoint(channel,selectSet,key);
206             }
207 
208             HttpConnection connection=(HttpConnection)ep.getConnection();
209             connection.setDestination(dest);
210             dest.onNewConnection(connection);
211             return ep;
212         }
213 
214         private synchronized SSLEngine newSslEngine() throws IOException
215         {
216             if (_sslContext==null)
217             {
218                 _sslContext = SelectConnector.this._httpClient.getSSLContext();
219             }
220 
221             SSLEngine sslEngine = _sslContext.createSSLEngine();
222             sslEngine.setUseClientMode(true);
223             sslEngine.beginHandshake();
224 
225             return sslEngine;
226         }
227 
228         /* ------------------------------------------------------------ */
229         /* (non-Javadoc)
230          * @see org.eclipse.io.nio.SelectorManager#connectionFailed(java.nio.channels.SocketChannel, java.lang.Throwable, java.lang.Object)
231          */
232         @Override
233         protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
234         {
235             if (attachment instanceof HttpDestination)
236                 ((HttpDestination)attachment).onConnectionFailed(ex);
237             else
238                 super.connectionFailed(channel,ex,attachment);
239         }
240     }
241 
242     private class ConnectTimeout extends Timeout.Task
243     {
244         private final SocketChannel channel;
245         private final HttpDestination destination;
246 
247         public ConnectTimeout(SocketChannel channel, HttpDestination destination)
248         {
249             this.channel = channel;
250             this.destination = destination;
251         }
252 
253         @Override
254         public void expired()
255         {
256             if (channel.isConnectionPending())
257             {
258                 Log.debug("Channel {} timed out while connecting, closing it", channel);
259                 try
260                 {
261                     // This will unregister the channel from the selector
262                     channel.close();
263                 }
264                 catch (IOException x)
265                 {
266                     Log.ignore(x);
267                 }
268                 destination.onConnectionFailed(new SocketTimeoutException());
269             }
270         }
271     }
272 
273     /**
274      * An endpoint that is able to "upgrade" from a normal endpoint to a SSL endpoint.
275      * Since {@link HttpParser} and {@link HttpGenerator} only depend on the {@link EndPoint}
276      * interface, this class overrides all methods of {@link EndPoint} to provide the right
277      * behavior depending on the fact that it has been upgraded or not.
278      */
279     public static class ProxySelectChannelEndPoint extends SslSelectChannelEndPoint
280     {
281         private final SelectChannelEndPoint plainEndPoint;
282         private volatile boolean upgraded = false;
283 
284         public ProxySelectChannelEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key, Buffers sslBuffers, SSLEngine engine) throws IOException
285         {
286             super(sslBuffers, channel, selectSet, key, engine);
287             this.plainEndPoint = new SelectChannelEndPoint(channel, selectSet, key);
288         }
289 
290         public void upgrade()
291         {
292             upgraded = true;
293         }
294 
295         public void shutdownOutput() throws IOException
296         {
297             if (upgraded)
298                 super.shutdownOutput();
299             else
300                 plainEndPoint.shutdownOutput();
301         }
302 
303         public void close() throws IOException
304         {
305             if (upgraded)
306                 super.close();
307             else
308                 plainEndPoint.close();
309         }
310 
311         public int fill(Buffer buffer) throws IOException
312         {
313             if (upgraded)
314                 return super.fill(buffer);
315             else
316                 return plainEndPoint.fill(buffer);
317         }
318 
319         public int flush(Buffer buffer) throws IOException
320         {
321             if (upgraded)
322                 return super.flush(buffer);
323             else
324                 return plainEndPoint.flush(buffer);
325         }
326 
327         public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
328         {
329             if (upgraded)
330                 return super.flush(header, buffer, trailer);
331             else
332                 return plainEndPoint.flush(header, buffer, trailer);
333         }
334 
335         public String getLocalAddr()
336         {
337             if (upgraded)
338                 return super.getLocalAddr();
339             else
340                 return plainEndPoint.getLocalAddr();
341         }
342 
343         public String getLocalHost()
344         {
345             if (upgraded)
346                 return super.getLocalHost();
347             else
348                 return plainEndPoint.getLocalHost();
349         }
350 
351         public int getLocalPort()
352         {
353             if (upgraded)
354                 return super.getLocalPort();
355             else
356                 return plainEndPoint.getLocalPort();
357         }
358 
359         public String getRemoteAddr()
360         {
361             if (upgraded)
362                 return super.getRemoteAddr();
363             else
364                 return plainEndPoint.getRemoteAddr();
365         }
366 
367         public String getRemoteHost()
368         {
369             if (upgraded)
370                 return super.getRemoteHost();
371             else
372                 return plainEndPoint.getRemoteHost();
373         }
374 
375         public int getRemotePort()
376         {
377             if (upgraded)
378                 return super.getRemotePort();
379             else
380                 return plainEndPoint.getRemotePort();
381         }
382 
383         public boolean isBlocking()
384         {
385             if (upgraded)
386                 return super.isBlocking();
387             else
388                 return plainEndPoint.isBlocking();
389         }
390 
391         public boolean isBufferred()
392         {
393             if (upgraded)
394                 return super.isBufferred();
395             else
396                 return plainEndPoint.isBufferred();
397         }
398 
399         public boolean blockReadable(long millisecs) throws IOException
400         {
401             if (upgraded)
402                 return super.blockReadable(millisecs);
403             else
404                 return plainEndPoint.blockReadable(millisecs);
405         }
406 
407         public boolean blockWritable(long millisecs) throws IOException
408         {
409             if (upgraded)
410                 return super.blockWritable(millisecs);
411             else
412                 return plainEndPoint.blockWritable(millisecs);
413         }
414 
415         public boolean isOpen()
416         {
417             if (upgraded)
418                 return super.isOpen();
419             else
420                 return plainEndPoint.isOpen();
421         }
422 
423         public Object getTransport()
424         {
425             if (upgraded)
426                 return super.getTransport();
427             else
428                 return plainEndPoint.getTransport();
429         }
430 
431         public boolean isBufferingInput()
432         {
433             if (upgraded)
434                 return super.isBufferingInput();
435             else
436                 return plainEndPoint.isBufferingInput();
437         }
438 
439         public boolean isBufferingOutput()
440         {
441             if (upgraded)
442                 return super.isBufferingOutput();
443             else
444                 return plainEndPoint.isBufferingOutput();
445         }
446 
447         public void flush() throws IOException
448         {
449             if (upgraded)
450                 super.flush();
451             else
452                 plainEndPoint.flush();
453 
454         }
455 
456         public int getMaxIdleTime()
457         {
458             if (upgraded)
459                 return super.getMaxIdleTime();
460             else
461                 return plainEndPoint.getMaxIdleTime();
462         }
463 
464         public void setMaxIdleTime(int timeMs) throws IOException
465         {
466             if (upgraded)
467                 super.setMaxIdleTime(timeMs);
468             else
469                 plainEndPoint.setMaxIdleTime(timeMs);
470         }
471     }
472 }