1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17 import java.net.SocketTimeoutException;
18 import java.nio.channels.SelectionKey;
19 import java.nio.channels.SocketChannel;
20 import java.util.Map;
21 import java.util.concurrent.ConcurrentHashMap;
22 import javax.net.ssl.SSLContext;
23 import javax.net.ssl.SSLEngine;
24 import javax.net.ssl.SSLSession;
25
26 import org.eclipse.jetty.http.HttpGenerator;
27 import org.eclipse.jetty.http.HttpParser;
28 import org.eclipse.jetty.http.ssl.SslContextFactory;
29 import org.eclipse.jetty.io.Buffer;
30 import org.eclipse.jetty.io.Buffers;
31 import org.eclipse.jetty.io.Buffers.Type;
32 import org.eclipse.jetty.io.BuffersFactory;
33 import org.eclipse.jetty.io.ConnectedEndPoint;
34 import org.eclipse.jetty.io.Connection;
35 import org.eclipse.jetty.io.EndPoint;
36 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
37 import org.eclipse.jetty.io.nio.SelectorManager;
38 import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
39 import org.eclipse.jetty.util.component.AbstractLifeCycle;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42 import org.eclipse.jetty.util.thread.Timeout;
43
44 class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
45 {
46 private static final Logger LOG = Log.getLogger(SelectConnector.class);
47
48 private final HttpClient _httpClient;
49 private final Manager _selectorManager=new Manager();
50 private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>();
51 private SSLContext _sslContext;
52 private Buffers _sslBuffers;
53
54
55
56
57 SelectConnector(HttpClient httpClient)
58 {
59 _httpClient = httpClient;
60 }
61
62
63 @Override
64 protected void doStart() throws Exception
65 {
66 super.doStart();
67
68
69 final boolean direct=_httpClient.getUseDirectBuffers();
70
71 SSLEngine sslEngine=_selectorManager.newSslEngine(null);
72 final SSLSession ssl_session=sslEngine.getSession();
73 _sslBuffers = BuffersFactory.newBuffers(
74 direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
75 direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
76 direct?Type.DIRECT:Type.INDIRECT,1024);
77
78 _selectorManager.start();
79 }
80
81
82 @Override
83 protected void doStop() throws Exception
84 {
85 _selectorManager.stop();
86 }
87
88
89 public void startConnection( HttpDestination destination )
90 throws IOException
91 {
92 try
93 {
94 SocketChannel channel = SocketChannel.open();
95 Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
96 channel.socket().setTcpNoDelay(true);
97
98 if (_httpClient.isConnectBlocking())
99 {
100 channel.socket().connect(address.toSocketAddress(), _httpClient.getConnectTimeout());
101 channel.configureBlocking(false);
102 _selectorManager.register( channel, destination );
103 }
104 else
105 {
106 channel.configureBlocking( false );
107 channel.connect(address.toSocketAddress());
108 _selectorManager.register( channel, destination );
109 ConnectTimeout connectTimeout = new ConnectTimeout(channel, destination);
110 _httpClient.schedule(connectTimeout,_httpClient.getConnectTimeout());
111 _connectingChannels.put(channel, connectTimeout);
112 }
113
114 }
115 catch(IOException ex)
116 {
117 destination.onConnectionFailed(ex);
118 }
119 }
120
121
122 class Manager extends SelectorManager
123 {
124 @Override
125 public boolean dispatch(Runnable task)
126 {
127 return _httpClient._threadPool.dispatch(task);
128 }
129
130 @Override
131 protected void endPointOpened(SelectChannelEndPoint endpoint)
132 {
133 }
134
135 @Override
136 protected void endPointClosed(SelectChannelEndPoint endpoint)
137 {
138 }
139
140 @Override
141 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
142 {
143 }
144
145 @Override
146 protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
147 {
148 if (endpoint instanceof SslSelectChannelEndPoint)
149 return new HttpConnection(_sslBuffers,_sslBuffers,endpoint);
150
151 return new HttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
152 }
153
154 @Override
155 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
156 {
157
158 Timeout.Task connectTimeout = _connectingChannels.remove(channel);
159 if (connectTimeout != null)
160 connectTimeout.cancel();
161 LOG.debug("Channels with connection pending: {}", _connectingChannels.size());
162
163
164 HttpDestination dest=(HttpDestination)key.attachment();
165
166 SelectChannelEndPoint ep=null;
167
168 if (dest.isSecure())
169 {
170 if (dest.isProxied())
171 {
172 SSLEngine engine=newSslEngine(channel);
173 ep = new ProxySelectChannelEndPoint(channel, selectSet, key, _sslBuffers, engine, (int)_httpClient.getIdleTimeout());
174 }
175 else
176 {
177 SSLEngine engine=newSslEngine(channel);
178 SslSelectChannelEndPoint sslEp = new SslSelectChannelEndPoint(_sslBuffers, channel, selectSet, key, engine, (int)_httpClient.getIdleTimeout());
179 sslEp.setAllowRenegotiate(_httpClient.getSslContextFactory().isAllowRenegotiate());
180 ep = sslEp;
181 }
182 }
183 else
184 {
185 ep = new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout());
186 }
187
188 HttpConnection connection=(HttpConnection)ep.getConnection();
189 connection.setDestination(dest);
190 dest.onNewConnection(connection);
191 return ep;
192 }
193
194 private synchronized SSLEngine newSslEngine(SocketChannel channel) throws IOException
195 {
196 SslContextFactory sslContextFactory = _httpClient.getSslContextFactory();
197 if (_sslContext == null)
198 _sslContext = sslContextFactory.getSslContext();
199
200 SSLEngine sslEngine;
201 if (channel != null && sslContextFactory.isSessionCachingEnabled())
202 {
203 String peerHost = channel.socket().getInetAddress().getHostAddress();
204 int peerPort = channel.socket().getPort();
205 sslEngine = _sslContext.createSSLEngine(peerHost, peerPort);
206 }
207 else
208 {
209 sslEngine = _sslContext.createSSLEngine();
210 }
211 sslEngine.setUseClientMode(true);
212 sslEngine.beginHandshake();
213
214 return sslEngine;
215 }
216
217
218
219
220
221 @Override
222 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
223 {
224 if (attachment instanceof HttpDestination)
225 ((HttpDestination)attachment).onConnectionFailed(ex);
226 else
227 super.connectionFailed(channel,ex,attachment);
228 }
229 }
230
231 private class ConnectTimeout extends Timeout.Task
232 {
233 private final SocketChannel channel;
234 private final HttpDestination destination;
235
236 public ConnectTimeout(SocketChannel channel, HttpDestination destination)
237 {
238 this.channel = channel;
239 this.destination = destination;
240 }
241
242 @Override
243 public void expired()
244 {
245 if (channel.isConnectionPending())
246 {
247 LOG.debug("Channel {} timed out while connecting, closing it", channel);
248 try
249 {
250
251 channel.close();
252 }
253 catch (IOException x)
254 {
255 LOG.ignore(x);
256 }
257 destination.onConnectionFailed(new SocketTimeoutException());
258 }
259 }
260 }
261
262
263
264
265
266
267
268 public static class ProxySelectChannelEndPoint extends SslSelectChannelEndPoint
269 {
270 private final SelectChannelEndPoint plainEndPoint;
271 private volatile boolean upgraded = false;
272
273 public ProxySelectChannelEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key, Buffers sslBuffers, SSLEngine engine, int maxIdleTimeout) throws IOException
274 {
275 super(sslBuffers, channel, selectSet, key, engine, maxIdleTimeout);
276 this.plainEndPoint = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTimeout);
277 }
278
279 public void upgrade()
280 {
281 upgraded = true;
282 }
283
284 public void shutdownOutput() throws IOException
285 {
286 if (upgraded)
287 super.shutdownOutput();
288 else
289 plainEndPoint.shutdownOutput();
290 }
291
292 public void close() throws IOException
293 {
294 if (upgraded)
295 super.close();
296 else
297 plainEndPoint.close();
298 }
299
300 public int fill(Buffer buffer) throws IOException
301 {
302 if (upgraded)
303 return super.fill(buffer);
304 else
305 return plainEndPoint.fill(buffer);
306 }
307
308 public int flush(Buffer buffer) throws IOException
309 {
310 if (upgraded)
311 return super.flush(buffer);
312 else
313 return plainEndPoint.flush(buffer);
314 }
315
316 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
317 {
318 if (upgraded)
319 return super.flush(header, buffer, trailer);
320 else
321 return plainEndPoint.flush(header, buffer, trailer);
322 }
323
324 public String getLocalAddr()
325 {
326 if (upgraded)
327 return super.getLocalAddr();
328 else
329 return plainEndPoint.getLocalAddr();
330 }
331
332 public String getLocalHost()
333 {
334 if (upgraded)
335 return super.getLocalHost();
336 else
337 return plainEndPoint.getLocalHost();
338 }
339
340 public int getLocalPort()
341 {
342 if (upgraded)
343 return super.getLocalPort();
344 else
345 return plainEndPoint.getLocalPort();
346 }
347
348 public String getRemoteAddr()
349 {
350 if (upgraded)
351 return super.getRemoteAddr();
352 else
353 return plainEndPoint.getRemoteAddr();
354 }
355
356 public String getRemoteHost()
357 {
358 if (upgraded)
359 return super.getRemoteHost();
360 else
361 return plainEndPoint.getRemoteHost();
362 }
363
364 public int getRemotePort()
365 {
366 if (upgraded)
367 return super.getRemotePort();
368 else
369 return plainEndPoint.getRemotePort();
370 }
371
372 public boolean isBlocking()
373 {
374 if (upgraded)
375 return super.isBlocking();
376 else
377 return plainEndPoint.isBlocking();
378 }
379
380 public boolean isBufferred()
381 {
382 if (upgraded)
383 return super.isBufferred();
384 else
385 return plainEndPoint.isBufferred();
386 }
387
388 public boolean blockReadable(long millisecs) throws IOException
389 {
390 if (upgraded)
391 return super.blockReadable(millisecs);
392 else
393 return plainEndPoint.blockReadable(millisecs);
394 }
395
396 public boolean blockWritable(long millisecs) throws IOException
397 {
398 if (upgraded)
399 return super.blockWritable(millisecs);
400 else
401 return plainEndPoint.blockWritable(millisecs);
402 }
403
404 public boolean isOpen()
405 {
406 if (upgraded)
407 return super.isOpen();
408 else
409 return plainEndPoint.isOpen();
410 }
411
412 public Object getTransport()
413 {
414 if (upgraded)
415 return super.getTransport();
416 else
417 return plainEndPoint.getTransport();
418 }
419
420 public boolean isBufferingInput()
421 {
422 if (upgraded)
423 return super.isBufferingInput();
424 else
425 return plainEndPoint.isBufferingInput();
426 }
427
428 public boolean isBufferingOutput()
429 {
430 if (upgraded)
431 return super.isBufferingOutput();
432 else
433 return plainEndPoint.isBufferingOutput();
434 }
435
436 public void flush() throws IOException
437 {
438 if (upgraded)
439 super.flush();
440 else
441 plainEndPoint.flush();
442
443 }
444
445 public int getMaxIdleTime()
446 {
447 if (upgraded)
448 return super.getMaxIdleTime();
449 else
450 return plainEndPoint.getMaxIdleTime();
451 }
452
453 public void setMaxIdleTime(int timeMs) throws IOException
454 {
455 if (upgraded)
456 super.setMaxIdleTime(timeMs);
457 else
458 plainEndPoint.setMaxIdleTime(timeMs);
459 }
460 }
461 }