1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17 import java.net.SocketTimeoutException;
18 import java.nio.channels.SelectionKey;
19 import java.nio.channels.SocketChannel;
20 import java.nio.channels.UnresolvedAddressException;
21 import java.util.Arrays;
22 import java.util.Map;
23 import java.util.concurrent.ConcurrentHashMap;
24
25 import javax.net.ssl.SSLEngine;
26
27 import org.eclipse.jetty.io.AsyncEndPoint;
28 import org.eclipse.jetty.io.Buffer;
29 import org.eclipse.jetty.io.ConnectedEndPoint;
30 import org.eclipse.jetty.io.Connection;
31 import org.eclipse.jetty.io.nio.AsyncConnection;
32 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
33 import org.eclipse.jetty.io.nio.SelectorManager;
34 import org.eclipse.jetty.io.nio.SslConnection;
35 import org.eclipse.jetty.util.component.AbstractLifeCycle;
36 import org.eclipse.jetty.util.component.AggregateLifeCycle;
37 import org.eclipse.jetty.util.component.Dumpable;
38 import org.eclipse.jetty.util.log.Log;
39 import org.eclipse.jetty.util.log.Logger;
40 import org.eclipse.jetty.util.ssl.SslContextFactory;
41 import org.eclipse.jetty.util.thread.Timeout;
42 import org.eclipse.jetty.util.thread.Timeout.Task;
43
44 class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, Dumpable
45 {
46 private static final Logger LOG = Log.getLogger(SelectConnector.class);
47
48 private final HttpClient _httpClient;
49 private final Manager _selectorManager=new Manager();
50 private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>();
51
52
53
54
/**
 * Creates a connector bound to the given client.
 *
 * @param httpClient the client this connector reads its settings from
 *                   (connect mode and timeout, thread pool, buffers, idle timeout, SSL factory)
 */
SelectConnector(HttpClient httpClient)
{
    this._httpClient = httpClient;
}
59
60
61 @Override
62 protected void doStart() throws Exception
63 {
64 super.doStart();
65
66 _selectorManager.start();
67 }
68
69
70 @Override
71 protected void doStop() throws Exception
72 {
73 _selectorManager.stop();
74 }
75
76 public String dump()
77 {
78 return AggregateLifeCycle.dump(this);
79 }
80
81 public void dump(Appendable out, String indent) throws IOException
82 {
83 out.append(String.valueOf(this)).append("\n");
84 AggregateLifeCycle.dump(out, indent, Arrays.asList(_selectorManager));
85 }
86
87
88 public void startConnection( HttpDestination destination )
89 throws IOException
90 {
91 SocketChannel channel = null;
92 try
93 {
94 channel = SocketChannel.open();
95 Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
96 channel.socket().setTcpNoDelay(true);
97
98 if (_httpClient.isConnectBlocking())
99 {
100 channel.socket().connect(address.toSocketAddress(), _httpClient.getConnectTimeout());
101 channel.configureBlocking(false);
102 _selectorManager.register( channel, destination );
103 }
104 else
105 {
106 channel.configureBlocking(false);
107 channel.connect(address.toSocketAddress());
108 _selectorManager.register(channel,destination);
109 ConnectTimeout connectTimeout = new ConnectTimeout(channel,destination);
110 _httpClient.schedule(connectTimeout,_httpClient.getConnectTimeout());
111 _connectingChannels.put(channel,connectTimeout);
112 }
113 }
114 catch (UnresolvedAddressException ex)
115 {
116 if (channel != null)
117 channel.close();
118 destination.onConnectionFailed(ex);
119 }
120 catch(IOException ex)
121 {
122 if (channel != null)
123 channel.close();
124 destination.onConnectionFailed(ex);
125 }
126 }
127
128
129 class Manager extends SelectorManager
130 {
131 Logger LOG = SelectConnector.LOG;
132
133 @Override
134 public boolean dispatch(Runnable task)
135 {
136 return _httpClient._threadPool.dispatch(task);
137 }
138
139 @Override
140 protected void endPointOpened(SelectChannelEndPoint endpoint)
141 {
142 }
143
144 @Override
145 protected void endPointClosed(SelectChannelEndPoint endpoint)
146 {
147 }
148
149 @Override
150 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
151 {
152 }
153
154 @Override
155 public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
156 {
157 return new AsyncHttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
158 }
159
160 @Override
161 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
162 {
163
164 Timeout.Task connectTimeout = _connectingChannels.remove(channel);
165 if (connectTimeout != null)
166 connectTimeout.cancel();
167 if (LOG.isDebugEnabled())
168 LOG.debug("Channels with connection pending: {}", _connectingChannels.size());
169
170
171 HttpDestination dest=(HttpDestination)key.attachment();
172
173 SelectChannelEndPoint scep = new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout());
174 AsyncEndPoint ep = scep;
175
176 if (dest.isSecure())
177 {
178 LOG.debug("secure to {}, proxied={}",channel,dest.isProxied());
179 ep = new UpgradableEndPoint(ep,newSslEngine(channel));
180 }
181
182 AsyncConnection connection = selectSet.getManager().newConnection(channel,ep, key.attachment());
183 ep.setConnection(connection);
184
185 AbstractHttpConnection httpConnection=(AbstractHttpConnection)connection;
186 httpConnection.setDestination(dest);
187
188 if (dest.isSecure() && !dest.isProxied())
189 ((UpgradableEndPoint)ep).upgrade();
190
191 dest.onNewConnection(httpConnection);
192
193 return scep;
194 }
195
196 private synchronized SSLEngine newSslEngine(SocketChannel channel) throws IOException
197 {
198 SslContextFactory sslContextFactory = _httpClient.getSslContextFactory();
199 SSLEngine sslEngine;
200 if (channel != null)
201 {
202 String peerHost = channel.socket().getInetAddress().getHostAddress();
203 int peerPort = channel.socket().getPort();
204 sslEngine = sslContextFactory.newSslEngine(peerHost, peerPort);
205 }
206 else
207 {
208 sslEngine = sslContextFactory.newSslEngine();
209 }
210 sslEngine.setUseClientMode(true);
211 sslEngine.beginHandshake();
212
213 return sslEngine;
214 }
215
216
217
218
219
220 @Override
221 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
222 {
223 Timeout.Task connectTimeout = _connectingChannels.remove(channel);
224 if (connectTimeout != null)
225 connectTimeout.cancel();
226
227 if (attachment instanceof HttpDestination)
228 ((HttpDestination)attachment).onConnectionFailed(ex);
229 else
230 super.connectionFailed(channel,ex,attachment);
231 }
232 }
233
234 private class ConnectTimeout extends Timeout.Task
235 {
236 private final SocketChannel channel;
237 private final HttpDestination destination;
238
239 public ConnectTimeout(SocketChannel channel, HttpDestination destination)
240 {
241 this.channel = channel;
242 this.destination = destination;
243 }
244
245 @Override
246 public void expired()
247 {
248 if (channel.isConnectionPending())
249 {
250 LOG.debug("Channel {} timed out while connecting, closing it", channel);
251 try
252 {
253
254 channel.close();
255 }
256 catch (IOException x)
257 {
258 LOG.ignore(x);
259 }
260 destination.onConnectionFailed(new SocketTimeoutException());
261 }
262 }
263 }
264
265 public static class UpgradableEndPoint implements AsyncEndPoint
266 {
267 AsyncEndPoint _endp;
268 SSLEngine _engine;
269
270 public UpgradableEndPoint(AsyncEndPoint endp, SSLEngine engine) throws IOException
271 {
272 _engine=engine;
273 _endp=endp;
274 }
275
276 public void upgrade()
277 {
278 AsyncHttpConnection connection = (AsyncHttpConnection)_endp.getConnection();
279
280 SslConnection sslConnection = new SslConnection(_engine,_endp);
281 _endp.setConnection(sslConnection);
282
283 _endp=sslConnection.getSslEndPoint();
284 sslConnection.getSslEndPoint().setConnection(connection);
285
286 LOG.debug("upgrade {} to {} for {}",this,sslConnection,connection);
287 }
288
289
290 public Connection getConnection()
291 {
292 return _endp.getConnection();
293 }
294
295 public void setConnection(Connection connection)
296 {
297 _endp.setConnection(connection);
298 }
299
300 public void shutdownOutput() throws IOException
301 {
302 _endp.shutdownOutput();
303 }
304
305 public void asyncDispatch()
306 {
307 _endp.asyncDispatch();
308 }
309
310 public boolean isOutputShutdown()
311 {
312 return _endp.isOutputShutdown();
313 }
314
315 public void shutdownInput() throws IOException
316 {
317 _endp.shutdownInput();
318 }
319
320 public void scheduleWrite()
321 {
322 _endp.scheduleWrite();
323 }
324
325 public boolean isInputShutdown()
326 {
327 return _endp.isInputShutdown();
328 }
329
330 public void close() throws IOException
331 {
332 _endp.close();
333 }
334
335 public int fill(Buffer buffer) throws IOException
336 {
337 return _endp.fill(buffer);
338 }
339
340 public boolean isWritable()
341 {
342 return _endp.isWritable();
343 }
344
345 public boolean hasProgressed()
346 {
347 return _endp.hasProgressed();
348 }
349
350 public int flush(Buffer buffer) throws IOException
351 {
352 return _endp.flush(buffer);
353 }
354
355 public void scheduleTimeout(Task task, long timeoutMs)
356 {
357 _endp.scheduleTimeout(task,timeoutMs);
358 }
359
360 public void cancelTimeout(Task task)
361 {
362 _endp.cancelTimeout(task);
363 }
364
365 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
366 {
367 return _endp.flush(header,buffer,trailer);
368 }
369
370 public String getLocalAddr()
371 {
372 return _endp.getLocalAddr();
373 }
374
375 public String getLocalHost()
376 {
377 return _endp.getLocalHost();
378 }
379
380 public int getLocalPort()
381 {
382 return _endp.getLocalPort();
383 }
384
385 public String getRemoteAddr()
386 {
387 return _endp.getRemoteAddr();
388 }
389
390 public String getRemoteHost()
391 {
392 return _endp.getRemoteHost();
393 }
394
395 public int getRemotePort()
396 {
397 return _endp.getRemotePort();
398 }
399
400 public boolean isBlocking()
401 {
402 return _endp.isBlocking();
403 }
404
405 public boolean blockReadable(long millisecs) throws IOException
406 {
407 return _endp.blockReadable(millisecs);
408 }
409
410 public boolean blockWritable(long millisecs) throws IOException
411 {
412 return _endp.blockWritable(millisecs);
413 }
414
415 public boolean isOpen()
416 {
417 return _endp.isOpen();
418 }
419
420 public Object getTransport()
421 {
422 return _endp.getTransport();
423 }
424
425 public void flush() throws IOException
426 {
427 _endp.flush();
428 }
429
430 public int getMaxIdleTime()
431 {
432 return _endp.getMaxIdleTime();
433 }
434
435 public void setMaxIdleTime(int timeMs) throws IOException
436 {
437 _endp.setMaxIdleTime(timeMs);
438 }
439
440 public void onIdleExpired()
441 {
442 _endp.onIdleExpired();
443 }
444
445 public void setCheckForIdle(boolean check)
446 {
447 _endp.setCheckForIdle(check);
448 }
449
450 public boolean isCheckForIdle()
451 {
452 return _endp.isCheckForIdle();
453 }
454
455 public String toString()
456 {
457 return "Upgradable:"+_endp.toString();
458 }
459
460 }
461 }