1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17 import java.net.SocketTimeoutException;
18 import java.nio.channels.SelectionKey;
19 import java.nio.channels.SocketChannel;
20 import java.util.Map;
21 import java.util.concurrent.ConcurrentHashMap;
22 import javax.net.ssl.SSLContext;
23 import javax.net.ssl.SSLEngine;
24 import javax.net.ssl.SSLSession;
25
26 import org.eclipse.jetty.http.HttpGenerator;
27 import org.eclipse.jetty.http.HttpParser;
28 import org.eclipse.jetty.http.ssl.SslContextFactory;
29 import org.eclipse.jetty.io.Buffer;
30 import org.eclipse.jetty.io.Buffers;
31 import org.eclipse.jetty.io.Buffers.Type;
32 import org.eclipse.jetty.io.BuffersFactory;
33 import org.eclipse.jetty.io.ConnectedEndPoint;
34 import org.eclipse.jetty.io.Connection;
35 import org.eclipse.jetty.io.EndPoint;
36 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
37 import org.eclipse.jetty.io.nio.SelectorManager;
38 import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
39 import org.eclipse.jetty.util.component.AbstractLifeCycle;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.thread.Timeout;
42
43 class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, Runnable
44 {
45 private final HttpClient _httpClient;
46 private final Manager _selectorManager=new Manager();
47 private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>();
48 private SSLContext _sslContext;
49 private Buffers _sslBuffers;
50
51
52
53
54 SelectConnector(HttpClient httpClient)
55 {
56 _httpClient = httpClient;
57 }
58
59
60 @Override
61 protected void doStart() throws Exception
62 {
63 super.doStart();
64
65 _selectorManager.start();
66
67 final boolean direct=_httpClient.getUseDirectBuffers();
68
69 SSLEngine sslEngine=_selectorManager.newSslEngine(null);
70 final SSLSession ssl_session=sslEngine.getSession();
71 _sslBuffers = BuffersFactory.newBuffers(
72 direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
73 direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
74 direct?Type.DIRECT:Type.INDIRECT,1024);
75
76 _httpClient._threadPool.dispatch(this);
77 }
78
79
80 @Override
81 protected void doStop() throws Exception
82 {
83 _selectorManager.stop();
84 }
85
86
87 public void startConnection( HttpDestination destination )
88 throws IOException
89 {
90 try
91 {
92 SocketChannel channel = SocketChannel.open();
93 Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
94 channel.socket().setTcpNoDelay(true);
95
96 if (_httpClient.isConnectBlocking())
97 {
98 channel.socket().connect(address.toSocketAddress(), _httpClient.getConnectTimeout());
99 channel.configureBlocking(false);
100 _selectorManager.register( channel, destination );
101 }
102 else
103 {
104 channel.configureBlocking( false );
105 channel.connect(address.toSocketAddress());
106 _selectorManager.register( channel, destination );
107 ConnectTimeout connectTimeout = new ConnectTimeout(channel, destination);
108 _httpClient.schedule(connectTimeout,_httpClient.getConnectTimeout());
109 _connectingChannels.put(channel, connectTimeout);
110 }
111
112 }
113 catch(IOException ex)
114 {
115 destination.onConnectionFailed(ex);
116 }
117
118 }
119
120
121 public void run()
122 {
123 while (_httpClient.isRunning())
124 {
125 try
126 {
127 _selectorManager.doSelect(0);
128 }
129 catch (Exception e)
130 {
131 Log.warn(e.toString());
132 Log.debug(e);
133 Thread.yield();
134 }
135 }
136 }
137
138
139 class Manager extends SelectorManager
140 {
141 @Override
142 public boolean dispatch(Runnable task)
143 {
144 return _httpClient._threadPool.dispatch(task);
145 }
146
147 @Override
148 protected void endPointOpened(SelectChannelEndPoint endpoint)
149 {
150 }
151
152 @Override
153 protected void endPointClosed(SelectChannelEndPoint endpoint)
154 {
155 }
156
157 @Override
158 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
159 {
160 }
161
162 @Override
163 protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
164 {
165 if (endpoint instanceof SslSelectChannelEndPoint)
166 return new HttpConnection(_sslBuffers,_sslBuffers,endpoint);
167
168 return new HttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
169 }
170
171 @Override
172 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
173 {
174
175 Timeout.Task connectTimeout = _connectingChannels.remove(channel);
176 if (connectTimeout != null)
177 connectTimeout.cancel();
178 Log.debug("Channels with connection pending: {}", _connectingChannels.size());
179
180
181 HttpDestination dest=(HttpDestination)key.attachment();
182
183 SelectChannelEndPoint ep=null;
184
185 if (dest.isSecure())
186 {
187 if (dest.isProxied())
188 {
189 SSLEngine engine=newSslEngine(channel);
190 ep = new ProxySelectChannelEndPoint(channel, selectSet, key, _sslBuffers, engine, (int)_httpClient.getIdleTimeout());
191 }
192 else
193 {
194 SSLEngine engine=newSslEngine(channel);
195 SslSelectChannelEndPoint sslEp = new SslSelectChannelEndPoint(_sslBuffers, channel, selectSet, key, engine, (int)_httpClient.getIdleTimeout());
196 sslEp.setAllowRenegotiate(_httpClient.getSslContextFactory().isAllowRenegotiate());
197 ep = sslEp;
198 }
199 }
200 else
201 {
202 ep = new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout());
203 }
204
205 HttpConnection connection=(HttpConnection)ep.getConnection();
206 connection.setDestination(dest);
207 dest.onNewConnection(connection);
208 return ep;
209 }
210
211 private synchronized SSLEngine newSslEngine(SocketChannel channel) throws IOException
212 {
213 SslContextFactory sslContextFactory = _httpClient.getSslContextFactory();
214 if (_sslContext == null)
215 _sslContext = sslContextFactory.getSslContext();
216
217 SSLEngine sslEngine;
218 if (channel != null && sslContextFactory.isSessionCachingEnabled())
219 {
220 String peerHost = channel.socket().getInetAddress().getHostAddress();
221 int peerPort = channel.socket().getPort();
222 sslEngine = _sslContext.createSSLEngine(peerHost, peerPort);
223 }
224 else
225 {
226 sslEngine = _sslContext.createSSLEngine();
227 }
228 sslEngine.setUseClientMode(true);
229 sslEngine.beginHandshake();
230
231 return sslEngine;
232 }
233
234
235
236
237
238 @Override
239 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
240 {
241 if (attachment instanceof HttpDestination)
242 ((HttpDestination)attachment).onConnectionFailed(ex);
243 else
244 super.connectionFailed(channel,ex,attachment);
245 }
246 }
247
248 private class ConnectTimeout extends Timeout.Task
249 {
250 private final SocketChannel channel;
251 private final HttpDestination destination;
252
253 public ConnectTimeout(SocketChannel channel, HttpDestination destination)
254 {
255 this.channel = channel;
256 this.destination = destination;
257 }
258
259 @Override
260 public void expired()
261 {
262 if (channel.isConnectionPending())
263 {
264 Log.debug("Channel {} timed out while connecting, closing it", channel);
265 try
266 {
267
268 channel.close();
269 }
270 catch (IOException x)
271 {
272 Log.ignore(x);
273 }
274 destination.onConnectionFailed(new SocketTimeoutException());
275 }
276 }
277 }
278
279
280
281
282
283
284
285 public static class ProxySelectChannelEndPoint extends SslSelectChannelEndPoint
286 {
287 private final SelectChannelEndPoint plainEndPoint;
288 private volatile boolean upgraded = false;
289
290 public ProxySelectChannelEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key, Buffers sslBuffers, SSLEngine engine, int maxIdleTimeout) throws IOException
291 {
292 super(sslBuffers, channel, selectSet, key, engine, maxIdleTimeout);
293 this.plainEndPoint = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTimeout);
294 }
295
296 public void upgrade()
297 {
298 upgraded = true;
299 }
300
301 public void shutdownOutput() throws IOException
302 {
303 if (upgraded)
304 super.shutdownOutput();
305 else
306 plainEndPoint.shutdownOutput();
307 }
308
309 public void close() throws IOException
310 {
311 if (upgraded)
312 super.close();
313 else
314 plainEndPoint.close();
315 }
316
317 public int fill(Buffer buffer) throws IOException
318 {
319 if (upgraded)
320 return super.fill(buffer);
321 else
322 return plainEndPoint.fill(buffer);
323 }
324
325 public int flush(Buffer buffer) throws IOException
326 {
327 if (upgraded)
328 return super.flush(buffer);
329 else
330 return plainEndPoint.flush(buffer);
331 }
332
333 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
334 {
335 if (upgraded)
336 return super.flush(header, buffer, trailer);
337 else
338 return plainEndPoint.flush(header, buffer, trailer);
339 }
340
341 public String getLocalAddr()
342 {
343 if (upgraded)
344 return super.getLocalAddr();
345 else
346 return plainEndPoint.getLocalAddr();
347 }
348
349 public String getLocalHost()
350 {
351 if (upgraded)
352 return super.getLocalHost();
353 else
354 return plainEndPoint.getLocalHost();
355 }
356
357 public int getLocalPort()
358 {
359 if (upgraded)
360 return super.getLocalPort();
361 else
362 return plainEndPoint.getLocalPort();
363 }
364
365 public String getRemoteAddr()
366 {
367 if (upgraded)
368 return super.getRemoteAddr();
369 else
370 return plainEndPoint.getRemoteAddr();
371 }
372
373 public String getRemoteHost()
374 {
375 if (upgraded)
376 return super.getRemoteHost();
377 else
378 return plainEndPoint.getRemoteHost();
379 }
380
381 public int getRemotePort()
382 {
383 if (upgraded)
384 return super.getRemotePort();
385 else
386 return plainEndPoint.getRemotePort();
387 }
388
389 public boolean isBlocking()
390 {
391 if (upgraded)
392 return super.isBlocking();
393 else
394 return plainEndPoint.isBlocking();
395 }
396
397 public boolean isBufferred()
398 {
399 if (upgraded)
400 return super.isBufferred();
401 else
402 return plainEndPoint.isBufferred();
403 }
404
405 public boolean blockReadable(long millisecs) throws IOException
406 {
407 if (upgraded)
408 return super.blockReadable(millisecs);
409 else
410 return plainEndPoint.blockReadable(millisecs);
411 }
412
413 public boolean blockWritable(long millisecs) throws IOException
414 {
415 if (upgraded)
416 return super.blockWritable(millisecs);
417 else
418 return plainEndPoint.blockWritable(millisecs);
419 }
420
421 public boolean isOpen()
422 {
423 if (upgraded)
424 return super.isOpen();
425 else
426 return plainEndPoint.isOpen();
427 }
428
429 public Object getTransport()
430 {
431 if (upgraded)
432 return super.getTransport();
433 else
434 return plainEndPoint.getTransport();
435 }
436
437 public boolean isBufferingInput()
438 {
439 if (upgraded)
440 return super.isBufferingInput();
441 else
442 return plainEndPoint.isBufferingInput();
443 }
444
445 public boolean isBufferingOutput()
446 {
447 if (upgraded)
448 return super.isBufferingOutput();
449 else
450 return plainEndPoint.isBufferingOutput();
451 }
452
453 public void flush() throws IOException
454 {
455 if (upgraded)
456 super.flush();
457 else
458 plainEndPoint.flush();
459
460 }
461
462 public int getMaxIdleTime()
463 {
464 if (upgraded)
465 return super.getMaxIdleTime();
466 else
467 return plainEndPoint.getMaxIdleTime();
468 }
469
470 public void setMaxIdleTime(int timeMs) throws IOException
471 {
472 if (upgraded)
473 super.setMaxIdleTime(timeMs);
474 else
475 plainEndPoint.setMaxIdleTime(timeMs);
476 }
477 }
478 }