View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  import java.net.SocketTimeoutException;
18  import java.nio.channels.SelectionKey;
19  import java.nio.channels.SocketChannel;
20  import javax.net.ssl.SSLContext;
21  import javax.net.ssl.SSLEngine;
22  import javax.net.ssl.SSLSession;
23  
24  import org.eclipse.jetty.http.HttpGenerator;
25  import org.eclipse.jetty.http.HttpParser;
26  import org.eclipse.jetty.io.Buffer;
27  import org.eclipse.jetty.io.Buffers;
28  import org.eclipse.jetty.io.ConnectedEndPoint;
29  import org.eclipse.jetty.io.Connection;
30  import org.eclipse.jetty.io.EndPoint;
31  import org.eclipse.jetty.io.ThreadLocalBuffers;
32  import org.eclipse.jetty.io.nio.DirectNIOBuffer;
33  import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
34  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
35  import org.eclipse.jetty.io.nio.SelectorManager;
36  import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
37  import org.eclipse.jetty.util.component.AbstractLifeCycle;
38  import org.eclipse.jetty.util.log.Log;
39  import org.eclipse.jetty.util.thread.Timeout;
40  
41  class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, Runnable
42  {
43      private final HttpClient _httpClient;
44      private final Manager _selectorManager=new Manager();
45      private SSLContext _sslContext;
46      private Buffers _sslBuffers;
47      private boolean _blockingConnect;
48  
49      /**
50       * @param httpClient
51       */
52      SelectConnector(HttpClient httpClient)
53      {
54          _httpClient = httpClient;
55      }
56  
57      /* ------------------------------------------------------------ */
58      /** Get the blockingConnect.
59       * @return the blockingConnect
60       */
61      public boolean isBlockingConnect()
62      {
63          return _blockingConnect;
64      }
65  
66      /* ------------------------------------------------------------ */
67      /** Set the blockingConnect.
68       * @param blockingConnect If true, connections are made in blocking mode.
69       */
70      public void setBlockingConnect(boolean blockingConnect)
71      {
72          _blockingConnect = blockingConnect;
73      }
74  
75      /* ------------------------------------------------------------ */
76      @Override
77      protected void doStart() throws Exception
78      {
79          super.doStart();
80  
81          _selectorManager.start();
82  
83          final boolean direct=_httpClient.getUseDirectBuffers();
84  
85          SSLEngine sslEngine=_selectorManager.newSslEngine();
86          final SSLSession ssl_session=sslEngine.getSession();
87          ThreadLocalBuffers ssl_buffers = new ThreadLocalBuffers()
88          {
89              {
90                  super.setBufferSize(ssl_session.getApplicationBufferSize());
91                  super.setHeaderSize(ssl_session.getApplicationBufferSize());
92              }
93  
94              @Override
95              protected Buffer newBuffer(int size)
96              {
97                  return direct?new DirectNIOBuffer(size):new IndirectNIOBuffer(size);
98              }
99              @Override
100             protected Buffer newHeader(int size)
101             {
102                 return direct?new DirectNIOBuffer(size):new IndirectNIOBuffer(size);
103             }
104             @Override
105             protected boolean isHeader(Buffer buffer)
106             {
107                 return true;
108             }
109 
110             @Override
111             public void setBufferSize(int size)
112             {
113             }
114 
115             @Override
116             public void setHeaderSize(int size)
117             {
118             }
119         };
120         _sslBuffers=ssl_buffers;
121 
122         _httpClient._threadPool.dispatch(this);
123     }
124 
125     /* ------------------------------------------------------------ */
126     @Override
127     protected void doStop() throws Exception
128     {
129         _selectorManager.stop();
130     }
131 
132     /* ------------------------------------------------------------ */
133     public void startConnection( HttpDestination destination )
134         throws IOException
135     {
136         try
137         {
138             SocketChannel channel = SocketChannel.open();
139             Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
140             channel.configureBlocking( true );
141             channel.socket().setTcpNoDelay(true);
142             channel.socket().setSoTimeout(_httpClient.getConnectTimeout());
143             channel.connect(address.toSocketAddress());
144             channel.configureBlocking(false);
145             channel.socket().setSoTimeout((int)_httpClient.getTimeout());
146 
147             _selectorManager.register( channel, destination );
148         }
149         catch(IOException ex)
150         {
151             destination.onConnectionFailed(ex);
152         }
153 
154     }
155 
156     /* ------------------------------------------------------------ */
157     public void run()
158     {
159         while (_httpClient.isRunning())
160         {
161             try
162             {
163                 _selectorManager.doSelect(0);
164             }
165             catch (Exception e)
166             {
167                 Log.warn(e.toString());
168                 Log.debug(e);
169                 Thread.yield();
170             }
171         }
172     }
173 
174     /* ------------------------------------------------------------ */
175     class Manager extends SelectorManager
176     {
177         @Override
178         public boolean dispatch(Runnable task)
179         {
180             return SelectConnector.this._httpClient._threadPool.dispatch(task);
181         }
182 
183         @Override
184         protected void endPointOpened(SelectChannelEndPoint endpoint)
185         {
186         }
187 
188         @Override
189         protected void endPointClosed(SelectChannelEndPoint endpoint)
190         {
191         }
192 
193         @Override
194         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
195         {
196         }
197 
198         @Override
199         protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
200         {
201             if (endpoint instanceof SslSelectChannelEndPoint)
202                 return new HttpConnection(_sslBuffers,_sslBuffers,endpoint);
203 
204             return new HttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
205         }
206 
207         @Override
208         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
209         {
210             // key should have destination at this point (will be replaced by endpoint after this call)
211             HttpDestination dest=(HttpDestination)key.attachment();
212 
213             SelectChannelEndPoint ep=null;
214 
215             if (dest.isSecure())
216             {
217                 if (dest.isProxied())
218                 {
219                     SSLEngine engine=newSslEngine();
220                     ep = new ProxySelectChannelEndPoint(channel,selectSet,key,_sslBuffers,engine);
221                 }
222                 else
223                 {
224                     SSLEngine engine=newSslEngine();
225                     ep = new SslSelectChannelEndPoint(_sslBuffers,channel,selectSet,key,engine);
226                 }
227             }
228             else
229             {
230                 ep=new SelectChannelEndPoint(channel,selectSet,key);
231             }
232 
233             HttpConnection connection=(HttpConnection)ep.getConnection();
234             connection.setDestination(dest);
235             dest.onNewConnection(connection);
236             return ep;
237         }
238 
239         private synchronized SSLEngine newSslEngine() throws IOException
240         {
241             if (_sslContext==null)
242             {
243                 _sslContext = SelectConnector.this._httpClient.getSSLContext();
244             }
245 
246             SSLEngine sslEngine = _sslContext.createSSLEngine();
247             sslEngine.setUseClientMode(true);
248             sslEngine.beginHandshake();
249 
250             return sslEngine;
251         }
252     }
253 
254     private class ConnectTimeout extends Timeout.Task
255     {
256         private final SocketChannel channel;
257         private final HttpDestination destination;
258 
259         public ConnectTimeout(SocketChannel channel, HttpDestination destination)
260         {
261             this.channel = channel;
262             this.destination = destination;
263         }
264 
265         @Override
266         public void expired()
267         {
268             if (channel.isConnectionPending())
269             {
270                 Log.debug("Channel {} timed out while connecting, closing it", channel);
271                 try
272                 {
273                     // This will unregister the channel from the selector
274                     channel.close();
275                 }
276                 catch (IOException x)
277                 {
278                     Log.ignore(x);
279                 }
280                 destination.onConnectionFailed(new SocketTimeoutException());
281             }
282         }
283     }
284 
285     /**
286      * An endpoint that is able to "upgrade" from a normal endpoint to a SSL endpoint.
287      * Since {@link HttpParser} and {@link HttpGenerator} only depend on the {@link EndPoint}
288      * interface, this class overrides all methods of {@link EndPoint} to provide the right
289      * behavior depending on the fact that it has been upgraded or not.
290      */
291     public static class ProxySelectChannelEndPoint extends SslSelectChannelEndPoint
292     {
293         private final SelectChannelEndPoint plainEndPoint;
294         private volatile boolean upgraded = false;
295 
296         public ProxySelectChannelEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key, Buffers sslBuffers, SSLEngine engine) throws IOException
297         {
298             super(sslBuffers, channel, selectSet, key, engine);
299             this.plainEndPoint = new SelectChannelEndPoint(channel, selectSet, key);
300         }
301 
302         public void upgrade()
303         {
304             upgraded = true;
305         }
306 
307         public void shutdownOutput() throws IOException
308         {
309             if (upgraded)
310                 super.shutdownOutput();
311             else
312                 plainEndPoint.shutdownOutput();
313         }
314 
315         public void close() throws IOException
316         {
317             if (upgraded)
318                 super.close();
319             else
320                 plainEndPoint.close();
321         }
322 
323         public int fill(Buffer buffer) throws IOException
324         {
325             if (upgraded)
326                 return super.fill(buffer);
327             else
328                 return plainEndPoint.fill(buffer);
329         }
330 
331         public int flush(Buffer buffer) throws IOException
332         {
333             if (upgraded)
334                 return super.flush(buffer);
335             else
336                 return plainEndPoint.flush(buffer);
337         }
338 
339         public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
340         {
341             if (upgraded)
342                 return super.flush(header, buffer, trailer);
343             else
344                 return plainEndPoint.flush(header, buffer, trailer);
345         }
346 
347         public String getLocalAddr()
348         {
349             if (upgraded)
350                 return super.getLocalAddr();
351             else
352                 return plainEndPoint.getLocalAddr();
353         }
354 
355         public String getLocalHost()
356         {
357             if (upgraded)
358                 return super.getLocalHost();
359             else
360                 return plainEndPoint.getLocalHost();
361         }
362 
363         public int getLocalPort()
364         {
365             if (upgraded)
366                 return super.getLocalPort();
367             else
368                 return plainEndPoint.getLocalPort();
369         }
370 
371         public String getRemoteAddr()
372         {
373             if (upgraded)
374                 return super.getRemoteAddr();
375             else
376                 return plainEndPoint.getRemoteAddr();
377         }
378 
379         public String getRemoteHost()
380         {
381             if (upgraded)
382                 return super.getRemoteHost();
383             else
384                 return plainEndPoint.getRemoteHost();
385         }
386 
387         public int getRemotePort()
388         {
389             if (upgraded)
390                 return super.getRemotePort();
391             else
392                 return plainEndPoint.getRemotePort();
393         }
394 
395         public boolean isBlocking()
396         {
397             if (upgraded)
398                 return super.isBlocking();
399             else
400                 return plainEndPoint.isBlocking();
401         }
402 
403         public boolean isBufferred()
404         {
405             if (upgraded)
406                 return super.isBufferred();
407             else
408                 return plainEndPoint.isBufferred();
409         }
410 
411         public boolean blockReadable(long millisecs) throws IOException
412         {
413             if (upgraded)
414                 return super.blockReadable(millisecs);
415             else
416                 return plainEndPoint.blockReadable(millisecs);
417         }
418 
419         public boolean blockWritable(long millisecs) throws IOException
420         {
421             if (upgraded)
422                 return super.blockWritable(millisecs);
423             else
424                 return plainEndPoint.blockWritable(millisecs);
425         }
426 
427         public boolean isOpen()
428         {
429             if (upgraded)
430                 return super.isOpen();
431             else
432                 return plainEndPoint.isOpen();
433         }
434 
435         public Object getTransport()
436         {
437             if (upgraded)
438                 return super.getTransport();
439             else
440                 return plainEndPoint.getTransport();
441         }
442 
443         public boolean isBufferingInput()
444         {
445             if (upgraded)
446                 return super.isBufferingInput();
447             else
448                 return plainEndPoint.isBufferingInput();
449         }
450 
451         public boolean isBufferingOutput()
452         {
453             if (upgraded)
454                 return super.isBufferingOutput();
455             else
456                 return plainEndPoint.isBufferingOutput();
457         }
458 
459         public void flush() throws IOException
460         {
461             if (upgraded)
462                 super.flush();
463             else
464                 plainEndPoint.flush();
465 
466         }
467 
468         public int getMaxIdleTime()
469         {
470             if (upgraded)
471                 return super.getMaxIdleTime();
472             else
473                 return plainEndPoint.getMaxIdleTime();
474         }
475 
476         public void setMaxIdleTime(int timeMs) throws IOException
477         {
478             if (upgraded)
479                 super.setMaxIdleTime(timeMs);
480             else
481                 plainEndPoint.setMaxIdleTime(timeMs);
482         }
483     }
484 }