1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.net.URI;
24 import java.nio.channels.AsynchronousCloseException;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.Queue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.RejectedExecutionException;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicInteger;
32
33 import org.eclipse.jetty.client.api.Connection;
34 import org.eclipse.jetty.client.api.Destination;
35 import org.eclipse.jetty.client.api.ProxyConfiguration;
36 import org.eclipse.jetty.client.api.Request;
37 import org.eclipse.jetty.client.api.Response;
38 import org.eclipse.jetty.client.api.Result;
39 import org.eclipse.jetty.http.HttpField;
40 import org.eclipse.jetty.http.HttpHeader;
41 import org.eclipse.jetty.http.HttpMethod;
42 import org.eclipse.jetty.http.HttpScheme;
43 import org.eclipse.jetty.util.BlockingArrayQueue;
44 import org.eclipse.jetty.util.Promise;
45 import org.eclipse.jetty.util.component.ContainerLifeCycle;
46 import org.eclipse.jetty.util.component.Dumpable;
47 import org.eclipse.jetty.util.log.Log;
48 import org.eclipse.jetty.util.log.Logger;
49 import org.eclipse.jetty.util.ssl.SslContextFactory;
50
51 public class HttpDestination implements Destination, Closeable, Dumpable
52 {
53 private static final Logger LOG = Log.getLogger(HttpDestination.class);
54
55 private final AtomicInteger connectionCount = new AtomicInteger();
56 private final HttpClient client;
57 private final String scheme;
58 private final String host;
59 private final Address address;
60 private final Queue<HttpExchange> exchanges;
61 private final BlockingQueue<Connection> idleConnections;
62 private final BlockingQueue<Connection> activeConnections;
63 private final RequestNotifier requestNotifier;
64 private final ResponseNotifier responseNotifier;
65 private final Address proxyAddress;
66 private final HttpField hostField;
67
68 public HttpDestination(HttpClient client, String scheme, String host, int port)
69 {
70 this.client = client;
71 this.scheme = scheme;
72 this.host = host;
73 this.address = new Address(host, port);
74
75 int maxRequestsQueued = client.getMaxRequestsQueuedPerDestination();
76 int capacity = Math.min(32, maxRequestsQueued);
77 this.exchanges = new BlockingArrayQueue<>(capacity, capacity, maxRequestsQueued);
78
79 int maxConnections = client.getMaxConnectionsPerDestination();
80 capacity = Math.min(8, maxConnections);
81 this.idleConnections = new BlockingArrayQueue<>(capacity, capacity, maxConnections);
82 this.activeConnections = new BlockingArrayQueue<>(capacity, capacity, maxConnections);
83
84 this.requestNotifier = new RequestNotifier(client);
85 this.responseNotifier = new ResponseNotifier(client);
86
87 ProxyConfiguration proxyConfig = client.getProxyConfiguration();
88 proxyAddress = proxyConfig != null && proxyConfig.matches(host, port) ?
89 new Address(proxyConfig.getHost(), proxyConfig.getPort()) : null;
90
91 if (!client.isDefaultPort(scheme, port))
92 host += ":" + port;
93 hostField = new HttpField(HttpHeader.HOST, host);
94 }
95
96 protected BlockingQueue<Connection> getIdleConnections()
97 {
98 return idleConnections;
99 }
100
101 protected BlockingQueue<Connection> getActiveConnections()
102 {
103 return activeConnections;
104 }
105
106 public RequestNotifier getRequestNotifier()
107 {
108 return requestNotifier;
109 }
110
111 public ResponseNotifier getResponseNotifier()
112 {
113 return responseNotifier;
114 }
115
116 @Override
117 public String getScheme()
118 {
119 return scheme;
120 }
121
122 @Override
123 public String getHost()
124 {
125
126
127 return host;
128 }
129
130 @Override
131 public int getPort()
132 {
133 return address.getPort();
134 }
135
136 public Address getConnectAddress()
137 {
138 return isProxied() ? proxyAddress : address;
139 }
140
141 public boolean isProxied()
142 {
143 return proxyAddress != null;
144 }
145
146 public URI getProxyURI()
147 {
148 ProxyConfiguration proxyConfiguration = client.getProxyConfiguration();
149 String uri = getScheme() + "://" + proxyConfiguration.getHost();
150 if (!client.isDefaultPort(getScheme(), proxyConfiguration.getPort()))
151 uri += ":" + proxyConfiguration.getPort();
152 return URI.create(uri);
153 }
154
155 public HttpField getHostField()
156 {
157 return hostField;
158 }
159
160 public void send(Request request, List<Response.ResponseListener> listeners)
161 {
162 if (!scheme.equals(request.getScheme()))
163 throw new IllegalArgumentException("Invalid request scheme " + request.getScheme() + " for destination " + this);
164 if (!getHost().equals(request.getHost()))
165 throw new IllegalArgumentException("Invalid request host " + request.getHost() + " for destination " + this);
166 int port = request.getPort();
167 if (port >= 0 && getPort() != port)
168 throw new IllegalArgumentException("Invalid request port " + port + " for destination " + this);
169
170 HttpConversation conversation = client.getConversation(request.getConversationID(), true);
171 HttpExchange exchange = new HttpExchange(conversation, this, request, listeners);
172
173 if (client.isRunning())
174 {
175 if (exchanges.offer(exchange))
176 {
177 if (!client.isRunning() && exchanges.remove(exchange))
178 {
179 throw new RejectedExecutionException(client + " is stopping");
180 }
181 else
182 {
183 LOG.debug("Queued {}", request);
184 requestNotifier.notifyQueued(request);
185 Connection connection = acquire();
186 if (connection != null)
187 process(connection, false);
188 }
189 }
190 else
191 {
192 LOG.debug("Max queued exceeded {}", request);
193 abort(exchange, new RejectedExecutionException("Max requests per destination " + client.getMaxRequestsQueuedPerDestination() + " exceeded for " + this));
194 }
195 }
196 else
197 {
198 throw new RejectedExecutionException(client + " is stopped");
199 }
200 }
201
202 public void newConnection(Promise<Connection> promise)
203 {
204 createConnection(new ProxyPromise(promise));
205 }
206
207 protected void createConnection(Promise<Connection> promise)
208 {
209 client.newConnection(this, promise);
210 }
211
212 protected Connection acquire()
213 {
214 Connection result = idleConnections.poll();
215 if (result != null)
216 return result;
217
218 final int maxConnections = client.getMaxConnectionsPerDestination();
219 while (true)
220 {
221 int current = connectionCount.get();
222 final int next = current + 1;
223
224 if (next > maxConnections)
225 {
226 LOG.debug("Max connections per destination {} exceeded for {}", current, this);
227
228 return idleConnections.poll();
229 }
230
231 if (connectionCount.compareAndSet(current, next))
232 {
233 LOG.debug("Creating connection {}/{} for {}", next, maxConnections, this);
234
235
236 Promise<Connection> promise = new Promise<Connection>()
237 {
238 @Override
239 public void succeeded(Connection connection)
240 {
241 process(connection, true);
242 }
243
244 @Override
245 public void failed(final Throwable x)
246 {
247 client.getExecutor().execute(new Runnable()
248 {
249 @Override
250 public void run()
251 {
252 abort(x);
253 }
254 });
255 }
256 };
257
258
259
260
261 createConnection(new ProxyPromise(promise)
262 {
263 @Override
264 public void succeeded(Connection connection)
265 {
266 LOG.debug("Created connection {}/{} {} for {}", next, maxConnections, connection, HttpDestination.this);
267 super.succeeded(connection);
268 }
269
270 @Override
271 public void failed(Throwable x)
272 {
273 LOG.debug("Connection failed {} for {}", x, HttpDestination.this);
274 connectionCount.decrementAndGet();
275 super.failed(x);
276 }
277 });
278
279
280 return idleConnections.poll();
281 }
282 }
283 }
284
285 private void abort(Throwable cause)
286 {
287 HttpExchange exchange;
288 while ((exchange = exchanges.poll()) != null)
289 abort(exchange, cause);
290 }
291
292
293
294
295
296
297
298
299
300
301
302 protected void process(Connection connection, boolean dispatch)
303 {
304
305 final HttpConnection httpConnection = (HttpConnection)connection;
306
307 final HttpExchange exchange = exchanges.poll();
308 if (exchange == null)
309 {
310 LOG.debug("{} idle", httpConnection);
311 if (!idleConnections.offer(httpConnection))
312 {
313 LOG.debug("{} idle overflow");
314 httpConnection.close();
315 }
316 if (!client.isRunning())
317 {
318 LOG.debug("{} is stopping", client);
319 remove(httpConnection);
320 httpConnection.close();
321 }
322 }
323 else
324 {
325 final Request request = exchange.getRequest();
326 Throwable cause = request.getAbortCause();
327 if (cause != null)
328 {
329 abort(exchange, cause);
330 LOG.debug("Aborted before processing {}: {}", exchange, cause);
331 }
332 else
333 {
334 LOG.debug("{} active", httpConnection);
335 if (!activeConnections.offer(httpConnection))
336 {
337 LOG.warn("{} active overflow");
338 }
339 if (dispatch)
340 {
341 client.getExecutor().execute(new Runnable()
342 {
343 @Override
344 public void run()
345 {
346 httpConnection.send(exchange);
347 }
348 });
349 }
350 else
351 {
352 httpConnection.send(exchange);
353 }
354 }
355 }
356 }
357
358 public void release(Connection connection)
359 {
360 LOG.debug("{} released", connection);
361 if (client.isRunning())
362 {
363 boolean removed = activeConnections.remove(connection);
364 if (removed)
365 process(connection, false);
366 else
367 LOG.debug("{} explicit", connection);
368 }
369 else
370 {
371 LOG.debug("{} is stopped", client);
372 remove(connection);
373 connection.close();
374 }
375 }
376
377 public void remove(Connection connection)
378 {
379 boolean removed = activeConnections.remove(connection);
380 removed |= idleConnections.remove(connection);
381 if (removed)
382 {
383 int open = connectionCount.decrementAndGet();
384 LOG.debug("Removed connection {} for {} - open: {}", connection, this, open);
385 }
386
387
388
389
390 if (!exchanges.isEmpty())
391 {
392 connection = acquire();
393 if (connection != null)
394 process(connection, false);
395 }
396 }
397
398 public void close()
399 {
400 for (Connection connection : idleConnections)
401 connection.close();
402 idleConnections.clear();
403
404
405 for (Connection connection : activeConnections)
406 connection.close();
407 activeConnections.clear();
408
409 abort(new AsynchronousCloseException());
410
411 connectionCount.set(0);
412
413 LOG.debug("Closed {}", this);
414 }
415
416 public boolean remove(HttpExchange exchange)
417 {
418 return exchanges.remove(exchange);
419 }
420
421 protected void abort(HttpExchange exchange, Throwable cause)
422 {
423 Request request = exchange.getRequest();
424 HttpResponse response = exchange.getResponse();
425 getRequestNotifier().notifyFailure(request, cause);
426 List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
427 getResponseNotifier().notifyFailure(listeners, response, cause);
428 getResponseNotifier().notifyComplete(listeners, new Result(request, cause, response, cause));
429 }
430
431 protected void tunnelSucceeded(Connection connection, Promise<Connection> promise)
432 {
433
434 Connection tunnel = client.tunnel(connection);
435 promise.succeeded(tunnel);
436 }
437
438 protected void tunnelFailed(Connection connection, Promise<Connection> promise, Throwable failure)
439 {
440 promise.failed(failure);
441 connection.close();
442 }
443
444 @Override
445 public String dump()
446 {
447 return ContainerLifeCycle.dump(this);
448 }
449
450 @Override
451 public void dump(Appendable out, String indent) throws IOException
452 {
453 ContainerLifeCycle.dumpObject(out, this + " - requests queued: " + exchanges.size());
454 List<String> connections = new ArrayList<>();
455 for (Connection connection : idleConnections)
456 connections.add(connection + " - IDLE");
457 for (Connection connection : activeConnections)
458 connections.add(connection + " - ACTIVE");
459 ContainerLifeCycle.dump(out, indent, connections);
460 }
461
462 @Override
463 public String toString()
464 {
465 return String.format("%s(%s://%s:%d)%s",
466 HttpDestination.class.getSimpleName(),
467 getScheme(),
468 getHost(),
469 getPort(),
470 proxyAddress == null ? "" : " via " + proxyAddress.getHost() + ":" + proxyAddress.getPort());
471 }
472
473
474
475
476
477
478
479 private class ProxyPromise implements Promise<Connection>
480 {
481 private final Promise<Connection> delegate;
482
483 private ProxyPromise(Promise<Connection> delegate)
484 {
485 this.delegate = delegate;
486 }
487
488 @Override
489 public void succeeded(Connection connection)
490 {
491 if (isProxied() && HttpScheme.HTTPS.is(getScheme()))
492 {
493 if (client.getSslContextFactory() != null)
494 {
495 tunnel(connection);
496 }
497 else
498 {
499 String message = String.format("Cannot perform requests over SSL, no %s in %s",
500 SslContextFactory.class.getSimpleName(), HttpClient.class.getSimpleName());
501 delegate.failed(new IllegalStateException(message));
502 }
503 }
504 else
505 {
506 delegate.succeeded(connection);
507 }
508 }
509
510 @Override
511 public void failed(Throwable x)
512 {
513 delegate.failed(x);
514 }
515
516 private void tunnel(final Connection connection)
517 {
518 String target = address.getHost() + ":" + address.getPort();
519 Request connect = client.newRequest(proxyAddress.getHost(), proxyAddress.getPort())
520 .scheme(HttpScheme.HTTP.asString())
521 .method(HttpMethod.CONNECT)
522 .path(target)
523 .header(HttpHeader.HOST, target)
524 .timeout(client.getConnectTimeout(), TimeUnit.MILLISECONDS);
525 connection.send(connect, new Response.CompleteListener()
526 {
527 @Override
528 public void onComplete(Result result)
529 {
530 if (result.isFailed())
531 {
532 tunnelFailed(connection, delegate, result.getFailure());
533 }
534 else
535 {
536 Response response = result.getResponse();
537 if (response.getStatus() == 200)
538 {
539 tunnelSucceeded(connection, delegate);
540 }
541 else
542 {
543 tunnelFailed(connection, delegate, new HttpResponseException("Received " + response + " for " + result.getRequest(), response));
544 }
545 }
546 }
547 });
548 }
549 }
550 }