View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  import java.net.SocketTimeoutException;
18  import java.nio.channels.SelectionKey;
19  import java.nio.channels.SocketChannel;
20  import java.nio.channels.UnresolvedAddressException;
21  import java.util.Arrays;
22  import java.util.Map;
23  import java.util.concurrent.ConcurrentHashMap;
24  
25  import javax.net.ssl.SSLEngine;
26  
27  import org.eclipse.jetty.io.AsyncEndPoint;
28  import org.eclipse.jetty.io.Buffer;
29  import org.eclipse.jetty.io.ConnectedEndPoint;
30  import org.eclipse.jetty.io.Connection;
31  import org.eclipse.jetty.io.nio.AsyncConnection;
32  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
33  import org.eclipse.jetty.io.nio.SelectorManager;
34  import org.eclipse.jetty.io.nio.SslConnection;
35  import org.eclipse.jetty.util.component.AbstractLifeCycle;
36  import org.eclipse.jetty.util.component.AggregateLifeCycle;
37  import org.eclipse.jetty.util.component.Dumpable;
38  import org.eclipse.jetty.util.log.Log;
39  import org.eclipse.jetty.util.log.Logger;
40  import org.eclipse.jetty.util.ssl.SslContextFactory;
41  import org.eclipse.jetty.util.thread.Timeout;
42  import org.eclipse.jetty.util.thread.Timeout.Task;
43  
44  class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, Dumpable
45  {
46      private static final Logger LOG = Log.getLogger(SelectConnector.class);
47  
48      private final HttpClient _httpClient;
49      private final Manager _selectorManager=new Manager();
50      private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>();
51  
52      /**
53       * @param httpClient the HttpClient this connector is associated to
54       */
55      SelectConnector(HttpClient httpClient)
56      {
57          _httpClient = httpClient;
58      }
59  
60      /* ------------------------------------------------------------ */
61      @Override
62      protected void doStart() throws Exception
63      {
64          super.doStart();
65  
66          _selectorManager.start();
67      }
68  
69      /* ------------------------------------------------------------ */
70      @Override
71      protected void doStop() throws Exception
72      {
73          _selectorManager.stop();
74      }
75  
76      public String dump()
77      {
78          return AggregateLifeCycle.dump(this);
79      }
80  
81      public void dump(Appendable out, String indent) throws IOException
82      {
83          out.append(String.valueOf(this)).append("\n");
84          AggregateLifeCycle.dump(out, indent, Arrays.asList(_selectorManager));
85      }
86  
87      /* ------------------------------------------------------------ */
88      public void startConnection( HttpDestination destination )
89          throws IOException
90      {
91          SocketChannel channel = null;
92          try
93          {
94              channel = SocketChannel.open();
95              Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
96              channel.socket().setTcpNoDelay(true);
97  
98              if (_httpClient.isConnectBlocking())
99              {
100                     channel.socket().connect(address.toSocketAddress(), _httpClient.getConnectTimeout());
101                     channel.configureBlocking(false);
102                     _selectorManager.register( channel, destination );
103             }
104             else
105             {
106                 channel.configureBlocking(false);
107                 channel.connect(address.toSocketAddress());
108                 _selectorManager.register(channel,destination);
109                 ConnectTimeout connectTimeout = new ConnectTimeout(channel,destination);
110                 _httpClient.schedule(connectTimeout,_httpClient.getConnectTimeout());
111                 _connectingChannels.put(channel,connectTimeout);
112             }
113         }
114         catch (UnresolvedAddressException ex)
115         {
116             if (channel != null)
117                 channel.close();
118             destination.onConnectionFailed(ex);
119         }
120         catch(IOException ex)
121         {
122             if (channel != null)
123                 channel.close();
124             destination.onConnectionFailed(ex);
125         }
126     }
127 
128     /* ------------------------------------------------------------ */
129     class Manager extends SelectorManager
130     {
131         Logger LOG = SelectConnector.LOG;
132 
133         @Override
134         public boolean dispatch(Runnable task)
135         {
136             return _httpClient._threadPool.dispatch(task);
137         }
138 
139         @Override
140         protected void endPointOpened(SelectChannelEndPoint endpoint)
141         {
142         }
143 
144         @Override
145         protected void endPointClosed(SelectChannelEndPoint endpoint)
146         {
147         }
148 
149         @Override
150         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
151         {
152         }
153 
154         @Override
155         public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
156         {
157             return new AsyncHttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
158         }
159 
160         @Override
161         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
162         {
163             // We're connected, cancel the connect timeout
164             Timeout.Task connectTimeout = _connectingChannels.remove(channel);
165             if (connectTimeout != null)
166                 connectTimeout.cancel();
167             if (LOG.isDebugEnabled())
168                 LOG.debug("Channels with connection pending: {}", _connectingChannels.size());
169 
170             // key should have destination at this point (will be replaced by endpoint after this call)
171             HttpDestination dest=(HttpDestination)key.attachment();
172 
173             SelectChannelEndPoint scep = new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout());
174             AsyncEndPoint ep = scep;
175 
176             if (dest.isSecure())
177             {
178                 LOG.debug("secure to {}, proxied={}",channel,dest.isProxied());
179                 ep = new UpgradableEndPoint(ep,newSslEngine(channel));
180             }
181 
182             AsyncConnection connection = selectSet.getManager().newConnection(channel,ep, key.attachment());
183             ep.setConnection(connection);
184 
185             AbstractHttpConnection httpConnection=(AbstractHttpConnection)connection;
186             httpConnection.setDestination(dest);
187 
188             if (dest.isSecure() && !dest.isProxied())
189                 ((UpgradableEndPoint)ep).upgrade();
190 
191             dest.onNewConnection(httpConnection);
192 
193             return scep;
194         }
195 
196         private synchronized SSLEngine newSslEngine(SocketChannel channel) throws IOException
197         {
198             SslContextFactory sslContextFactory = _httpClient.getSslContextFactory();
199             SSLEngine sslEngine;
200             if (channel != null)
201             {
202                 String peerHost = channel.socket().getInetAddress().getHostAddress();
203                 int peerPort = channel.socket().getPort();
204                 sslEngine = sslContextFactory.newSslEngine(peerHost, peerPort);
205             }
206             else
207             {
208                 sslEngine = sslContextFactory.newSslEngine();
209             }
210             sslEngine.setUseClientMode(true);
211             sslEngine.beginHandshake();
212 
213             return sslEngine;
214         }
215 
216         /* ------------------------------------------------------------ */
217         /* (non-Javadoc)
218          * @see org.eclipse.io.nio.SelectorManager#connectionFailed(java.nio.channels.SocketChannel, java.lang.Throwable, java.lang.Object)
219          */
220         @Override
221         protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
222         {
223             Timeout.Task connectTimeout = _connectingChannels.remove(channel);
224             if (connectTimeout != null)
225                 connectTimeout.cancel();
226 
227             if (attachment instanceof HttpDestination)
228                 ((HttpDestination)attachment).onConnectionFailed(ex);
229             else
230                 super.connectionFailed(channel,ex,attachment);
231         }
232     }
233 
234     private class ConnectTimeout extends Timeout.Task
235     {
236         private final SocketChannel channel;
237         private final HttpDestination destination;
238 
239         public ConnectTimeout(SocketChannel channel, HttpDestination destination)
240         {
241             this.channel = channel;
242             this.destination = destination;
243         }
244 
245         @Override
246         public void expired()
247         {
248             if (channel.isConnectionPending())
249             {
250                 LOG.debug("Channel {} timed out while connecting, closing it", channel);
251                 try
252                 {
253                     // This will unregister the channel from the selector
254                     channel.close();
255                 }
256                 catch (IOException x)
257                 {
258                     LOG.ignore(x);
259                 }
260                 destination.onConnectionFailed(new SocketTimeoutException());
261             }
262         }
263     }
264 
265     public static class UpgradableEndPoint implements AsyncEndPoint
266     {
267         AsyncEndPoint _endp;
268         SSLEngine _engine;
269 
270         public UpgradableEndPoint(AsyncEndPoint endp, SSLEngine engine) throws IOException
271         {
272             _engine=engine;
273             _endp=endp;
274         }
275 
276         public void upgrade()
277         {
278             AsyncHttpConnection connection = (AsyncHttpConnection)_endp.getConnection();
279 
280             SslConnection sslConnection = new SslConnection(_engine,_endp);
281             _endp.setConnection(sslConnection);
282 
283             _endp=sslConnection.getSslEndPoint();
284             sslConnection.getSslEndPoint().setConnection(connection);
285 
286             LOG.debug("upgrade {} to {} for {}",this,sslConnection,connection);
287         }
288 
289 
290         public Connection getConnection()
291         {
292             return _endp.getConnection();
293         }
294 
295         public void setConnection(Connection connection)
296         {
297             _endp.setConnection(connection);
298         }
299 
300         public void shutdownOutput() throws IOException
301         {
302             _endp.shutdownOutput();
303         }
304 
305         public void asyncDispatch()
306         {
307             _endp.asyncDispatch();
308         }
309 
310         public boolean isOutputShutdown()
311         {
312             return _endp.isOutputShutdown();
313         }
314 
315         public void shutdownInput() throws IOException
316         {
317             _endp.shutdownInput();
318         }
319 
320         public void scheduleWrite()
321         {
322             _endp.scheduleWrite();
323         }
324 
325         public boolean isInputShutdown()
326         {
327             return _endp.isInputShutdown();
328         }
329 
330         public void close() throws IOException
331         {
332             _endp.close();
333         }
334 
335         public int fill(Buffer buffer) throws IOException
336         {
337             return _endp.fill(buffer);
338         }
339 
340         public boolean isWritable()
341         {
342             return _endp.isWritable();
343         }
344 
345         public boolean hasProgressed()
346         {
347             return _endp.hasProgressed();
348         }
349 
350         public int flush(Buffer buffer) throws IOException
351         {
352             return _endp.flush(buffer);
353         }
354 
355         public void scheduleTimeout(Task task, long timeoutMs)
356         {
357             _endp.scheduleTimeout(task,timeoutMs);
358         }
359 
360         public void cancelTimeout(Task task)
361         {
362             _endp.cancelTimeout(task);
363         }
364 
365         public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
366         {
367             return _endp.flush(header,buffer,trailer);
368         }
369 
370         public String getLocalAddr()
371         {
372             return _endp.getLocalAddr();
373         }
374 
375         public String getLocalHost()
376         {
377             return _endp.getLocalHost();
378         }
379 
380         public int getLocalPort()
381         {
382             return _endp.getLocalPort();
383         }
384 
385         public String getRemoteAddr()
386         {
387             return _endp.getRemoteAddr();
388         }
389 
390         public String getRemoteHost()
391         {
392             return _endp.getRemoteHost();
393         }
394 
395         public int getRemotePort()
396         {
397             return _endp.getRemotePort();
398         }
399 
400         public boolean isBlocking()
401         {
402             return _endp.isBlocking();
403         }
404 
405         public boolean blockReadable(long millisecs) throws IOException
406         {
407             return _endp.blockReadable(millisecs);
408         }
409 
410         public boolean blockWritable(long millisecs) throws IOException
411         {
412             return _endp.blockWritable(millisecs);
413         }
414 
415         public boolean isOpen()
416         {
417             return _endp.isOpen();
418         }
419 
420         public Object getTransport()
421         {
422             return _endp.getTransport();
423         }
424 
425         public void flush() throws IOException
426         {
427             _endp.flush();
428         }
429 
430         public int getMaxIdleTime()
431         {
432             return _endp.getMaxIdleTime();
433         }
434 
435         public void setMaxIdleTime(int timeMs) throws IOException
436         {
437             _endp.setMaxIdleTime(timeMs);
438         }
439 
440         public void onIdleExpired()
441         {
442             _endp.onIdleExpired();
443         }
444 
445         public void setCheckForIdle(boolean check)
446         {
447             _endp.setCheckForIdle(check);
448         }
449 
450         public boolean isCheckForIdle()
451         {
452             return _endp.isCheckForIdle();
453         }
454         
455         public String toString()
456         {
457             return "Upgradable:"+_endp.toString();
458         }
459 
460     }
461 }