1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17 import java.net.SocketTimeoutException;
18 import java.nio.channels.SelectionKey;
19 import java.nio.channels.SocketChannel;
20 import java.util.Map;
21 import java.util.concurrent.ConcurrentHashMap;
22 import javax.net.ssl.SSLContext;
23 import javax.net.ssl.SSLEngine;
24 import javax.net.ssl.SSLSession;
25
26 import org.eclipse.jetty.http.HttpMethods;
27 import org.eclipse.jetty.http.HttpVersions;
28 import org.eclipse.jetty.io.Buffer;
29 import org.eclipse.jetty.io.Buffers;
30 import org.eclipse.jetty.io.ConnectedEndPoint;
31 import org.eclipse.jetty.io.Connection;
32 import org.eclipse.jetty.io.ThreadLocalBuffers;
33 import org.eclipse.jetty.io.nio.DirectNIOBuffer;
34 import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
35 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
36 import org.eclipse.jetty.io.nio.SelectorManager;
37 import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
38 import org.eclipse.jetty.util.component.AbstractLifeCycle;
39 import org.eclipse.jetty.util.log.Log;
40 import org.eclipse.jetty.util.thread.Timeout;
41
42 class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, Runnable
43 {
44 private final HttpClient _httpClient;
45 private final Manager _selectorManager=new Manager();
46 private final Timeout _connectTimer = new Timeout();
47 private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>();
48 private SSLContext _sslContext;
49 private Buffers _sslBuffers;
50 private boolean _blockingConnect;
51
52
53
54
55 SelectConnector(HttpClient httpClient)
56 {
57 _httpClient = httpClient;
58 }
59
60
61
62
63
64 public boolean isBlockingConnect()
65 {
66 return _blockingConnect;
67 }
68
69
70
71
72
73 public void setBlockingConnect(boolean blockingConnect)
74 {
75 _blockingConnect = blockingConnect;
76 }
77
78
79 @Override
80 protected void doStart() throws Exception
81 {
82 super.doStart();
83
84 _connectTimer.setDuration(_httpClient.getConnectTimeout());
85 _connectTimer.setNow();
86 _httpClient._threadPool.dispatch(new Runnable()
87 {
88 public void run()
89 {
90 while (isRunning())
91 {
92 _connectTimer.tick(System.currentTimeMillis());
93 try
94 {
95 Thread.sleep(200);
96 }
97 catch (InterruptedException x)
98 {
99 Thread.currentThread().interrupt();
100 break;
101 }
102 }
103 }
104 });
105
106 _selectorManager.start();
107
108 final boolean direct=_httpClient.getUseDirectBuffers();
109
110 SSLEngine sslEngine=_selectorManager.newSslEngine();
111 final SSLSession ssl_session=sslEngine.getSession();
112 ThreadLocalBuffers ssl_buffers = new ThreadLocalBuffers()
113 {
114 {
115 super.setBufferSize(ssl_session.getApplicationBufferSize());
116 super.setHeaderSize(ssl_session.getApplicationBufferSize());
117 }
118
119 @Override
120 protected Buffer newBuffer(int size)
121 {
122 return direct?new DirectNIOBuffer(size):new IndirectNIOBuffer(size);
123 }
124 @Override
125 protected Buffer newHeader(int size)
126 {
127 return direct?new DirectNIOBuffer(size):new IndirectNIOBuffer(size);
128 }
129 @Override
130 protected boolean isHeader(Buffer buffer)
131 {
132 return true;
133 }
134
135 @Override
136 public void setBufferSize(int size)
137 {
138 }
139
140 @Override
141 public void setHeaderSize(int size)
142 {
143 }
144 };
145 _sslBuffers=ssl_buffers;
146
147 _httpClient._threadPool.dispatch(this);
148 }
149
150
151 @Override
152 protected void doStop() throws Exception
153 {
154 _connectTimer.cancelAll();
155 _selectorManager.stop();
156 }
157
158
159 public void startConnection( HttpDestination destination )
160 throws IOException
161 {
162 SocketChannel channel = SocketChannel.open();
163 Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
164 channel.configureBlocking( false );
165 channel.socket().setTcpNoDelay(true);
166 channel.connect(address.toSocketAddress());
167 _selectorManager.register( channel, destination );
168 ConnectTimeout connectTimeout = new ConnectTimeout(channel, destination);
169 _connectTimer.schedule(connectTimeout);
170 _connectingChannels.put(channel, connectTimeout);
171 }
172
173
174 public void run()
175 {
176 while (_httpClient.isRunning())
177 {
178 try
179 {
180 _selectorManager.doSelect(0);
181 }
182 catch (Exception e)
183 {
184 Log.warn(e.toString());
185 Log.debug(e);
186 Thread.yield();
187 }
188 }
189 }
190
191
192 class Manager extends SelectorManager
193 {
194 @Override
195 protected SocketChannel acceptChannel(SelectionKey key) throws IOException
196 {
197 throw new IllegalStateException();
198 }
199
200 @Override
201 public boolean dispatch(Runnable task)
202 {
203 return SelectConnector.this._httpClient._threadPool.dispatch(task);
204 }
205
206 @Override
207 protected void endPointOpened(SelectChannelEndPoint endpoint)
208 {
209 }
210
211 @Override
212 protected void endPointClosed(SelectChannelEndPoint endpoint)
213 {
214 }
215
216 @Override
217 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
218 {
219 }
220
221 @Override
222 protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
223 {
224 if (endpoint instanceof SslSelectChannelEndPoint)
225 return new HttpConnection(_sslBuffers,_sslBuffers,endpoint);
226
227 return new HttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
228 }
229
230 @Override
231 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
232 {
233
234 Timeout.Task connectTimeout = _connectingChannels.remove(channel);
235 if (connectTimeout != null)
236 connectTimeout.cancel();
237 Log.debug("Channels with connection pending: {}", _connectingChannels.size());
238
239
240 HttpDestination dest=(HttpDestination)key.attachment();
241
242 SelectChannelEndPoint ep=null;
243
244 if (dest.isSecure())
245 {
246 if (dest.isProxied())
247 {
248 String connect = HttpMethods.CONNECT+" "+dest.getAddress()+HttpVersions.HTTP_1_0+"\r\n\r\n";
249
250
251 throw new IllegalStateException("Not Implemented");
252 }
253
254 SSLEngine engine=newSslEngine();
255 ep = new SslSelectChannelEndPoint(_sslBuffers,channel,selectSet,key,engine);
256 }
257 else
258 {
259 ep=new SelectChannelEndPoint(channel,selectSet,key);
260 }
261
262 HttpConnection connection=(HttpConnection)ep.getConnection();
263 connection.setDestination(dest);
264 dest.onNewConnection(connection);
265 return ep;
266 }
267
268 private synchronized SSLEngine newSslEngine() throws IOException
269 {
270 if (_sslContext==null)
271 {
272 _sslContext = SelectConnector.this._httpClient.getSSLContext();
273 }
274
275 SSLEngine sslEngine = _sslContext.createSSLEngine();
276 sslEngine.setUseClientMode(true);
277 sslEngine.beginHandshake();
278
279 return sslEngine;
280 }
281
282
283
284
285
286 @Override
287 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
288 {
289 if (attachment instanceof HttpDestination)
290 ((HttpDestination)attachment).onConnectionFailed(ex);
291 else
292 super.connectionFailed(channel,ex,attachment);
293 }
294 }
295
296 private class ConnectTimeout extends Timeout.Task
297 {
298 private final SocketChannel channel;
299 private final HttpDestination destination;
300
301 public ConnectTimeout(SocketChannel channel, HttpDestination destination)
302 {
303 this.channel = channel;
304 this.destination = destination;
305 }
306
307 @Override
308 public void expired()
309 {
310 _connectingChannels.remove(channel);
311 if (channel.isConnectionPending())
312 {
313 Log.debug("Channel {} timed out while connecting, closing it", channel);
314 try
315 {
316
317 channel.close();
318 }
319 catch (IOException x)
320 {
321 Log.ignore(x);
322 }
323 destination.onConnectionFailed(new SocketTimeoutException());
324 }
325 }
326 }
327 }