1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17 import java.net.SocketTimeoutException;
18 import java.nio.channels.SelectionKey;
19 import java.nio.channels.SocketChannel;
20 import java.nio.channels.UnresolvedAddressException;
21 import java.util.Map;
22 import java.util.concurrent.ConcurrentHashMap;
23 import javax.net.ssl.SSLEngine;
24
25 import org.eclipse.jetty.io.AsyncEndPoint;
26 import org.eclipse.jetty.io.Buffer;
27 import org.eclipse.jetty.io.ConnectedEndPoint;
28 import org.eclipse.jetty.io.Connection;
29 import org.eclipse.jetty.io.nio.AsyncConnection;
30 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
31 import org.eclipse.jetty.io.nio.SelectorManager;
32 import org.eclipse.jetty.io.nio.SslConnection;
33 import org.eclipse.jetty.util.component.AggregateLifeCycle;
34 import org.eclipse.jetty.util.component.Dumpable;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37 import org.eclipse.jetty.util.ssl.SslContextFactory;
38 import org.eclipse.jetty.util.thread.Timeout;
39 import org.eclipse.jetty.util.thread.Timeout.Task;
40
41 class SelectConnector extends AggregateLifeCycle implements HttpClient.Connector, Dumpable
42 {
43 private static final Logger LOG = Log.getLogger(SelectConnector.class);
44
45 private final HttpClient _httpClient;
46 private final Manager _selectorManager=new Manager();
47 private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>();
48
49
50
51
52
53
54 SelectConnector(HttpClient httpClient)
55 {
56 _httpClient = httpClient;
57 addBean(_httpClient,false);
58 addBean(_selectorManager,true);
59 }
60
61
62 public void startConnection( HttpDestination destination )
63 throws IOException
64 {
65 SocketChannel channel = null;
66 try
67 {
68 channel = SocketChannel.open();
69 Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
70 channel.socket().setTcpNoDelay(true);
71
72 if (_httpClient.isConnectBlocking())
73 {
74 channel.socket().connect(address.toSocketAddress(), _httpClient.getConnectTimeout());
75 channel.configureBlocking(false);
76 _selectorManager.register( channel, destination );
77 }
78 else
79 {
80 channel.configureBlocking(false);
81 channel.connect(address.toSocketAddress());
82 _selectorManager.register(channel,destination);
83 ConnectTimeout connectTimeout = new ConnectTimeout(channel,destination);
84 _httpClient.schedule(connectTimeout,_httpClient.getConnectTimeout());
85 _connectingChannels.put(channel,connectTimeout);
86 }
87 }
88 catch (UnresolvedAddressException ex)
89 {
90 if (channel != null)
91 channel.close();
92 destination.onConnectionFailed(ex);
93 }
94 catch(IOException ex)
95 {
96 if (channel != null)
97 channel.close();
98 destination.onConnectionFailed(ex);
99 }
100 }
101
102
103 class Manager extends SelectorManager
104 {
105 Logger LOG = SelectConnector.LOG;
106
107 @Override
108 public boolean dispatch(Runnable task)
109 {
110 return _httpClient._threadPool.dispatch(task);
111 }
112
113 @Override
114 protected void endPointOpened(SelectChannelEndPoint endpoint)
115 {
116 }
117
118 @Override
119 protected void endPointClosed(SelectChannelEndPoint endpoint)
120 {
121 }
122
123 @Override
124 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
125 {
126 }
127
128 @Override
129 public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
130 {
131 return new AsyncHttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
132 }
133
134 @Override
135 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
136 {
137
138 Timeout.Task connectTimeout = _connectingChannels.remove(channel);
139 if (connectTimeout != null)
140 connectTimeout.cancel();
141 if (LOG.isDebugEnabled())
142 LOG.debug("Channels with connection pending: {}", _connectingChannels.size());
143
144
145 HttpDestination dest=(HttpDestination)key.attachment();
146
147 SelectChannelEndPoint scep = new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout());
148 AsyncEndPoint ep = scep;
149
150 if (dest.isSecure())
151 {
152 LOG.debug("secure to {}, proxied={}",channel,dest.isProxied());
153 ep = new UpgradableEndPoint(ep,newSslEngine(channel));
154 }
155
156 AsyncConnection connection = selectSet.getManager().newConnection(channel,ep, key.attachment());
157 ep.setConnection(connection);
158
159 AbstractHttpConnection httpConnection=(AbstractHttpConnection)connection;
160 httpConnection.setDestination(dest);
161
162 if (dest.isSecure() && !dest.isProxied())
163 ((UpgradableEndPoint)ep).upgrade();
164
165 dest.onNewConnection(httpConnection);
166
167 return scep;
168 }
169
170 private synchronized SSLEngine newSslEngine(SocketChannel channel) throws IOException
171 {
172 SslContextFactory sslContextFactory = _httpClient.getSslContextFactory();
173 SSLEngine sslEngine;
174 if (channel != null)
175 {
176 String peerHost = channel.socket().getInetAddress().getHostAddress();
177 int peerPort = channel.socket().getPort();
178 sslEngine = sslContextFactory.newSslEngine(peerHost, peerPort);
179 }
180 else
181 {
182 sslEngine = sslContextFactory.newSslEngine();
183 }
184 sslEngine.setUseClientMode(true);
185 sslEngine.beginHandshake();
186
187 return sslEngine;
188 }
189
190
191
192
193
194 @Override
195 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
196 {
197 Timeout.Task connectTimeout = _connectingChannels.remove(channel);
198 if (connectTimeout != null)
199 connectTimeout.cancel();
200
201 if (attachment instanceof HttpDestination)
202 ((HttpDestination)attachment).onConnectionFailed(ex);
203 else
204 super.connectionFailed(channel,ex,attachment);
205 }
206 }
207
208 private class ConnectTimeout extends Timeout.Task
209 {
210 private final SocketChannel channel;
211 private final HttpDestination destination;
212
213 public ConnectTimeout(SocketChannel channel, HttpDestination destination)
214 {
215 this.channel = channel;
216 this.destination = destination;
217 }
218
219 @Override
220 public void expired()
221 {
222 if (channel.isConnectionPending())
223 {
224 LOG.debug("Channel {} timed out while connecting, closing it", channel);
225 try
226 {
227
228 channel.close();
229 }
230 catch (IOException x)
231 {
232 LOG.ignore(x);
233 }
234 destination.onConnectionFailed(new SocketTimeoutException());
235 }
236 }
237 }
238
239 public static class UpgradableEndPoint implements AsyncEndPoint
240 {
241 AsyncEndPoint _endp;
242 SSLEngine _engine;
243
244 public UpgradableEndPoint(AsyncEndPoint endp, SSLEngine engine) throws IOException
245 {
246 _engine=engine;
247 _endp=endp;
248 }
249
250 public void upgrade()
251 {
252 AsyncHttpConnection connection = (AsyncHttpConnection)_endp.getConnection();
253
254 SslConnection sslConnection = new SslConnection(_engine,_endp);
255 _endp.setConnection(sslConnection);
256
257 _endp=sslConnection.getSslEndPoint();
258 sslConnection.getSslEndPoint().setConnection(connection);
259
260 LOG.debug("upgrade {} to {} for {}",this,sslConnection,connection);
261 }
262
263
264 public Connection getConnection()
265 {
266 return _endp.getConnection();
267 }
268
269 public void setConnection(Connection connection)
270 {
271 _endp.setConnection(connection);
272 }
273
274 public void shutdownOutput() throws IOException
275 {
276 _endp.shutdownOutput();
277 }
278
279 public void dispatch()
280 {
281 _endp.asyncDispatch();
282 }
283
284 public void asyncDispatch()
285 {
286 _endp.asyncDispatch();
287 }
288
289 public boolean isOutputShutdown()
290 {
291 return _endp.isOutputShutdown();
292 }
293
294 public void shutdownInput() throws IOException
295 {
296 _endp.shutdownInput();
297 }
298
299 public void scheduleWrite()
300 {
301 _endp.scheduleWrite();
302 }
303
304 public boolean isInputShutdown()
305 {
306 return _endp.isInputShutdown();
307 }
308
309 public void close() throws IOException
310 {
311 _endp.close();
312 }
313
314 public int fill(Buffer buffer) throws IOException
315 {
316 return _endp.fill(buffer);
317 }
318
319 public boolean isWritable()
320 {
321 return _endp.isWritable();
322 }
323
324 public boolean hasProgressed()
325 {
326 return _endp.hasProgressed();
327 }
328
329 public int flush(Buffer buffer) throws IOException
330 {
331 return _endp.flush(buffer);
332 }
333
334 public void scheduleTimeout(Task task, long timeoutMs)
335 {
336 _endp.scheduleTimeout(task,timeoutMs);
337 }
338
339 public void cancelTimeout(Task task)
340 {
341 _endp.cancelTimeout(task);
342 }
343
344 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
345 {
346 return _endp.flush(header,buffer,trailer);
347 }
348
349 public String getLocalAddr()
350 {
351 return _endp.getLocalAddr();
352 }
353
354 public String getLocalHost()
355 {
356 return _endp.getLocalHost();
357 }
358
359 public int getLocalPort()
360 {
361 return _endp.getLocalPort();
362 }
363
364 public String getRemoteAddr()
365 {
366 return _endp.getRemoteAddr();
367 }
368
369 public String getRemoteHost()
370 {
371 return _endp.getRemoteHost();
372 }
373
374 public int getRemotePort()
375 {
376 return _endp.getRemotePort();
377 }
378
379 public boolean isBlocking()
380 {
381 return _endp.isBlocking();
382 }
383
384 public boolean blockReadable(long millisecs) throws IOException
385 {
386 return _endp.blockReadable(millisecs);
387 }
388
389 public boolean blockWritable(long millisecs) throws IOException
390 {
391 return _endp.blockWritable(millisecs);
392 }
393
394 public boolean isOpen()
395 {
396 return _endp.isOpen();
397 }
398
399 public Object getTransport()
400 {
401 return _endp.getTransport();
402 }
403
404 public void flush() throws IOException
405 {
406 _endp.flush();
407 }
408
409 public int getMaxIdleTime()
410 {
411 return _endp.getMaxIdleTime();
412 }
413
414 public void setMaxIdleTime(int timeMs) throws IOException
415 {
416 _endp.setMaxIdleTime(timeMs);
417 }
418
419 public void onIdleExpired(long idleForMs)
420 {
421 _endp.onIdleExpired(idleForMs);
422 }
423
424 public void setCheckForIdle(boolean check)
425 {
426 _endp.setCheckForIdle(check);
427 }
428
429 public boolean isCheckForIdle()
430 {
431 return _endp.isCheckForIdle();
432 }
433
434 public String toString()
435 {
436 return "Upgradable:"+_endp.toString();
437 }
438 }
439 }