1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17 import java.net.SocketTimeoutException;
18 import java.net.UnknownHostException;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.SocketChannel;
21 import java.nio.channels.UnresolvedAddressException;
22 import java.util.Map;
23 import java.util.concurrent.ConcurrentHashMap;
24
25 import javax.net.ssl.SSLContext;
26 import javax.net.ssl.SSLEngine;
27 import javax.net.ssl.SSLSession;
28
29 import org.eclipse.jetty.http.HttpGenerator;
30 import org.eclipse.jetty.http.HttpParser;
31 import org.eclipse.jetty.http.ssl.SslContextFactory;
32 import org.eclipse.jetty.io.Buffer;
33 import org.eclipse.jetty.io.Buffers;
34 import org.eclipse.jetty.io.Buffers.Type;
35 import org.eclipse.jetty.io.BuffersFactory;
36 import org.eclipse.jetty.io.ConnectedEndPoint;
37 import org.eclipse.jetty.io.Connection;
38 import org.eclipse.jetty.io.EndPoint;
39 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
40 import org.eclipse.jetty.io.nio.SelectorManager;
41 import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
42 import org.eclipse.jetty.util.component.AbstractLifeCycle;
43 import org.eclipse.jetty.util.log.Log;
44 import org.eclipse.jetty.util.log.Logger;
45 import org.eclipse.jetty.util.thread.Timeout;
46
47 class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
48 {
49 private static final Logger LOG = Log.getLogger(SelectConnector.class);
50
51 private final HttpClient _httpClient;
52 private final Manager _selectorManager=new Manager();
53 private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>();
54 private Buffers _sslBuffers;
55
56
57
58
59 SelectConnector(HttpClient httpClient)
60 {
61 _httpClient = httpClient;
62 }
63
64
65 @Override
66 protected void doStart() throws Exception
67 {
68 super.doStart();
69
70
71 final boolean direct=_httpClient.getUseDirectBuffers();
72
73 SSLEngine sslEngine=_selectorManager.newSslEngine(null);
74 final SSLSession ssl_session=sslEngine.getSession();
75 _sslBuffers = BuffersFactory.newBuffers(
76 direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
77 direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
78 direct?Type.DIRECT:Type.INDIRECT,1024);
79
80 _selectorManager.start();
81 }
82
83
84 @Override
85 protected void doStop() throws Exception
86 {
87 _selectorManager.stop();
88 }
89
90
91 public void startConnection( HttpDestination destination )
92 throws IOException
93 {
94 SocketChannel channel = null;
95 try
96 {
97 channel = SocketChannel.open();
98 Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
99 channel.socket().setTcpNoDelay(true);
100
101 if (_httpClient.isConnectBlocking())
102 {
103 channel.socket().connect(address.toSocketAddress(), _httpClient.getConnectTimeout());
104 channel.configureBlocking(false);
105 _selectorManager.register( channel, destination );
106 }
107 else
108 {
109 channel.configureBlocking(false);
110 channel.connect(address.toSocketAddress());
111 _selectorManager.register(channel,destination);
112 ConnectTimeout connectTimeout = new ConnectTimeout(channel,destination);
113 _httpClient.schedule(connectTimeout,_httpClient.getConnectTimeout());
114 _connectingChannels.put(channel,connectTimeout);
115 }
116 }
117 catch (UnresolvedAddressException ex)
118 {
119 if (channel != null)
120 channel.close();
121 destination.onConnectionFailed(ex);
122 }
123 catch(IOException ex)
124 {
125 if (channel != null)
126 channel.close();
127 destination.onConnectionFailed(ex);
128 }
129 }
130
131
132 class Manager extends SelectorManager
133 {
134 @Override
135 public boolean dispatch(Runnable task)
136 {
137 return _httpClient._threadPool.dispatch(task);
138 }
139
140 @Override
141 protected void endPointOpened(SelectChannelEndPoint endpoint)
142 {
143 }
144
145 @Override
146 protected void endPointClosed(SelectChannelEndPoint endpoint)
147 {
148 }
149
150 @Override
151 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
152 {
153 }
154
155 @Override
156 protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
157 {
158 if (endpoint instanceof SslSelectChannelEndPoint)
159 return new HttpConnection(_sslBuffers,_sslBuffers,endpoint);
160
161 return new HttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
162 }
163
164 @Override
165 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
166 {
167
168 Timeout.Task connectTimeout = _connectingChannels.remove(channel);
169 if (connectTimeout != null)
170 connectTimeout.cancel();
171 LOG.debug("Channels with connection pending: {}", _connectingChannels.size());
172
173
174 HttpDestination dest=(HttpDestination)key.attachment();
175
176 SelectChannelEndPoint ep=null;
177
178 if (dest.isSecure())
179 {
180 if (dest.isProxied())
181 {
182 SSLEngine engine=newSslEngine(channel);
183 ep = new ProxySelectChannelEndPoint(channel, selectSet, key, _sslBuffers, engine, (int)_httpClient.getIdleTimeout());
184 }
185 else
186 {
187 SSLEngine engine=newSslEngine(channel);
188 SslSelectChannelEndPoint sslEp = new SslSelectChannelEndPoint(_sslBuffers, channel, selectSet, key, engine, (int)_httpClient.getIdleTimeout());
189 sslEp.setAllowRenegotiate(_httpClient.getSslContextFactory().isAllowRenegotiate());
190 ep = sslEp;
191 }
192 }
193 else
194 {
195 ep = new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout());
196 }
197
198 HttpConnection connection=(HttpConnection)ep.getConnection();
199 connection.setDestination(dest);
200 dest.onNewConnection(connection);
201 return ep;
202 }
203
204 private synchronized SSLEngine newSslEngine(SocketChannel channel) throws IOException
205 {
206 SslContextFactory sslContextFactory = _httpClient.getSslContextFactory();
207 SSLEngine sslEngine;
208 if (channel != null)
209 {
210 String peerHost = channel.socket().getInetAddress().getHostAddress();
211 int peerPort = channel.socket().getPort();
212 sslEngine = sslContextFactory.newSslEngine(peerHost, peerPort);
213 }
214 else
215 {
216 sslEngine = sslContextFactory.newSslEngine();
217 }
218 sslEngine.setUseClientMode(true);
219 sslEngine.beginHandshake();
220
221 return sslEngine;
222 }
223
224
225
226
227
228 @Override
229 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
230 {
231 if (attachment instanceof HttpDestination)
232 ((HttpDestination)attachment).onConnectionFailed(ex);
233 else
234 super.connectionFailed(channel,ex,attachment);
235 }
236 }
237
238 private class ConnectTimeout extends Timeout.Task
239 {
240 private final SocketChannel channel;
241 private final HttpDestination destination;
242
243 public ConnectTimeout(SocketChannel channel, HttpDestination destination)
244 {
245 this.channel = channel;
246 this.destination = destination;
247 }
248
249 @Override
250 public void expired()
251 {
252 if (channel.isConnectionPending())
253 {
254 LOG.debug("Channel {} timed out while connecting, closing it", channel);
255 try
256 {
257
258 channel.close();
259 }
260 catch (IOException x)
261 {
262 LOG.ignore(x);
263 }
264 destination.onConnectionFailed(new SocketTimeoutException());
265 }
266 }
267 }
268
269
270
271
272
273
274
275 public static class ProxySelectChannelEndPoint extends SslSelectChannelEndPoint
276 {
277 private final SelectChannelEndPoint plainEndPoint;
278 private volatile boolean upgraded = false;
279
280 public ProxySelectChannelEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key, Buffers sslBuffers, SSLEngine engine, int maxIdleTimeout) throws IOException
281 {
282 super(sslBuffers, channel, selectSet, key, engine, maxIdleTimeout);
283 this.plainEndPoint = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTimeout);
284 }
285
286 public void upgrade()
287 {
288 upgraded = true;
289 }
290
291 public void shutdownOutput() throws IOException
292 {
293 if (upgraded)
294 super.shutdownOutput();
295 else
296 plainEndPoint.shutdownOutput();
297 }
298
299 public void close() throws IOException
300 {
301 if (upgraded)
302 super.close();
303 else
304 plainEndPoint.close();
305 }
306
307 public int fill(Buffer buffer) throws IOException
308 {
309 if (upgraded)
310 return super.fill(buffer);
311 else
312 return plainEndPoint.fill(buffer);
313 }
314
315 public int flush(Buffer buffer) throws IOException
316 {
317 if (upgraded)
318 return super.flush(buffer);
319 else
320 return plainEndPoint.flush(buffer);
321 }
322
323 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
324 {
325 if (upgraded)
326 return super.flush(header, buffer, trailer);
327 else
328 return plainEndPoint.flush(header, buffer, trailer);
329 }
330
331 public String getLocalAddr()
332 {
333 if (upgraded)
334 return super.getLocalAddr();
335 else
336 return plainEndPoint.getLocalAddr();
337 }
338
339 public String getLocalHost()
340 {
341 if (upgraded)
342 return super.getLocalHost();
343 else
344 return plainEndPoint.getLocalHost();
345 }
346
347 public int getLocalPort()
348 {
349 if (upgraded)
350 return super.getLocalPort();
351 else
352 return plainEndPoint.getLocalPort();
353 }
354
355 public String getRemoteAddr()
356 {
357 if (upgraded)
358 return super.getRemoteAddr();
359 else
360 return plainEndPoint.getRemoteAddr();
361 }
362
363 public String getRemoteHost()
364 {
365 if (upgraded)
366 return super.getRemoteHost();
367 else
368 return plainEndPoint.getRemoteHost();
369 }
370
371 public int getRemotePort()
372 {
373 if (upgraded)
374 return super.getRemotePort();
375 else
376 return plainEndPoint.getRemotePort();
377 }
378
379 public boolean isBlocking()
380 {
381 if (upgraded)
382 return super.isBlocking();
383 else
384 return plainEndPoint.isBlocking();
385 }
386
387 public boolean isBufferred()
388 {
389 if (upgraded)
390 return super.isBufferred();
391 else
392 return plainEndPoint.isBufferred();
393 }
394
395 public boolean blockReadable(long millisecs) throws IOException
396 {
397 if (upgraded)
398 return super.blockReadable(millisecs);
399 else
400 return plainEndPoint.blockReadable(millisecs);
401 }
402
403 public boolean blockWritable(long millisecs) throws IOException
404 {
405 if (upgraded)
406 return super.blockWritable(millisecs);
407 else
408 return plainEndPoint.blockWritable(millisecs);
409 }
410
411 public boolean isOpen()
412 {
413 if (upgraded)
414 return super.isOpen();
415 else
416 return plainEndPoint.isOpen();
417 }
418
419 public Object getTransport()
420 {
421 if (upgraded)
422 return super.getTransport();
423 else
424 return plainEndPoint.getTransport();
425 }
426
427 public boolean isBufferingInput()
428 {
429 if (upgraded)
430 return super.isBufferingInput();
431 else
432 return plainEndPoint.isBufferingInput();
433 }
434
435 public boolean isBufferingOutput()
436 {
437 if (upgraded)
438 return super.isBufferingOutput();
439 else
440 return plainEndPoint.isBufferingOutput();
441 }
442
443 public void flush() throws IOException
444 {
445 if (upgraded)
446 super.flush();
447 else
448 plainEndPoint.flush();
449
450 }
451
452 public int getMaxIdleTime()
453 {
454 if (upgraded)
455 return super.getMaxIdleTime();
456 else
457 return plainEndPoint.getMaxIdleTime();
458 }
459
460 public void setMaxIdleTime(int timeMs) throws IOException
461 {
462 if (upgraded)
463 super.setMaxIdleTime(timeMs);
464 else
465 plainEndPoint.setMaxIdleTime(timeMs);
466 }
467 }
468 }