1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17 import java.net.SocketTimeoutException;
18 import java.nio.channels.SelectionKey;
19 import java.nio.channels.SocketChannel;
20 import javax.net.ssl.SSLContext;
21 import javax.net.ssl.SSLEngine;
22 import javax.net.ssl.SSLSession;
23
24 import org.eclipse.jetty.http.HttpGenerator;
25 import org.eclipse.jetty.http.HttpParser;
26 import org.eclipse.jetty.io.Buffer;
27 import org.eclipse.jetty.io.Buffers;
28 import org.eclipse.jetty.io.ConnectedEndPoint;
29 import org.eclipse.jetty.io.Connection;
30 import org.eclipse.jetty.io.EndPoint;
31 import org.eclipse.jetty.io.ThreadLocalBuffers;
32 import org.eclipse.jetty.io.nio.DirectNIOBuffer;
33 import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
34 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
35 import org.eclipse.jetty.io.nio.SelectorManager;
36 import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
37 import org.eclipse.jetty.util.component.AbstractLifeCycle;
38 import org.eclipse.jetty.util.log.Log;
39 import org.eclipse.jetty.util.thread.Timeout;
40
41 class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, Runnable
42 {
43 private final HttpClient _httpClient;
44 private final Manager _selectorManager=new Manager();
45 private SSLContext _sslContext;
46 private Buffers _sslBuffers;
47 private boolean _blockingConnect;
48
49
50
51
/**
 * Creates a connector bound to the given client.
 *
 * @param httpClient the client whose configuration and thread pool this
 *                   connector uses
 */
SelectConnector(HttpClient httpClient)
{
    this._httpClient = httpClient;
}
56
57
58
59
60
61 public boolean isBlockingConnect()
62 {
63 return _blockingConnect;
64 }
65
66
67
68
69
70 public void setBlockingConnect(boolean blockingConnect)
71 {
72 _blockingConnect = blockingConnect;
73 }
74
75
76 @Override
77 protected void doStart() throws Exception
78 {
79 super.doStart();
80
81 _selectorManager.start();
82
83 final boolean direct=_httpClient.getUseDirectBuffers();
84
85 SSLEngine sslEngine=_selectorManager.newSslEngine();
86 final SSLSession ssl_session=sslEngine.getSession();
87 ThreadLocalBuffers ssl_buffers = new ThreadLocalBuffers()
88 {
89 {
90 super.setBufferSize(ssl_session.getApplicationBufferSize());
91 super.setHeaderSize(ssl_session.getApplicationBufferSize());
92 }
93
94 @Override
95 protected Buffer newBuffer(int size)
96 {
97 return direct?new DirectNIOBuffer(size):new IndirectNIOBuffer(size);
98 }
99 @Override
100 protected Buffer newHeader(int size)
101 {
102 return direct?new DirectNIOBuffer(size):new IndirectNIOBuffer(size);
103 }
104 @Override
105 protected boolean isHeader(Buffer buffer)
106 {
107 return true;
108 }
109
110 @Override
111 public void setBufferSize(int size)
112 {
113 }
114
115 @Override
116 public void setHeaderSize(int size)
117 {
118 }
119 };
120 _sslBuffers=ssl_buffers;
121
122 _httpClient._threadPool.dispatch(this);
123 }
124
125
126 @Override
127 protected void doStop() throws Exception
128 {
129 _selectorManager.stop();
130 }
131
132
133 public void startConnection( HttpDestination destination )
134 throws IOException
135 {
136 try
137 {
138 SocketChannel channel = SocketChannel.open();
139 Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
140 channel.configureBlocking( true );
141 channel.socket().setTcpNoDelay(true);
142 channel.socket().setSoTimeout(_httpClient.getConnectTimeout());
143 channel.connect(address.toSocketAddress());
144 channel.configureBlocking(false);
145 channel.socket().setSoTimeout((int)_httpClient.getTimeout());
146
147 _selectorManager.register( channel, destination );
148 }
149 catch(IOException ex)
150 {
151 destination.onConnectionFailed(ex);
152 }
153
154 }
155
156
157 public void run()
158 {
159 while (_httpClient.isRunning())
160 {
161 try
162 {
163 _selectorManager.doSelect(0);
164 }
165 catch (Exception e)
166 {
167 Log.warn(e.toString());
168 Log.debug(e);
169 Thread.yield();
170 }
171 }
172 }
173
174
175 class Manager extends SelectorManager
176 {
177 @Override
178 public boolean dispatch(Runnable task)
179 {
180 return SelectConnector.this._httpClient._threadPool.dispatch(task);
181 }
182
183 @Override
184 protected void endPointOpened(SelectChannelEndPoint endpoint)
185 {
186 }
187
188 @Override
189 protected void endPointClosed(SelectChannelEndPoint endpoint)
190 {
191 }
192
193 @Override
194 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
195 {
196 }
197
198 @Override
199 protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
200 {
201 if (endpoint instanceof SslSelectChannelEndPoint)
202 return new HttpConnection(_sslBuffers,_sslBuffers,endpoint);
203
204 return new HttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
205 }
206
207 @Override
208 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
209 {
210
211 HttpDestination dest=(HttpDestination)key.attachment();
212
213 SelectChannelEndPoint ep=null;
214
215 if (dest.isSecure())
216 {
217 if (dest.isProxied())
218 {
219 SSLEngine engine=newSslEngine();
220 ep = new ProxySelectChannelEndPoint(channel,selectSet,key,_sslBuffers,engine);
221 }
222 else
223 {
224 SSLEngine engine=newSslEngine();
225 ep = new SslSelectChannelEndPoint(_sslBuffers,channel,selectSet,key,engine);
226 }
227 }
228 else
229 {
230 ep=new SelectChannelEndPoint(channel,selectSet,key);
231 }
232
233 HttpConnection connection=(HttpConnection)ep.getConnection();
234 connection.setDestination(dest);
235 dest.onNewConnection(connection);
236 return ep;
237 }
238
239 private synchronized SSLEngine newSslEngine() throws IOException
240 {
241 if (_sslContext==null)
242 {
243 _sslContext = SelectConnector.this._httpClient.getSSLContext();
244 }
245
246 SSLEngine sslEngine = _sslContext.createSSLEngine();
247 sslEngine.setUseClientMode(true);
248 sslEngine.beginHandshake();
249
250 return sslEngine;
251 }
252 }
253
254 private class ConnectTimeout extends Timeout.Task
255 {
256 private final SocketChannel channel;
257 private final HttpDestination destination;
258
259 public ConnectTimeout(SocketChannel channel, HttpDestination destination)
260 {
261 this.channel = channel;
262 this.destination = destination;
263 }
264
265 @Override
266 public void expired()
267 {
268 if (channel.isConnectionPending())
269 {
270 Log.debug("Channel {} timed out while connecting, closing it", channel);
271 try
272 {
273
274 channel.close();
275 }
276 catch (IOException x)
277 {
278 Log.ignore(x);
279 }
280 destination.onConnectionFailed(new SocketTimeoutException());
281 }
282 }
283 }
284
285
286
287
288
289
290
291 public static class ProxySelectChannelEndPoint extends SslSelectChannelEndPoint
292 {
293 private final SelectChannelEndPoint plainEndPoint;
294 private volatile boolean upgraded = false;
295
296 public ProxySelectChannelEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key, Buffers sslBuffers, SSLEngine engine) throws IOException
297 {
298 super(sslBuffers, channel, selectSet, key, engine);
299 this.plainEndPoint = new SelectChannelEndPoint(channel, selectSet, key);
300 }
301
302 public void upgrade()
303 {
304 upgraded = true;
305 }
306
307 public void shutdownOutput() throws IOException
308 {
309 if (upgraded)
310 super.shutdownOutput();
311 else
312 plainEndPoint.shutdownOutput();
313 }
314
315 public void close() throws IOException
316 {
317 if (upgraded)
318 super.close();
319 else
320 plainEndPoint.close();
321 }
322
323 public int fill(Buffer buffer) throws IOException
324 {
325 if (upgraded)
326 return super.fill(buffer);
327 else
328 return plainEndPoint.fill(buffer);
329 }
330
331 public int flush(Buffer buffer) throws IOException
332 {
333 if (upgraded)
334 return super.flush(buffer);
335 else
336 return plainEndPoint.flush(buffer);
337 }
338
339 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
340 {
341 if (upgraded)
342 return super.flush(header, buffer, trailer);
343 else
344 return plainEndPoint.flush(header, buffer, trailer);
345 }
346
347 public String getLocalAddr()
348 {
349 if (upgraded)
350 return super.getLocalAddr();
351 else
352 return plainEndPoint.getLocalAddr();
353 }
354
355 public String getLocalHost()
356 {
357 if (upgraded)
358 return super.getLocalHost();
359 else
360 return plainEndPoint.getLocalHost();
361 }
362
363 public int getLocalPort()
364 {
365 if (upgraded)
366 return super.getLocalPort();
367 else
368 return plainEndPoint.getLocalPort();
369 }
370
371 public String getRemoteAddr()
372 {
373 if (upgraded)
374 return super.getRemoteAddr();
375 else
376 return plainEndPoint.getRemoteAddr();
377 }
378
379 public String getRemoteHost()
380 {
381 if (upgraded)
382 return super.getRemoteHost();
383 else
384 return plainEndPoint.getRemoteHost();
385 }
386
387 public int getRemotePort()
388 {
389 if (upgraded)
390 return super.getRemotePort();
391 else
392 return plainEndPoint.getRemotePort();
393 }
394
395 public boolean isBlocking()
396 {
397 if (upgraded)
398 return super.isBlocking();
399 else
400 return plainEndPoint.isBlocking();
401 }
402
403 public boolean isBufferred()
404 {
405 if (upgraded)
406 return super.isBufferred();
407 else
408 return plainEndPoint.isBufferred();
409 }
410
411 public boolean blockReadable(long millisecs) throws IOException
412 {
413 if (upgraded)
414 return super.blockReadable(millisecs);
415 else
416 return plainEndPoint.blockReadable(millisecs);
417 }
418
419 public boolean blockWritable(long millisecs) throws IOException
420 {
421 if (upgraded)
422 return super.blockWritable(millisecs);
423 else
424 return plainEndPoint.blockWritable(millisecs);
425 }
426
427 public boolean isOpen()
428 {
429 if (upgraded)
430 return super.isOpen();
431 else
432 return plainEndPoint.isOpen();
433 }
434
435 public Object getTransport()
436 {
437 if (upgraded)
438 return super.getTransport();
439 else
440 return plainEndPoint.getTransport();
441 }
442
443 public boolean isBufferingInput()
444 {
445 if (upgraded)
446 return super.isBufferingInput();
447 else
448 return plainEndPoint.isBufferingInput();
449 }
450
451 public boolean isBufferingOutput()
452 {
453 if (upgraded)
454 return super.isBufferingOutput();
455 else
456 return plainEndPoint.isBufferingOutput();
457 }
458
459 public void flush() throws IOException
460 {
461 if (upgraded)
462 super.flush();
463 else
464 plainEndPoint.flush();
465
466 }
467
468 public int getMaxIdleTime()
469 {
470 if (upgraded)
471 return super.getMaxIdleTime();
472 else
473 return plainEndPoint.getMaxIdleTime();
474 }
475
476 public void setMaxIdleTime(int timeMs) throws IOException
477 {
478 if (upgraded)
479 super.setMaxIdleTime(timeMs);
480 else
481 plainEndPoint.setMaxIdleTime(timeMs);
482 }
483 }
484 }