View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  import java.net.SocketTimeoutException;
18  import java.nio.channels.SelectionKey;
19  import java.nio.channels.SocketChannel;
20  import java.util.Map;
21  import java.util.concurrent.ConcurrentHashMap;
22  import javax.net.ssl.SSLContext;
23  import javax.net.ssl.SSLEngine;
24  import javax.net.ssl.SSLSession;
25  
26  import org.eclipse.jetty.http.HttpMethods;
27  import org.eclipse.jetty.http.HttpVersions;
28  import org.eclipse.jetty.io.Buffer;
29  import org.eclipse.jetty.io.Buffers;
30  import org.eclipse.jetty.io.ConnectedEndPoint;
31  import org.eclipse.jetty.io.Connection;
32  import org.eclipse.jetty.io.ThreadLocalBuffers;
33  import org.eclipse.jetty.io.nio.DirectNIOBuffer;
34  import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
35  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
36  import org.eclipse.jetty.io.nio.SelectorManager;
37  import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
38  import org.eclipse.jetty.util.component.AbstractLifeCycle;
39  import org.eclipse.jetty.util.log.Log;
40  import org.eclipse.jetty.util.thread.Timeout;
41  
42  class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, Runnable
43  {
44      private final HttpClient _httpClient;
45      private final Manager _selectorManager=new Manager();
46      private final Timeout _connectTimer = new Timeout();
47      private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>();
48      private SSLContext _sslContext;
49      private Buffers _sslBuffers;
50      private boolean _blockingConnect;
51  
52      /**
53       * @param httpClient
54       */
55      SelectConnector(HttpClient httpClient)
56      {
57          _httpClient = httpClient;
58      }
59  
60      /* ------------------------------------------------------------ */
61      /** Get the blockingConnect.
62       * @return the blockingConnect
63       */
64      public boolean isBlockingConnect()
65      {
66          return _blockingConnect;
67      }
68  
69      /* ------------------------------------------------------------ */
70      /** Set the blockingConnect.
71       * @param blockingConnect If true, connections are made in blocking mode.
72       */
73      public void setBlockingConnect(boolean blockingConnect)
74      {
75          _blockingConnect = blockingConnect;
76      }
77  
78      /* ------------------------------------------------------------ */
79      @Override
80      protected void doStart() throws Exception
81      {
82          super.doStart();
83  
84          _connectTimer.setDuration(_httpClient.getConnectTimeout());
85          _connectTimer.setNow();
86          _httpClient._threadPool.dispatch(new Runnable()
87          {
88              public void run()
89              {
90                  while (isRunning())
91                  {
92                      _connectTimer.tick(System.currentTimeMillis());
93                      try
94                      {
95                          Thread.sleep(200);
96                      }
97                      catch (InterruptedException x)
98                      {
99                          Thread.currentThread().interrupt();
100                         break;
101                     }
102                 }
103             }
104         });
105 
106         _selectorManager.start();
107 
108         final boolean direct=_httpClient.getUseDirectBuffers();
109 
110         SSLEngine sslEngine=_selectorManager.newSslEngine();
111         final SSLSession ssl_session=sslEngine.getSession();
112         ThreadLocalBuffers ssl_buffers = new ThreadLocalBuffers()
113         {
114             {
115                 super.setBufferSize(ssl_session.getApplicationBufferSize());
116                 super.setHeaderSize(ssl_session.getApplicationBufferSize());
117             }
118 
119             @Override
120             protected Buffer newBuffer(int size)
121             {
122                 return direct?new DirectNIOBuffer(size):new IndirectNIOBuffer(size);
123             }
124             @Override
125             protected Buffer newHeader(int size)
126             {
127                 return direct?new DirectNIOBuffer(size):new IndirectNIOBuffer(size);
128             }
129             @Override
130             protected boolean isHeader(Buffer buffer)
131             {
132                 return true;
133             }
134 
135             @Override
136             public void setBufferSize(int size)
137             {
138             }
139 
140             @Override
141             public void setHeaderSize(int size)
142             {
143             }
144         };
145         _sslBuffers=ssl_buffers;
146 
147         _httpClient._threadPool.dispatch(this);
148     }
149 
150     /* ------------------------------------------------------------ */
151     @Override
152     protected void doStop() throws Exception
153     {
154         _connectTimer.cancelAll();
155         _selectorManager.stop();
156     }
157 
158     /* ------------------------------------------------------------ */
159     public void startConnection( HttpDestination destination )
160         throws IOException
161     {
162         SocketChannel channel = SocketChannel.open();
163         Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
164         channel.configureBlocking( false );
165         channel.socket().setTcpNoDelay(true);
166         channel.connect(address.toSocketAddress());
167         _selectorManager.register( channel, destination );
168         ConnectTimeout connectTimeout = new ConnectTimeout(channel, destination);
169         _connectTimer.schedule(connectTimeout);
170         _connectingChannels.put(channel, connectTimeout);
171     }
172 
173     /* ------------------------------------------------------------ */
174     public void run()
175     {
176         while (_httpClient.isRunning())
177         {
178             try
179             {
180                 _selectorManager.doSelect(0);
181             }
182             catch (Exception e)
183             {
184                 Log.warn(e.toString());
185                 Log.debug(e);
186                 Thread.yield();
187             }
188         }
189     }
190 
191     /* ------------------------------------------------------------ */
192     class Manager extends SelectorManager
193     {
194         @Override
195         protected SocketChannel acceptChannel(SelectionKey key) throws IOException
196         {
197             throw new IllegalStateException();
198         }
199 
200         @Override
201         public boolean dispatch(Runnable task)
202         {
203             return SelectConnector.this._httpClient._threadPool.dispatch(task);
204         }
205 
206         @Override
207         protected void endPointOpened(SelectChannelEndPoint endpoint)
208         {
209         }
210 
211         @Override
212         protected void endPointClosed(SelectChannelEndPoint endpoint)
213         {
214         }
215 
216         @Override
217         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
218         {
219         }
220 
221         @Override
222         protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
223         {
224             if (endpoint instanceof SslSelectChannelEndPoint)
225                 return new HttpConnection(_sslBuffers,_sslBuffers,endpoint);
226 
227             return new HttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
228         }
229 
230         @Override
231         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
232         {
233             // We're connected, cancel the connect timeout
234             Timeout.Task connectTimeout = _connectingChannels.remove(channel);
235             if (connectTimeout != null)
236                 connectTimeout.cancel();
237             Log.debug("Channels with connection pending: {}", _connectingChannels.size());
238 
239             // key should have destination at this point (will be replaced by endpoint after this call)
240             HttpDestination dest=(HttpDestination)key.attachment();
241 
242             SelectChannelEndPoint ep=null;
243 
244             if (dest.isSecure())
245             {
246                 if (dest.isProxied())
247                 {
248                     String connect = HttpMethods.CONNECT+" "+dest.getAddress()+HttpVersions.HTTP_1_0+"\r\n\r\n";
249                     // TODO need to send this over channel unencrypted and setup endpoint to ignore the 200 OK response.
250 
251                     throw new IllegalStateException("Not Implemented");
252                 }
253 
254                 SSLEngine engine=newSslEngine();
255                 ep = new SslSelectChannelEndPoint(_sslBuffers,channel,selectSet,key,engine);
256             }
257             else
258             {
259                 ep=new SelectChannelEndPoint(channel,selectSet,key);
260             }
261 
262             HttpConnection connection=(HttpConnection)ep.getConnection();
263             connection.setDestination(dest);
264             dest.onNewConnection(connection);
265             return ep;
266         }
267 
268         private synchronized SSLEngine newSslEngine() throws IOException
269         {
270             if (_sslContext==null)
271             {
272                 _sslContext = SelectConnector.this._httpClient.getSSLContext();
273             }
274 
275             SSLEngine sslEngine = _sslContext.createSSLEngine();
276             sslEngine.setUseClientMode(true);
277             sslEngine.beginHandshake();
278 
279             return sslEngine;
280         }
281 
282         /* ------------------------------------------------------------ */
283         /* (non-Javadoc)
284          * @see org.eclipse.io.nio.SelectorManager#connectionFailed(java.nio.channels.SocketChannel, java.lang.Throwable, java.lang.Object)
285          */
286         @Override
287         protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
288         {
289             if (attachment instanceof HttpDestination)
290                 ((HttpDestination)attachment).onConnectionFailed(ex);
291             else
292                 super.connectionFailed(channel,ex,attachment);
293         }
294     }
295 
296     private class ConnectTimeout extends Timeout.Task
297     {
298         private final SocketChannel channel;
299         private final HttpDestination destination;
300 
301         public ConnectTimeout(SocketChannel channel, HttpDestination destination)
302         {
303             this.channel = channel;
304             this.destination = destination;
305         }
306 
307         @Override
308         public void expired()
309         {
310             _connectingChannels.remove(channel);
311             if (channel.isConnectionPending())
312             {
313                 Log.debug("Channel {} timed out while connecting, closing it", channel);
314                 try
315                 {
316                     // This will unregister the channel from the selector
317                     channel.close();
318                 }
319                 catch (IOException x)
320                 {
321                     Log.ignore(x);
322                 }
323                 destination.onConnectionFailed(new SocketTimeoutException());
324             }
325         }
326     }
327 }