View Javadoc

1   // ========================================================================
2   // Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.client;
15  
16  import java.io.IOException;
17  import java.net.SocketTimeoutException;
18  import java.nio.channels.SelectionKey;
19  import java.nio.channels.SocketChannel;
20  import java.nio.channels.UnresolvedAddressException;
21  import java.util.Map;
22  import java.util.concurrent.ConcurrentHashMap;
23  import javax.net.ssl.SSLEngine;
24  
25  import org.eclipse.jetty.io.AsyncEndPoint;
26  import org.eclipse.jetty.io.Buffer;
27  import org.eclipse.jetty.io.ConnectedEndPoint;
28  import org.eclipse.jetty.io.Connection;
29  import org.eclipse.jetty.io.nio.AsyncConnection;
30  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
31  import org.eclipse.jetty.io.nio.SelectorManager;
32  import org.eclipse.jetty.io.nio.SslConnection;
33  import org.eclipse.jetty.util.component.AggregateLifeCycle;
34  import org.eclipse.jetty.util.component.Dumpable;
35  import org.eclipse.jetty.util.log.Log;
36  import org.eclipse.jetty.util.log.Logger;
37  import org.eclipse.jetty.util.ssl.SslContextFactory;
38  import org.eclipse.jetty.util.thread.Timeout;
39  import org.eclipse.jetty.util.thread.Timeout.Task;
40  
41  class SelectConnector extends AggregateLifeCycle implements HttpClient.Connector, Dumpable
42  {
43      private static final Logger LOG = Log.getLogger(SelectConnector.class);
44  
45      private final HttpClient _httpClient;
46      private final Manager _selectorManager=new Manager();
47      private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>();
48  
49      /* ------------------------------------------------------------ */
50      /**
51       * @param httpClient the HttpClient this connector is associated to. It is 
52       * added via the {@link #addBean(Object, boolean)} as an unmanaged bean.
53       */
54      SelectConnector(HttpClient httpClient)
55      {
56          _httpClient = httpClient;
57          addBean(_httpClient,false);
58          addBean(_selectorManager,true);
59      }
60  
61      /* ------------------------------------------------------------ */
62      public void startConnection( HttpDestination destination )
63          throws IOException
64      {
65          SocketChannel channel = null;
66          try
67          {
68              channel = SocketChannel.open();
69              Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
70              channel.socket().setTcpNoDelay(true);
71  
72              if (_httpClient.isConnectBlocking())
73              {
74                  channel.socket().connect(address.toSocketAddress(), _httpClient.getConnectTimeout());
75                  channel.configureBlocking(false);
76                  _selectorManager.register( channel, destination );
77              }
78              else
79              {
80                  channel.configureBlocking(false);
81                  channel.connect(address.toSocketAddress());
82                  _selectorManager.register(channel,destination);
83                  ConnectTimeout connectTimeout = new ConnectTimeout(channel,destination);
84                  _httpClient.schedule(connectTimeout,_httpClient.getConnectTimeout());
85                  _connectingChannels.put(channel,connectTimeout);
86              }
87          }
88          catch (UnresolvedAddressException ex)
89          {
90              if (channel != null)
91                  channel.close();
92              destination.onConnectionFailed(ex);
93          }
94          catch(IOException ex)
95          {
96              if (channel != null)
97                  channel.close();
98              destination.onConnectionFailed(ex);
99          }
100     }
101 
102     /* ------------------------------------------------------------ */
103     class Manager extends SelectorManager
104     {
105         Logger LOG = SelectConnector.LOG;
106 
107         @Override
108         public boolean dispatch(Runnable task)
109         {
110             return _httpClient._threadPool.dispatch(task);
111         }
112 
113         @Override
114         protected void endPointOpened(SelectChannelEndPoint endpoint)
115         {
116         }
117 
118         @Override
119         protected void endPointClosed(SelectChannelEndPoint endpoint)
120         {
121         }
122 
123         @Override
124         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
125         {
126         }
127 
128         @Override
129         public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
130         {
131             return new AsyncHttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
132         }
133 
134         @Override
135         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
136         {
137             // We're connected, cancel the connect timeout
138             Timeout.Task connectTimeout = _connectingChannels.remove(channel);
139             if (connectTimeout != null)
140                 connectTimeout.cancel();
141             if (LOG.isDebugEnabled())
142                 LOG.debug("Channels with connection pending: {}", _connectingChannels.size());
143 
144             // key should have destination at this point (will be replaced by endpoint after this call)
145             HttpDestination dest=(HttpDestination)key.attachment();
146 
147             SelectChannelEndPoint scep = new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout());
148             AsyncEndPoint ep = scep;
149 
150             if (dest.isSecure())
151             {
152                 LOG.debug("secure to {}, proxied={}",channel,dest.isProxied());
153                 ep = new UpgradableEndPoint(ep,newSslEngine(channel));
154             }
155 
156             AsyncConnection connection = selectSet.getManager().newConnection(channel,ep, key.attachment());
157             ep.setConnection(connection);
158 
159             AbstractHttpConnection httpConnection=(AbstractHttpConnection)connection;
160             httpConnection.setDestination(dest);
161 
162             if (dest.isSecure() && !dest.isProxied())
163                 ((UpgradableEndPoint)ep).upgrade();
164 
165             dest.onNewConnection(httpConnection);
166 
167             return scep;
168         }
169 
170         private synchronized SSLEngine newSslEngine(SocketChannel channel) throws IOException
171         {
172             SslContextFactory sslContextFactory = _httpClient.getSslContextFactory();
173             SSLEngine sslEngine;
174             if (channel != null)
175             {
176                 String peerHost = channel.socket().getInetAddress().getHostAddress();
177                 int peerPort = channel.socket().getPort();
178                 sslEngine = sslContextFactory.newSslEngine(peerHost, peerPort);
179             }
180             else
181             {
182                 sslEngine = sslContextFactory.newSslEngine();
183             }
184             sslEngine.setUseClientMode(true);
185             sslEngine.beginHandshake();
186 
187             return sslEngine;
188         }
189 
190         /* ------------------------------------------------------------ */
191         /* (non-Javadoc)
192          * @see org.eclipse.io.nio.SelectorManager#connectionFailed(java.nio.channels.SocketChannel, java.lang.Throwable, java.lang.Object)
193          */
194         @Override
195         protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
196         {
197             Timeout.Task connectTimeout = _connectingChannels.remove(channel);
198             if (connectTimeout != null)
199                 connectTimeout.cancel();
200 
201             if (attachment instanceof HttpDestination)
202                 ((HttpDestination)attachment).onConnectionFailed(ex);
203             else
204                 super.connectionFailed(channel,ex,attachment);
205         }
206     }
207 
208     private class ConnectTimeout extends Timeout.Task
209     {
210         private final SocketChannel channel;
211         private final HttpDestination destination;
212 
213         public ConnectTimeout(SocketChannel channel, HttpDestination destination)
214         {
215             this.channel = channel;
216             this.destination = destination;
217         }
218 
219         @Override
220         public void expired()
221         {
222             if (channel.isConnectionPending())
223             {
224                 LOG.debug("Channel {} timed out while connecting, closing it", channel);
225                 try
226                 {
227                     // This will unregister the channel from the selector
228                     channel.close();
229                 }
230                 catch (IOException x)
231                 {
232                     LOG.ignore(x);
233                 }
234                 destination.onConnectionFailed(new SocketTimeoutException());
235             }
236         }
237     }
238 
239     public static class UpgradableEndPoint implements AsyncEndPoint
240     {
241         AsyncEndPoint _endp;
242         SSLEngine _engine;
243 
244         public UpgradableEndPoint(AsyncEndPoint endp, SSLEngine engine) throws IOException
245         {
246             _engine=engine;
247             _endp=endp;
248         }
249 
250         public void upgrade()
251         {
252             AsyncHttpConnection connection = (AsyncHttpConnection)_endp.getConnection();
253 
254             SslConnection sslConnection = new SslConnection(_engine,_endp);
255             _endp.setConnection(sslConnection);
256 
257             _endp=sslConnection.getSslEndPoint();
258             sslConnection.getSslEndPoint().setConnection(connection);
259 
260             LOG.debug("upgrade {} to {} for {}",this,sslConnection,connection);
261         }
262 
263 
264         public Connection getConnection()
265         {
266             return _endp.getConnection();
267         }
268 
269         public void setConnection(Connection connection)
270         {
271             _endp.setConnection(connection);
272         }
273 
274         public void shutdownOutput() throws IOException
275         {
276             _endp.shutdownOutput();
277         }
278 
279         public void asyncDispatch()
280         {
281             _endp.asyncDispatch();
282         }
283 
284         public boolean isOutputShutdown()
285         {
286             return _endp.isOutputShutdown();
287         }
288 
289         public void shutdownInput() throws IOException
290         {
291             _endp.shutdownInput();
292         }
293 
294         public void scheduleWrite()
295         {
296             _endp.scheduleWrite();
297         }
298 
299         public boolean isInputShutdown()
300         {
301             return _endp.isInputShutdown();
302         }
303 
304         public void close() throws IOException
305         {
306             _endp.close();
307         }
308 
309         public int fill(Buffer buffer) throws IOException
310         {
311             return _endp.fill(buffer);
312         }
313 
314         public boolean isWritable()
315         {
316             return _endp.isWritable();
317         }
318 
319         public boolean hasProgressed()
320         {
321             return _endp.hasProgressed();
322         }
323 
324         public int flush(Buffer buffer) throws IOException
325         {
326             return _endp.flush(buffer);
327         }
328 
329         public void scheduleTimeout(Task task, long timeoutMs)
330         {
331             _endp.scheduleTimeout(task,timeoutMs);
332         }
333 
334         public void cancelTimeout(Task task)
335         {
336             _endp.cancelTimeout(task);
337         }
338 
339         public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
340         {
341             return _endp.flush(header,buffer,trailer);
342         }
343 
344         public String getLocalAddr()
345         {
346             return _endp.getLocalAddr();
347         }
348 
349         public String getLocalHost()
350         {
351             return _endp.getLocalHost();
352         }
353 
354         public int getLocalPort()
355         {
356             return _endp.getLocalPort();
357         }
358 
359         public String getRemoteAddr()
360         {
361             return _endp.getRemoteAddr();
362         }
363 
364         public String getRemoteHost()
365         {
366             return _endp.getRemoteHost();
367         }
368 
369         public int getRemotePort()
370         {
371             return _endp.getRemotePort();
372         }
373 
374         public boolean isBlocking()
375         {
376             return _endp.isBlocking();
377         }
378 
379         public boolean blockReadable(long millisecs) throws IOException
380         {
381             return _endp.blockReadable(millisecs);
382         }
383 
384         public boolean blockWritable(long millisecs) throws IOException
385         {
386             return _endp.blockWritable(millisecs);
387         }
388 
389         public boolean isOpen()
390         {
391             return _endp.isOpen();
392         }
393 
394         public Object getTransport()
395         {
396             return _endp.getTransport();
397         }
398 
399         public void flush() throws IOException
400         {
401             _endp.flush();
402         }
403 
404         public int getMaxIdleTime()
405         {
406             return _endp.getMaxIdleTime();
407         }
408 
409         public void setMaxIdleTime(int timeMs) throws IOException
410         {
411             _endp.setMaxIdleTime(timeMs);
412         }
413 
414         public void onIdleExpired(long idleForMs)
415         {
416             _endp.onIdleExpired(idleForMs);
417         }
418 
419         public void setCheckForIdle(boolean check)
420         {
421             _endp.setCheckForIdle(check);
422         }
423 
424         public boolean isCheckForIdle()
425         {
426             return _endp.isCheckForIdle();
427         }
428 
429         public String toString()
430         {
431             return "Upgradable:"+_endp.toString();
432         }
433     }
434 }