1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17 import java.net.SocketTimeoutException;
18 import java.nio.channels.SelectionKey;
19 import java.nio.channels.SocketChannel;
20 import java.util.Map;
21 import java.util.concurrent.ConcurrentHashMap;
22
23 import javax.net.ssl.SSLContext;
24 import javax.net.ssl.SSLEngine;
25 import javax.net.ssl.SSLSession;
26
27 import org.eclipse.jetty.http.HttpGenerator;
28 import org.eclipse.jetty.http.HttpParser;
29 import org.eclipse.jetty.io.Buffer;
30 import org.eclipse.jetty.io.Buffers;
31 import org.eclipse.jetty.io.Buffers.Type;
32 import org.eclipse.jetty.io.BuffersFactory;
33 import org.eclipse.jetty.io.ConnectedEndPoint;
34 import org.eclipse.jetty.io.Connection;
35 import org.eclipse.jetty.io.EndPoint;
36 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
37 import org.eclipse.jetty.io.nio.SelectorManager;
38 import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
39 import org.eclipse.jetty.util.component.AbstractLifeCycle;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.thread.Timeout;
42
43 class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, Runnable
44 {
45 private final HttpClient _httpClient;
46 private final Manager _selectorManager=new Manager();
47 private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>();
48 private SSLContext _sslContext;
49 private Buffers _sslBuffers;
50 private int _maxBuffers=1024;
51
52
53
54
55 SelectConnector(HttpClient httpClient)
56 {
57 _httpClient = httpClient;
58 }
59
60
61 @Override
62 protected void doStart() throws Exception
63 {
64 super.doStart();
65
66 _selectorManager.start();
67
68 final boolean direct=_httpClient.getUseDirectBuffers();
69
70 SSLEngine sslEngine=_selectorManager.newSslEngine();
71 final SSLSession ssl_session=sslEngine.getSession();
72 _sslBuffers = BuffersFactory.newBuffers(
73 direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
74 direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
75 direct?Type.DIRECT:Type.INDIRECT,_maxBuffers);
76
77 _httpClient._threadPool.dispatch(this);
78 }
79
80
81 @Override
82 protected void doStop() throws Exception
83 {
84 _selectorManager.stop();
85 }
86
87
88 public void startConnection( HttpDestination destination )
89 throws IOException
90 {
91 try
92 {
93 SocketChannel channel = SocketChannel.open();
94 Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
95 channel.socket().setTcpNoDelay(true);
96
97 if (!_httpClient.isConnectBlocking())
98 {
99 channel.configureBlocking( false );
100 channel.connect(address.toSocketAddress());
101 _selectorManager.register( channel, destination );
102 ConnectTimeout connectTimeout = new ConnectTimeout(channel, destination);
103 _httpClient.schedule(connectTimeout,_httpClient.getConnectTimeout());
104 _connectingChannels.put(channel, connectTimeout);
105 }
106 else
107 {
108 channel.configureBlocking( true );
109 channel.socket().setSoTimeout(_httpClient.getConnectTimeout());
110 channel.socket().connect(address.toSocketAddress(),_httpClient.getConnectTimeout());
111 channel.configureBlocking(false);
112 channel.socket().setSoTimeout((int)_httpClient.getTimeout());
113
114 _selectorManager.register( channel, destination );
115 }
116
117 }
118 catch(IOException ex)
119 {
120 destination.onConnectionFailed(ex);
121 }
122
123 }
124
125
126 public void run()
127 {
128 while (_httpClient.isRunning())
129 {
130 try
131 {
132 _selectorManager.doSelect(0);
133 }
134 catch (Exception e)
135 {
136 Log.warn(e.toString());
137 Log.debug(e);
138 Thread.yield();
139 }
140 }
141 }
142
143
144 class Manager extends SelectorManager
145 {
146 @Override
147 public boolean dispatch(Runnable task)
148 {
149 return SelectConnector.this._httpClient._threadPool.dispatch(task);
150 }
151
152 @Override
153 protected void endPointOpened(SelectChannelEndPoint endpoint)
154 {
155 }
156
157 @Override
158 protected void endPointClosed(SelectChannelEndPoint endpoint)
159 {
160 }
161
162 @Override
163 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
164 {
165 }
166
167 @Override
168 protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
169 {
170 if (endpoint instanceof SslSelectChannelEndPoint)
171 return new HttpConnection(_sslBuffers,_sslBuffers,endpoint);
172
173 return new HttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
174 }
175
176 @Override
177 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
178 {
179
180 Timeout.Task connectTimeout = _connectingChannels.remove(channel);
181 if (connectTimeout != null)
182 connectTimeout.cancel();
183 Log.debug("Channels with connection pending: {}", _connectingChannels.size());
184
185
186 HttpDestination dest=(HttpDestination)key.attachment();
187
188 SelectChannelEndPoint ep=null;
189
190 if (dest.isSecure())
191 {
192 if (dest.isProxied())
193 {
194 SSLEngine engine=newSslEngine();
195 ep = new ProxySelectChannelEndPoint(channel,selectSet,key,_sslBuffers,engine);
196 }
197 else
198 {
199 SSLEngine engine=newSslEngine();
200 ep = new SslSelectChannelEndPoint(_sslBuffers,channel,selectSet,key,engine);
201 }
202 }
203 else
204 {
205 ep=new SelectChannelEndPoint(channel,selectSet,key);
206 }
207
208 HttpConnection connection=(HttpConnection)ep.getConnection();
209 connection.setDestination(dest);
210 dest.onNewConnection(connection);
211 return ep;
212 }
213
214 private synchronized SSLEngine newSslEngine() throws IOException
215 {
216 if (_sslContext==null)
217 {
218 _sslContext = SelectConnector.this._httpClient.getSSLContext();
219 }
220
221 SSLEngine sslEngine = _sslContext.createSSLEngine();
222 sslEngine.setUseClientMode(true);
223 sslEngine.beginHandshake();
224
225 return sslEngine;
226 }
227
228
229
230
231
232 @Override
233 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
234 {
235 if (attachment instanceof HttpDestination)
236 ((HttpDestination)attachment).onConnectionFailed(ex);
237 else
238 super.connectionFailed(channel,ex,attachment);
239 }
240 }
241
242 private class ConnectTimeout extends Timeout.Task
243 {
244 private final SocketChannel channel;
245 private final HttpDestination destination;
246
247 public ConnectTimeout(SocketChannel channel, HttpDestination destination)
248 {
249 this.channel = channel;
250 this.destination = destination;
251 }
252
253 @Override
254 public void expired()
255 {
256 if (channel.isConnectionPending())
257 {
258 Log.debug("Channel {} timed out while connecting, closing it", channel);
259 try
260 {
261
262 channel.close();
263 }
264 catch (IOException x)
265 {
266 Log.ignore(x);
267 }
268 destination.onConnectionFailed(new SocketTimeoutException());
269 }
270 }
271 }
272
273
274
275
276
277
278
279 public static class ProxySelectChannelEndPoint extends SslSelectChannelEndPoint
280 {
281 private final SelectChannelEndPoint plainEndPoint;
282 private volatile boolean upgraded = false;
283
284 public ProxySelectChannelEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key, Buffers sslBuffers, SSLEngine engine) throws IOException
285 {
286 super(sslBuffers, channel, selectSet, key, engine);
287 this.plainEndPoint = new SelectChannelEndPoint(channel, selectSet, key);
288 }
289
290 public void upgrade()
291 {
292 upgraded = true;
293 }
294
295 public void shutdownOutput() throws IOException
296 {
297 if (upgraded)
298 super.shutdownOutput();
299 else
300 plainEndPoint.shutdownOutput();
301 }
302
303 public void close() throws IOException
304 {
305 if (upgraded)
306 super.close();
307 else
308 plainEndPoint.close();
309 }
310
311 public int fill(Buffer buffer) throws IOException
312 {
313 if (upgraded)
314 return super.fill(buffer);
315 else
316 return plainEndPoint.fill(buffer);
317 }
318
319 public int flush(Buffer buffer) throws IOException
320 {
321 if (upgraded)
322 return super.flush(buffer);
323 else
324 return plainEndPoint.flush(buffer);
325 }
326
327 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
328 {
329 if (upgraded)
330 return super.flush(header, buffer, trailer);
331 else
332 return plainEndPoint.flush(header, buffer, trailer);
333 }
334
335 public String getLocalAddr()
336 {
337 if (upgraded)
338 return super.getLocalAddr();
339 else
340 return plainEndPoint.getLocalAddr();
341 }
342
343 public String getLocalHost()
344 {
345 if (upgraded)
346 return super.getLocalHost();
347 else
348 return plainEndPoint.getLocalHost();
349 }
350
351 public int getLocalPort()
352 {
353 if (upgraded)
354 return super.getLocalPort();
355 else
356 return plainEndPoint.getLocalPort();
357 }
358
359 public String getRemoteAddr()
360 {
361 if (upgraded)
362 return super.getRemoteAddr();
363 else
364 return plainEndPoint.getRemoteAddr();
365 }
366
367 public String getRemoteHost()
368 {
369 if (upgraded)
370 return super.getRemoteHost();
371 else
372 return plainEndPoint.getRemoteHost();
373 }
374
375 public int getRemotePort()
376 {
377 if (upgraded)
378 return super.getRemotePort();
379 else
380 return plainEndPoint.getRemotePort();
381 }
382
383 public boolean isBlocking()
384 {
385 if (upgraded)
386 return super.isBlocking();
387 else
388 return plainEndPoint.isBlocking();
389 }
390
391 public boolean isBufferred()
392 {
393 if (upgraded)
394 return super.isBufferred();
395 else
396 return plainEndPoint.isBufferred();
397 }
398
399 public boolean blockReadable(long millisecs) throws IOException
400 {
401 if (upgraded)
402 return super.blockReadable(millisecs);
403 else
404 return plainEndPoint.blockReadable(millisecs);
405 }
406
407 public boolean blockWritable(long millisecs) throws IOException
408 {
409 if (upgraded)
410 return super.blockWritable(millisecs);
411 else
412 return plainEndPoint.blockWritable(millisecs);
413 }
414
415 public boolean isOpen()
416 {
417 if (upgraded)
418 return super.isOpen();
419 else
420 return plainEndPoint.isOpen();
421 }
422
423 public Object getTransport()
424 {
425 if (upgraded)
426 return super.getTransport();
427 else
428 return plainEndPoint.getTransport();
429 }
430
431 public boolean isBufferingInput()
432 {
433 if (upgraded)
434 return super.isBufferingInput();
435 else
436 return plainEndPoint.isBufferingInput();
437 }
438
439 public boolean isBufferingOutput()
440 {
441 if (upgraded)
442 return super.isBufferingOutput();
443 else
444 return plainEndPoint.isBufferingOutput();
445 }
446
447 public void flush() throws IOException
448 {
449 if (upgraded)
450 super.flush();
451 else
452 plainEndPoint.flush();
453
454 }
455
456 public int getMaxIdleTime()
457 {
458 if (upgraded)
459 return super.getMaxIdleTime();
460 else
461 return plainEndPoint.getMaxIdleTime();
462 }
463
464 public void setMaxIdleTime(int timeMs) throws IOException
465 {
466 if (upgraded)
467 super.setMaxIdleTime(timeMs);
468 else
469 plainEndPoint.setMaxIdleTime(timeMs);
470 }
471 }
472 }