View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.IOException;
22  import java.net.SocketTimeoutException;
23  import java.nio.channels.SelectionKey;
24  import java.nio.channels.SocketChannel;
25  import java.nio.channels.UnresolvedAddressException;
26  import java.util.Map;
27  import java.util.concurrent.ConcurrentHashMap;
28  import javax.net.ssl.SSLEngine;
29  
30  import org.eclipse.jetty.io.AsyncEndPoint;
31  import org.eclipse.jetty.io.Buffer;
32  import org.eclipse.jetty.io.ConnectedEndPoint;
33  import org.eclipse.jetty.io.Connection;
34  import org.eclipse.jetty.io.nio.AsyncConnection;
35  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
36  import org.eclipse.jetty.io.nio.SelectorManager;
37  import org.eclipse.jetty.io.nio.SslConnection;
38  import org.eclipse.jetty.util.component.AggregateLifeCycle;
39  import org.eclipse.jetty.util.component.Dumpable;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.log.Logger;
42  import org.eclipse.jetty.util.ssl.SslContextFactory;
43  import org.eclipse.jetty.util.thread.Timeout;
44  import org.eclipse.jetty.util.thread.Timeout.Task;
45  
46  class SelectConnector extends AggregateLifeCycle implements HttpClient.Connector, Dumpable
47  {
48      private static final Logger LOG = Log.getLogger(SelectConnector.class);
49  
50      private final HttpClient _httpClient;
51      private final Manager _selectorManager=new Manager();
52      private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>();
53  
54      /* ------------------------------------------------------------ */
55      /**
56       * @param httpClient the HttpClient this connector is associated to. It is 
57       * added via the {@link #addBean(Object, boolean)} as an unmanaged bean.
58       */
59      SelectConnector(HttpClient httpClient)
60      {
61          _httpClient = httpClient;
62          addBean(_httpClient,false);
63          addBean(_selectorManager,true);
64      }
65  
66      /* ------------------------------------------------------------ */
67      public void startConnection( HttpDestination destination )
68          throws IOException
69      {
70          SocketChannel channel = null;
71          try
72          {
73              channel = SocketChannel.open();
74              Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
75              channel.socket().setTcpNoDelay(true);
76  
77              if (_httpClient.isConnectBlocking())
78              {
79                  channel.socket().connect(address.toSocketAddress(), _httpClient.getConnectTimeout());
80                  channel.configureBlocking(false);
81                  _selectorManager.register( channel, destination );
82              }
83              else
84              {
85                  channel.configureBlocking(false);
86                  channel.connect(address.toSocketAddress());
87                  _selectorManager.register(channel,destination);
88                  ConnectTimeout connectTimeout = new ConnectTimeout(channel,destination);
89                  _httpClient.schedule(connectTimeout,_httpClient.getConnectTimeout());
90                  _connectingChannels.put(channel,connectTimeout);
91              }
92          }
93          catch (UnresolvedAddressException ex)
94          {
95              if (channel != null)
96                  channel.close();
97              destination.onConnectionFailed(ex);
98          }
99          catch(IOException ex)
100         {
101             if (channel != null)
102                 channel.close();
103             destination.onConnectionFailed(ex);
104         }
105     }
106 
107     /* ------------------------------------------------------------ */
108     class Manager extends SelectorManager
109     {
110         Logger LOG = SelectConnector.LOG;
111 
112         @Override
113         public boolean dispatch(Runnable task)
114         {
115             return _httpClient._threadPool.dispatch(task);
116         }
117 
118         @Override
119         protected void endPointOpened(SelectChannelEndPoint endpoint)
120         {
121         }
122 
123         @Override
124         protected void endPointClosed(SelectChannelEndPoint endpoint)
125         {
126         }
127 
128         @Override
129         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
130         {
131         }
132 
133         @Override
134         public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
135         {
136             return new AsyncHttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
137         }
138 
139         @Override
140         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
141         {
142             // We're connected, cancel the connect timeout
143             Timeout.Task connectTimeout = _connectingChannels.remove(channel);
144             if (connectTimeout != null)
145                 connectTimeout.cancel();
146             if (LOG.isDebugEnabled())
147                 LOG.debug("Channels with connection pending: {}", _connectingChannels.size());
148 
149             // key should have destination at this point (will be replaced by endpoint after this call)
150             HttpDestination dest=(HttpDestination)key.attachment();
151 
152             SelectChannelEndPoint scep = new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout());
153             AsyncEndPoint ep = scep;
154 
155             if (dest.isSecure())
156             {
157                 LOG.debug("secure to {}, proxied={}",channel,dest.isProxied());
158                 ep = new UpgradableEndPoint(ep,newSslEngine(channel));
159             }
160 
161             AsyncConnection connection = selectSet.getManager().newConnection(channel,ep, key.attachment());
162             ep.setConnection(connection);
163 
164             AbstractHttpConnection httpConnection=(AbstractHttpConnection)connection;
165             httpConnection.setDestination(dest);
166 
167             if (dest.isSecure() && !dest.isProxied())
168                 ((UpgradableEndPoint)ep).upgrade();
169 
170             dest.onNewConnection(httpConnection);
171 
172             return scep;
173         }
174 
175         private synchronized SSLEngine newSslEngine(SocketChannel channel) throws IOException
176         {
177             SslContextFactory sslContextFactory = _httpClient.getSslContextFactory();
178             SSLEngine sslEngine;
179             if (channel != null)
180             {
181                 String peerHost = channel.socket().getInetAddress().getHostAddress();
182                 int peerPort = channel.socket().getPort();
183                 sslEngine = sslContextFactory.newSslEngine(peerHost, peerPort);
184             }
185             else
186             {
187                 sslEngine = sslContextFactory.newSslEngine();
188             }
189             sslEngine.setUseClientMode(true);
190             sslEngine.beginHandshake();
191 
192             return sslEngine;
193         }
194 
195         /* ------------------------------------------------------------ */
196         /* (non-Javadoc)
197          * @see org.eclipse.io.nio.SelectorManager#connectionFailed(java.nio.channels.SocketChannel, java.lang.Throwable, java.lang.Object)
198          */
199         @Override
200         protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
201         {
202             Timeout.Task connectTimeout = _connectingChannels.remove(channel);
203             if (connectTimeout != null)
204                 connectTimeout.cancel();
205 
206             if (attachment instanceof HttpDestination)
207                 ((HttpDestination)attachment).onConnectionFailed(ex);
208             else
209                 super.connectionFailed(channel,ex,attachment);
210         }
211     }
212 
213     private class ConnectTimeout extends Timeout.Task
214     {
215         private final SocketChannel channel;
216         private final HttpDestination destination;
217 
218         public ConnectTimeout(SocketChannel channel, HttpDestination destination)
219         {
220             this.channel = channel;
221             this.destination = destination;
222         }
223 
224         @Override
225         public void expired()
226         {
227             if (channel.isConnectionPending())
228             {
229                 LOG.debug("Channel {} timed out while connecting, closing it", channel);
230                 try
231                 {
232                     // This will unregister the channel from the selector
233                     channel.close();
234                 }
235                 catch (IOException x)
236                 {
237                     LOG.ignore(x);
238                 }
239                 destination.onConnectionFailed(new SocketTimeoutException());
240             }
241         }
242     }
243 
244     public static class UpgradableEndPoint implements AsyncEndPoint
245     {
246         AsyncEndPoint _endp;
247         SSLEngine _engine;
248 
249         public UpgradableEndPoint(AsyncEndPoint endp, SSLEngine engine) throws IOException
250         {
251             _engine=engine;
252             _endp=endp;
253         }
254 
255         public void upgrade()
256         {
257             AsyncHttpConnection connection = (AsyncHttpConnection)_endp.getConnection();
258 
259             SslConnection sslConnection = new SslConnection(_engine,_endp);
260             _endp.setConnection(sslConnection);
261 
262             _endp=sslConnection.getSslEndPoint();
263             sslConnection.getSslEndPoint().setConnection(connection);
264 
265             LOG.debug("upgrade {} to {} for {}",this,sslConnection,connection);
266         }
267 
268 
269         public Connection getConnection()
270         {
271             return _endp.getConnection();
272         }
273 
274         public void setConnection(Connection connection)
275         {
276             _endp.setConnection(connection);
277         }
278 
279         public void shutdownOutput() throws IOException
280         {
281             _endp.shutdownOutput();
282         }
283 
284         public void dispatch()
285         {
286             _endp.asyncDispatch();
287         }
288 
289         public void asyncDispatch()
290         {
291             _endp.asyncDispatch();
292         }
293 
294         public boolean isOutputShutdown()
295         {
296             return _endp.isOutputShutdown();
297         }
298 
299         public void shutdownInput() throws IOException
300         {
301             _endp.shutdownInput();
302         }
303 
304         public void scheduleWrite()
305         {
306             _endp.scheduleWrite();
307         }
308 
309         public boolean isInputShutdown()
310         {
311             return _endp.isInputShutdown();
312         }
313 
314         public void close() throws IOException
315         {
316             _endp.close();
317         }
318 
319         public int fill(Buffer buffer) throws IOException
320         {
321             return _endp.fill(buffer);
322         }
323 
324         public boolean isWritable()
325         {
326             return _endp.isWritable();
327         }
328 
329         public boolean hasProgressed()
330         {
331             return _endp.hasProgressed();
332         }
333 
334         public int flush(Buffer buffer) throws IOException
335         {
336             return _endp.flush(buffer);
337         }
338 
339         public void scheduleTimeout(Task task, long timeoutMs)
340         {
341             _endp.scheduleTimeout(task,timeoutMs);
342         }
343 
344         public void cancelTimeout(Task task)
345         {
346             _endp.cancelTimeout(task);
347         }
348 
349         public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
350         {
351             return _endp.flush(header,buffer,trailer);
352         }
353 
354         public String getLocalAddr()
355         {
356             return _endp.getLocalAddr();
357         }
358 
359         public String getLocalHost()
360         {
361             return _endp.getLocalHost();
362         }
363 
364         public int getLocalPort()
365         {
366             return _endp.getLocalPort();
367         }
368 
369         public String getRemoteAddr()
370         {
371             return _endp.getRemoteAddr();
372         }
373 
374         public String getRemoteHost()
375         {
376             return _endp.getRemoteHost();
377         }
378 
379         public int getRemotePort()
380         {
381             return _endp.getRemotePort();
382         }
383 
384         public boolean isBlocking()
385         {
386             return _endp.isBlocking();
387         }
388 
389         public boolean blockReadable(long millisecs) throws IOException
390         {
391             return _endp.blockReadable(millisecs);
392         }
393 
394         public boolean blockWritable(long millisecs) throws IOException
395         {
396             return _endp.blockWritable(millisecs);
397         }
398 
399         public boolean isOpen()
400         {
401             return _endp.isOpen();
402         }
403 
404         public Object getTransport()
405         {
406             return _endp.getTransport();
407         }
408 
409         public void flush() throws IOException
410         {
411             _endp.flush();
412         }
413 
414         public int getMaxIdleTime()
415         {
416             return _endp.getMaxIdleTime();
417         }
418 
419         public void setMaxIdleTime(int timeMs) throws IOException
420         {
421             _endp.setMaxIdleTime(timeMs);
422         }
423 
424         public void onIdleExpired(long idleForMs)
425         {
426             _endp.onIdleExpired(idleForMs);
427         }
428 
429         public void setCheckForIdle(boolean check)
430         {
431             _endp.setCheckForIdle(check);
432         }
433 
434         public boolean isCheckForIdle()
435         {
436             return _endp.isCheckForIdle();
437         }
438 
439         public String toString()
440         {
441             return "Upgradable:"+_endp.toString();
442         }
443     }
444 }