1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17 import java.net.SocketTimeoutException;
18 import java.net.UnknownHostException;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.SocketChannel;
21 import java.nio.channels.UnresolvedAddressException;
22 import java.util.Map;
23 import java.util.concurrent.ConcurrentHashMap;
24
25 import javax.net.ssl.SSLContext;
26 import javax.net.ssl.SSLEngine;
27 import javax.net.ssl.SSLSession;
28
29 import org.eclipse.jetty.http.HttpGenerator;
30 import org.eclipse.jetty.http.HttpParser;
31 import org.eclipse.jetty.http.ssl.SslContextFactory;
32 import org.eclipse.jetty.io.Buffer;
33 import org.eclipse.jetty.io.Buffers;
34 import org.eclipse.jetty.io.Buffers.Type;
35 import org.eclipse.jetty.io.BuffersFactory;
36 import org.eclipse.jetty.io.ConnectedEndPoint;
37 import org.eclipse.jetty.io.Connection;
38 import org.eclipse.jetty.io.EndPoint;
39 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
40 import org.eclipse.jetty.io.nio.SelectorManager;
41 import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
42 import org.eclipse.jetty.util.component.AbstractLifeCycle;
43 import org.eclipse.jetty.util.log.Log;
44 import org.eclipse.jetty.util.log.Logger;
45 import org.eclipse.jetty.util.thread.Timeout;
46
47 class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
48 {
/** Logger shared by this connector and its nested classes. */
private static final Logger LOG = Log.getLogger(SelectConnector.class);

/** Owning client; consulted for buffers, timeouts, SSL configuration and the thread pool. */
private final HttpClient _httpClient;

/** Selector manager that every client channel is registered with. */
private final Manager _selectorManager = new Manager();

/** Non-blocking connects still in progress, keyed by channel, with the task that times each one out. */
private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>();

/** Buffers handed to SSL endpoints and SSL connections; assigned in {@link #doStart()}. */
private Buffers _sslBuffers;

/**
 * Creates a connector that serves the given client.
 *
 * @param httpClient the client whose configuration and thread pool this connector uses
 */
SelectConnector(HttpClient httpClient)
{
    this._httpClient = httpClient;
}
63
64
65 @Override
66 protected void doStart() throws Exception
67 {
68 super.doStart();
69
70
71 final boolean direct=_httpClient.getUseDirectBuffers();
72
73 SSLEngine sslEngine=_selectorManager.newSslEngine(null);
74 final SSLSession ssl_session=sslEngine.getSession();
75 _sslBuffers = BuffersFactory.newBuffers(
76 direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
77 direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
78 direct?Type.DIRECT:Type.INDIRECT,1024);
79
80 _selectorManager.start();
81 }
82
83
84 @Override
85 protected void doStop() throws Exception
86 {
87 _selectorManager.stop();
88 }
89
90
91 public void startConnection( HttpDestination destination )
92 throws IOException
93 {
94 SocketChannel channel = null;
95 try
96 {
97 channel = SocketChannel.open();
98 Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
99 channel.socket().setTcpNoDelay(true);
100
101 if (_httpClient.isConnectBlocking())
102 {
103 channel.socket().connect(address.toSocketAddress(), _httpClient.getConnectTimeout());
104 channel.configureBlocking(false);
105 _selectorManager.register( channel, destination );
106 }
107 else
108 {
109 channel.configureBlocking(false);
110 channel.connect(address.toSocketAddress());
111 _selectorManager.register(channel,destination);
112 ConnectTimeout connectTimeout = new ConnectTimeout(channel,destination);
113 _httpClient.schedule(connectTimeout,_httpClient.getConnectTimeout());
114 _connectingChannels.put(channel,connectTimeout);
115 }
116 }
117 catch (UnresolvedAddressException ex)
118 {
119 if (channel != null)
120 channel.close();
121 destination.onConnectionFailed(ex);
122 }
123 catch(IOException ex)
124 {
125 if (channel != null)
126 channel.close();
127 destination.onConnectionFailed(ex);
128 }
129 }
130
131
132 class Manager extends SelectorManager
133 {
134 @Override
135 public boolean dispatch(Runnable task)
136 {
137 return _httpClient._threadPool.dispatch(task);
138 }
139
140 @Override
141 protected void endPointOpened(SelectChannelEndPoint endpoint)
142 {
143 }
144
145 @Override
146 protected void endPointClosed(SelectChannelEndPoint endpoint)
147 {
148 }
149
150 @Override
151 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
152 {
153 }
154
155 @Override
156 protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
157 {
158 if (endpoint instanceof SslSelectChannelEndPoint)
159 return new HttpConnection(_sslBuffers,_sslBuffers,endpoint);
160
161 return new HttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
162 }
163
164 @Override
165 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
166 {
167
168 Timeout.Task connectTimeout = _connectingChannels.remove(channel);
169 if (connectTimeout != null)
170 connectTimeout.cancel();
171 if (LOG.isDebugEnabled())
172 LOG.debug("Channels with connection pending: {}", _connectingChannels.size());
173
174
175 HttpDestination dest=(HttpDestination)key.attachment();
176
177 SelectChannelEndPoint ep=null;
178
179 if (dest.isSecure())
180 {
181 if (dest.isProxied())
182 {
183 SSLEngine engine=newSslEngine(channel);
184 ep = new ProxySelectChannelEndPoint(channel, selectSet, key, _sslBuffers, engine, (int)_httpClient.getIdleTimeout());
185 }
186 else
187 {
188 SSLEngine engine=newSslEngine(channel);
189 SslSelectChannelEndPoint sslEp = new SslSelectChannelEndPoint(_sslBuffers, channel, selectSet, key, engine, (int)_httpClient.getIdleTimeout());
190 sslEp.setAllowRenegotiate(_httpClient.getSslContextFactory().isAllowRenegotiate());
191 ep = sslEp;
192 }
193 }
194 else
195 {
196 ep = new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout());
197 }
198
199 HttpConnection connection=(HttpConnection)ep.getConnection();
200 connection.setDestination(dest);
201 dest.onNewConnection(connection);
202 return ep;
203 }
204
205 private synchronized SSLEngine newSslEngine(SocketChannel channel) throws IOException
206 {
207 SslContextFactory sslContextFactory = _httpClient.getSslContextFactory();
208 SSLEngine sslEngine;
209 if (channel != null)
210 {
211 String peerHost = channel.socket().getInetAddress().getHostAddress();
212 int peerPort = channel.socket().getPort();
213 sslEngine = sslContextFactory.newSslEngine(peerHost, peerPort);
214 }
215 else
216 {
217 sslEngine = sslContextFactory.newSslEngine();
218 }
219 sslEngine.setUseClientMode(true);
220 sslEngine.beginHandshake();
221
222 return sslEngine;
223 }
224
225
226
227
228
229 @Override
230 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
231 {
232 Timeout.Task connectTimeout = _connectingChannels.remove(channel);
233 if (connectTimeout != null)
234 connectTimeout.cancel();
235
236 if (attachment instanceof HttpDestination)
237 ((HttpDestination)attachment).onConnectionFailed(ex);
238 else
239 super.connectionFailed(channel,ex,attachment);
240 }
241 }
242
243 private class ConnectTimeout extends Timeout.Task
244 {
245 private final SocketChannel channel;
246 private final HttpDestination destination;
247
248 public ConnectTimeout(SocketChannel channel, HttpDestination destination)
249 {
250 this.channel = channel;
251 this.destination = destination;
252 }
253
254 @Override
255 public void expired()
256 {
257 if (channel.isConnectionPending())
258 {
259 LOG.debug("Channel {} timed out while connecting, closing it", channel);
260 try
261 {
262
263 channel.close();
264 }
265 catch (IOException x)
266 {
267 LOG.ignore(x);
268 }
269 destination.onConnectionFailed(new SocketTimeoutException());
270 }
271 }
272 }
273
274
275
276
277
278
279
280 public static class ProxySelectChannelEndPoint extends SslSelectChannelEndPoint
281 {
282 private final SelectChannelEndPoint plainEndPoint;
283 private volatile boolean upgraded = false;
284
285 public ProxySelectChannelEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key, Buffers sslBuffers, SSLEngine engine, int maxIdleTimeout) throws IOException
286 {
287 super(sslBuffers, channel, selectSet, key, engine, maxIdleTimeout);
288 this.plainEndPoint = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTimeout);
289 }
290
291 public void upgrade()
292 {
293 upgraded = true;
294 }
295
296 public void shutdownOutput() throws IOException
297 {
298 if (upgraded)
299 super.shutdownOutput();
300 else
301 plainEndPoint.shutdownOutput();
302 }
303
304 public void close() throws IOException
305 {
306 if (upgraded)
307 super.close();
308 else
309 plainEndPoint.close();
310 }
311
312 public int fill(Buffer buffer) throws IOException
313 {
314 if (upgraded)
315 return super.fill(buffer);
316 else
317 return plainEndPoint.fill(buffer);
318 }
319
320 public int flush(Buffer buffer) throws IOException
321 {
322 if (upgraded)
323 return super.flush(buffer);
324 else
325 return plainEndPoint.flush(buffer);
326 }
327
328 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
329 {
330 if (upgraded)
331 return super.flush(header, buffer, trailer);
332 else
333 return plainEndPoint.flush(header, buffer, trailer);
334 }
335
336 public String getLocalAddr()
337 {
338 if (upgraded)
339 return super.getLocalAddr();
340 else
341 return plainEndPoint.getLocalAddr();
342 }
343
344 public String getLocalHost()
345 {
346 if (upgraded)
347 return super.getLocalHost();
348 else
349 return plainEndPoint.getLocalHost();
350 }
351
352 public int getLocalPort()
353 {
354 if (upgraded)
355 return super.getLocalPort();
356 else
357 return plainEndPoint.getLocalPort();
358 }
359
360 public String getRemoteAddr()
361 {
362 if (upgraded)
363 return super.getRemoteAddr();
364 else
365 return plainEndPoint.getRemoteAddr();
366 }
367
368 public String getRemoteHost()
369 {
370 if (upgraded)
371 return super.getRemoteHost();
372 else
373 return plainEndPoint.getRemoteHost();
374 }
375
376 public int getRemotePort()
377 {
378 if (upgraded)
379 return super.getRemotePort();
380 else
381 return plainEndPoint.getRemotePort();
382 }
383
384 public boolean isBlocking()
385 {
386 if (upgraded)
387 return super.isBlocking();
388 else
389 return plainEndPoint.isBlocking();
390 }
391
392 public boolean isBufferred()
393 {
394 if (upgraded)
395 return super.isBufferred();
396 else
397 return plainEndPoint.isBufferred();
398 }
399
400 public boolean blockReadable(long millisecs) throws IOException
401 {
402 if (upgraded)
403 return super.blockReadable(millisecs);
404 else
405 return plainEndPoint.blockReadable(millisecs);
406 }
407
408 public boolean blockWritable(long millisecs) throws IOException
409 {
410 if (upgraded)
411 return super.blockWritable(millisecs);
412 else
413 return plainEndPoint.blockWritable(millisecs);
414 }
415
416 public boolean isOpen()
417 {
418 if (upgraded)
419 return super.isOpen();
420 else
421 return plainEndPoint.isOpen();
422 }
423
424 public Object getTransport()
425 {
426 if (upgraded)
427 return super.getTransport();
428 else
429 return plainEndPoint.getTransport();
430 }
431
432 public boolean isBufferingInput()
433 {
434 if (upgraded)
435 return super.isBufferingInput();
436 else
437 return plainEndPoint.isBufferingInput();
438 }
439
440 public boolean isBufferingOutput()
441 {
442 if (upgraded)
443 return super.isBufferingOutput();
444 else
445 return plainEndPoint.isBufferingOutput();
446 }
447
448 public void flush() throws IOException
449 {
450 if (upgraded)
451 super.flush();
452 else
453 plainEndPoint.flush();
454
455 }
456
457 public int getMaxIdleTime()
458 {
459 if (upgraded)
460 return super.getMaxIdleTime();
461 else
462 return plainEndPoint.getMaxIdleTime();
463 }
464
465 public void setMaxIdleTime(int timeMs) throws IOException
466 {
467 if (upgraded)
468 super.setMaxIdleTime(timeMs);
469 else
470 plainEndPoint.setMaxIdleTime(timeMs);
471 }
472 }
473 }