1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.IOException;
22 import java.net.SocketTimeoutException;
23 import java.nio.channels.SelectionKey;
24 import java.nio.channels.SocketChannel;
25 import java.nio.channels.UnresolvedAddressException;
26 import java.util.Map;
27 import java.util.concurrent.ConcurrentHashMap;
28 import javax.net.ssl.SSLEngine;
29
30 import org.eclipse.jetty.io.AsyncEndPoint;
31 import org.eclipse.jetty.io.Buffer;
32 import org.eclipse.jetty.io.ConnectedEndPoint;
33 import org.eclipse.jetty.io.Connection;
34 import org.eclipse.jetty.io.nio.AsyncConnection;
35 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
36 import org.eclipse.jetty.io.nio.SelectorManager;
37 import org.eclipse.jetty.io.nio.SslConnection;
38 import org.eclipse.jetty.util.component.AggregateLifeCycle;
39 import org.eclipse.jetty.util.component.Dumpable;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42 import org.eclipse.jetty.util.ssl.SslContextFactory;
43 import org.eclipse.jetty.util.thread.Timeout;
44 import org.eclipse.jetty.util.thread.Timeout.Task;
45
46 class SelectConnector extends AggregateLifeCycle implements HttpClient.Connector, Dumpable
47 {
/** Logger used by this connector and by its nested helper classes. */
private static final Logger LOG = Log.getLogger(SelectConnector.class);

/** The client this connector opens connections for; also the source of timeouts, buffers and the thread pool. */
private final HttpClient _httpClient;

/** Selector manager that new channels are registered with. */
private final Manager _selectorManager = new Manager();

/** Non-blocking connects still in flight: channel -> the timeout task guarding that connect. */
private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>();

/**
 * Creates a connector bound to the given client.
 * <p>
 * Both the client and the selector manager are added as beans of this aggregate;
 * the client with flag {@code false} and the selector manager with flag {@code true}
 * (presumably the "managed" flag of {@code AggregateLifeCycle.addBean} - confirm against that API).
 *
 * @param httpClient the client that owns this connector
 */
SelectConnector(HttpClient httpClient)
{
    this._httpClient = httpClient;
    addBean(httpClient, false);
    addBean(_selectorManager, true);
}
65
66
67 public void startConnection( HttpDestination destination )
68 throws IOException
69 {
70 SocketChannel channel = null;
71 try
72 {
73 channel = SocketChannel.open();
74 Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
75 channel.socket().setTcpNoDelay(true);
76
77 if (_httpClient.isConnectBlocking())
78 {
79 channel.socket().connect(address.toSocketAddress(), _httpClient.getConnectTimeout());
80 channel.configureBlocking(false);
81 _selectorManager.register( channel, destination );
82 }
83 else
84 {
85 channel.configureBlocking(false);
86 channel.connect(address.toSocketAddress());
87 _selectorManager.register(channel,destination);
88 ConnectTimeout connectTimeout = new ConnectTimeout(channel,destination);
89 _httpClient.schedule(connectTimeout,_httpClient.getConnectTimeout());
90 _connectingChannels.put(channel,connectTimeout);
91 }
92 }
93 catch (UnresolvedAddressException ex)
94 {
95 if (channel != null)
96 channel.close();
97 destination.onConnectionFailed(ex);
98 }
99 catch(IOException ex)
100 {
101 if (channel != null)
102 channel.close();
103 destination.onConnectionFailed(ex);
104 }
105 }
106
107
108 class Manager extends SelectorManager
109 {
110 Logger LOG = SelectConnector.LOG;
111
112 @Override
113 public boolean dispatch(Runnable task)
114 {
115 return _httpClient._threadPool.dispatch(task);
116 }
117
118 @Override
119 protected void endPointOpened(SelectChannelEndPoint endpoint)
120 {
121 }
122
123 @Override
124 protected void endPointClosed(SelectChannelEndPoint endpoint)
125 {
126 }
127
128 @Override
129 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
130 {
131 }
132
133 @Override
134 public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
135 {
136 return new AsyncHttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
137 }
138
139 @Override
140 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
141 {
142
143 Timeout.Task connectTimeout = _connectingChannels.remove(channel);
144 if (connectTimeout != null)
145 connectTimeout.cancel();
146 if (LOG.isDebugEnabled())
147 LOG.debug("Channels with connection pending: {}", _connectingChannels.size());
148
149
150 HttpDestination dest=(HttpDestination)key.attachment();
151
152 SelectChannelEndPoint scep = new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout());
153 AsyncEndPoint ep = scep;
154
155 if (dest.isSecure())
156 {
157 LOG.debug("secure to {}, proxied={}",channel,dest.isProxied());
158 ep = new UpgradableEndPoint(ep,newSslEngine(channel));
159 }
160
161 AsyncConnection connection = selectSet.getManager().newConnection(channel,ep, key.attachment());
162 ep.setConnection(connection);
163
164 AbstractHttpConnection httpConnection=(AbstractHttpConnection)connection;
165 httpConnection.setDestination(dest);
166
167 if (dest.isSecure() && !dest.isProxied())
168 ((UpgradableEndPoint)ep).upgrade();
169
170 dest.onNewConnection(httpConnection);
171
172 return scep;
173 }
174
175 private synchronized SSLEngine newSslEngine(SocketChannel channel) throws IOException
176 {
177 SslContextFactory sslContextFactory = _httpClient.getSslContextFactory();
178 SSLEngine sslEngine;
179 if (channel != null)
180 {
181 String peerHost = channel.socket().getInetAddress().getHostAddress();
182 int peerPort = channel.socket().getPort();
183 sslEngine = sslContextFactory.newSslEngine(peerHost, peerPort);
184 }
185 else
186 {
187 sslEngine = sslContextFactory.newSslEngine();
188 }
189 sslEngine.setUseClientMode(true);
190 sslEngine.beginHandshake();
191
192 return sslEngine;
193 }
194
195
196
197
198
199 @Override
200 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
201 {
202 Timeout.Task connectTimeout = _connectingChannels.remove(channel);
203 if (connectTimeout != null)
204 connectTimeout.cancel();
205
206 if (attachment instanceof HttpDestination)
207 ((HttpDestination)attachment).onConnectionFailed(ex);
208 else
209 super.connectionFailed(channel,ex,attachment);
210 }
211 }
212
213 private class ConnectTimeout extends Timeout.Task
214 {
215 private final SocketChannel channel;
216 private final HttpDestination destination;
217
218 public ConnectTimeout(SocketChannel channel, HttpDestination destination)
219 {
220 this.channel = channel;
221 this.destination = destination;
222 }
223
224 @Override
225 public void expired()
226 {
227 if (channel.isConnectionPending())
228 {
229 LOG.debug("Channel {} timed out while connecting, closing it", channel);
230 try
231 {
232
233 channel.close();
234 }
235 catch (IOException x)
236 {
237 LOG.ignore(x);
238 }
239 destination.onConnectionFailed(new SocketTimeoutException());
240 }
241 }
242 }
243
244 public static class UpgradableEndPoint implements AsyncEndPoint
245 {
246 AsyncEndPoint _endp;
247 SSLEngine _engine;
248
249 public UpgradableEndPoint(AsyncEndPoint endp, SSLEngine engine) throws IOException
250 {
251 _engine=engine;
252 _endp=endp;
253 }
254
255 public void upgrade()
256 {
257 AsyncHttpConnection connection = (AsyncHttpConnection)_endp.getConnection();
258
259 SslConnection sslConnection = new SslConnection(_engine,_endp);
260 _endp.setConnection(sslConnection);
261
262 _endp=sslConnection.getSslEndPoint();
263 sslConnection.getSslEndPoint().setConnection(connection);
264
265 LOG.debug("upgrade {} to {} for {}",this,sslConnection,connection);
266 }
267
268
269 public Connection getConnection()
270 {
271 return _endp.getConnection();
272 }
273
274 public void setConnection(Connection connection)
275 {
276 _endp.setConnection(connection);
277 }
278
279 public void shutdownOutput() throws IOException
280 {
281 _endp.shutdownOutput();
282 }
283
284 public void dispatch()
285 {
286 _endp.asyncDispatch();
287 }
288
289 public void asyncDispatch()
290 {
291 _endp.asyncDispatch();
292 }
293
294 public boolean isOutputShutdown()
295 {
296 return _endp.isOutputShutdown();
297 }
298
299 public void shutdownInput() throws IOException
300 {
301 _endp.shutdownInput();
302 }
303
304 public void scheduleWrite()
305 {
306 _endp.scheduleWrite();
307 }
308
309 public boolean isInputShutdown()
310 {
311 return _endp.isInputShutdown();
312 }
313
314 public void close() throws IOException
315 {
316 _endp.close();
317 }
318
319 public int fill(Buffer buffer) throws IOException
320 {
321 return _endp.fill(buffer);
322 }
323
324 public boolean isWritable()
325 {
326 return _endp.isWritable();
327 }
328
329 public boolean hasProgressed()
330 {
331 return _endp.hasProgressed();
332 }
333
334 public int flush(Buffer buffer) throws IOException
335 {
336 return _endp.flush(buffer);
337 }
338
339 public void scheduleTimeout(Task task, long timeoutMs)
340 {
341 _endp.scheduleTimeout(task,timeoutMs);
342 }
343
344 public void cancelTimeout(Task task)
345 {
346 _endp.cancelTimeout(task);
347 }
348
349 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
350 {
351 return _endp.flush(header,buffer,trailer);
352 }
353
354 public String getLocalAddr()
355 {
356 return _endp.getLocalAddr();
357 }
358
359 public String getLocalHost()
360 {
361 return _endp.getLocalHost();
362 }
363
364 public int getLocalPort()
365 {
366 return _endp.getLocalPort();
367 }
368
369 public String getRemoteAddr()
370 {
371 return _endp.getRemoteAddr();
372 }
373
374 public String getRemoteHost()
375 {
376 return _endp.getRemoteHost();
377 }
378
379 public int getRemotePort()
380 {
381 return _endp.getRemotePort();
382 }
383
384 public boolean isBlocking()
385 {
386 return _endp.isBlocking();
387 }
388
389 public boolean blockReadable(long millisecs) throws IOException
390 {
391 return _endp.blockReadable(millisecs);
392 }
393
394 public boolean blockWritable(long millisecs) throws IOException
395 {
396 return _endp.blockWritable(millisecs);
397 }
398
399 public boolean isOpen()
400 {
401 return _endp.isOpen();
402 }
403
404 public Object getTransport()
405 {
406 return _endp.getTransport();
407 }
408
409 public void flush() throws IOException
410 {
411 _endp.flush();
412 }
413
414 public int getMaxIdleTime()
415 {
416 return _endp.getMaxIdleTime();
417 }
418
419 public void setMaxIdleTime(int timeMs) throws IOException
420 {
421 _endp.setMaxIdleTime(timeMs);
422 }
423
424 public void onIdleExpired(long idleForMs)
425 {
426 _endp.onIdleExpired(idleForMs);
427 }
428
429 public void setCheckForIdle(boolean check)
430 {
431 _endp.setCheckForIdle(check);
432 }
433
434 public boolean isCheckForIdle()
435 {
436 return _endp.isCheckForIdle();
437 }
438
439 public String toString()
440 {
441 return "Upgradable:"+_endp.toString();
442 }
443 }
444 }