1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.IOException;
22 import java.nio.channels.AsynchronousCloseException;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Queue;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.RejectedExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import org.eclipse.jetty.client.api.Connection;
32 import org.eclipse.jetty.client.api.Destination;
33 import org.eclipse.jetty.client.api.ProxyConfiguration;
34 import org.eclipse.jetty.client.api.Request;
35 import org.eclipse.jetty.client.api.Response;
36 import org.eclipse.jetty.client.api.Result;
37 import org.eclipse.jetty.http.HttpField;
38 import org.eclipse.jetty.http.HttpHeader;
39 import org.eclipse.jetty.http.HttpMethod;
40 import org.eclipse.jetty.http.HttpScheme;
41 import org.eclipse.jetty.util.BlockingArrayQueue;
42 import org.eclipse.jetty.util.Promise;
43 import org.eclipse.jetty.util.component.ContainerLifeCycle;
44 import org.eclipse.jetty.util.component.Dumpable;
45 import org.eclipse.jetty.util.log.Log;
46 import org.eclipse.jetty.util.log.Logger;
47
48 public class HttpDestination implements Destination, AutoCloseable, Dumpable
49 {
50 private static final Logger LOG = Log.getLogger(HttpDestination.class);
51
52 private final AtomicInteger connectionCount = new AtomicInteger();
53 private final HttpClient client;
54 private final String scheme;
55 private final String host;
56 private final Address address;
57 private final Queue<HttpExchange> exchanges;
58 private final BlockingQueue<Connection> idleConnections;
59 private final BlockingQueue<Connection> activeConnections;
60 private final RequestNotifier requestNotifier;
61 private final ResponseNotifier responseNotifier;
62 private final Address proxyAddress;
63 private final HttpField hostField;
64
65 public HttpDestination(HttpClient client, String scheme, String host, int port)
66 {
67 this.client = client;
68 this.scheme = scheme;
69 this.host = host;
70 this.address = new Address(host, port);
71
72 int maxRequestsQueued = client.getMaxRequestsQueuedPerDestination();
73 int capacity = Math.min(32, maxRequestsQueued);
74 this.exchanges = new BlockingArrayQueue<>(capacity, capacity, maxRequestsQueued);
75
76 int maxConnections = client.getMaxConnectionsPerDestination();
77 capacity = Math.min(8, maxConnections);
78 this.idleConnections = new BlockingArrayQueue<>(capacity, capacity, maxConnections);
79 this.activeConnections = new BlockingArrayQueue<>(capacity, capacity, maxConnections);
80
81 this.requestNotifier = new RequestNotifier(client);
82 this.responseNotifier = new ResponseNotifier(client);
83
84 ProxyConfiguration proxyConfig = client.getProxyConfiguration();
85 proxyAddress = proxyConfig != null && proxyConfig.matches(host, port) ?
86 new Address(proxyConfig.getHost(), proxyConfig.getPort()) : null;
87
88 hostField = new HttpField(HttpHeader.HOST, host + ":" + port);
89 }
90
91 protected BlockingQueue<Connection> getIdleConnections()
92 {
93 return idleConnections;
94 }
95
96 protected BlockingQueue<Connection> getActiveConnections()
97 {
98 return activeConnections;
99 }
100
101 public RequestNotifier getRequestNotifier()
102 {
103 return requestNotifier;
104 }
105
106 public ResponseNotifier getResponseNotifier()
107 {
108 return responseNotifier;
109 }
110
111 @Override
112 public String getScheme()
113 {
114 return scheme;
115 }
116
117 @Override
118 public String getHost()
119 {
120
121
122 return host;
123 }
124
125 @Override
126 public int getPort()
127 {
128 return address.getPort();
129 }
130
131 public Address getConnectAddress()
132 {
133 return isProxied() ? proxyAddress : address;
134 }
135
136 public boolean isProxied()
137 {
138 return proxyAddress != null;
139 }
140
141 public HttpField getHostField()
142 {
143 return hostField;
144 }
145
146 public void send(Request request, List<Response.ResponseListener> listeners)
147 {
148 if (!scheme.equals(request.getScheme()))
149 throw new IllegalArgumentException("Invalid request scheme " + request.getScheme() + " for destination " + this);
150 if (!getHost().equals(request.getHost()))
151 throw new IllegalArgumentException("Invalid request host " + request.getHost() + " for destination " + this);
152 int port = request.getPort();
153 if (port >= 0 && getPort() != port)
154 throw new IllegalArgumentException("Invalid request port " + port + " for destination " + this);
155
156 HttpConversation conversation = client.getConversation(request.getConversationID(), true);
157 HttpExchange exchange = new HttpExchange(conversation, this, request, listeners);
158
159 if (client.isRunning())
160 {
161 if (exchanges.offer(exchange))
162 {
163 if (!client.isRunning() && exchanges.remove(exchange))
164 {
165 throw new RejectedExecutionException(client + " is stopping");
166 }
167 else
168 {
169 LOG.debug("Queued {}", request);
170 requestNotifier.notifyQueued(request);
171 Connection connection = acquire();
172 if (connection != null)
173 process(connection, false);
174 }
175 }
176 else
177 {
178 throw new RejectedExecutionException("Max requests per destination " + client.getMaxRequestsQueuedPerDestination() + " exceeded");
179 }
180 }
181 else
182 {
183 throw new RejectedExecutionException(client + " is stopped");
184 }
185 }
186
187 public void newConnection(Promise<Connection> promise)
188 {
189 createConnection(new ProxyPromise(promise));
190 }
191
192 protected void createConnection(Promise<Connection> promise)
193 {
194 client.newConnection(this, promise);
195 }
196
197 protected Connection acquire()
198 {
199 Connection result = idleConnections.poll();
200 if (result != null)
201 return result;
202
203 final int maxConnections = client.getMaxConnectionsPerDestination();
204 while (true)
205 {
206 int current = connectionCount.get();
207 final int next = current + 1;
208
209 if (next > maxConnections)
210 {
211 LOG.debug("Max connections {} reached for {}", current, this);
212
213 return idleConnections.poll();
214 }
215
216 if (connectionCount.compareAndSet(current, next))
217 {
218 LOG.debug("Creating connection {}/{} for {}", next, maxConnections, this);
219
220
221 Promise<Connection> promise = new Promise<Connection>()
222 {
223 @Override
224 public void succeeded(Connection connection)
225 {
226 process(connection, true);
227 }
228
229 @Override
230 public void failed(final Throwable x)
231 {
232 client.getExecutor().execute(new Runnable()
233 {
234 @Override
235 public void run()
236 {
237 abort(x);
238 }
239 });
240 }
241 };
242
243
244
245
246 createConnection(new ProxyPromise(promise)
247 {
248 @Override
249 public void succeeded(Connection connection)
250 {
251 LOG.debug("Created connection {}/{} {} for {}", next, maxConnections, connection, HttpDestination.this);
252 super.succeeded(connection);
253 }
254
255 @Override
256 public void failed(Throwable x)
257 {
258 LOG.debug("Connection failed {} for {}", x, HttpDestination.this);
259 connectionCount.decrementAndGet();
260 super.failed(x);
261 }
262 });
263
264
265 return idleConnections.poll();
266 }
267 }
268 }
269
270 private void abort(Throwable cause)
271 {
272 HttpExchange exchange;
273 while ((exchange = exchanges.poll()) != null)
274 abort(exchange, cause);
275 }
276
277
278
279
280
281
282
283
284
285
286
287 protected void process(Connection connection, boolean dispatch)
288 {
289
290 final HttpConnection httpConnection = (HttpConnection)connection;
291
292 final HttpExchange exchange = exchanges.poll();
293 if (exchange == null)
294 {
295 LOG.debug("{} idle", httpConnection);
296 if (!idleConnections.offer(httpConnection))
297 {
298 LOG.debug("{} idle overflow");
299 httpConnection.close();
300 }
301 if (!client.isRunning())
302 {
303 LOG.debug("{} is stopping", client);
304 remove(httpConnection);
305 httpConnection.close();
306 }
307 }
308 else
309 {
310 final Request request = exchange.getRequest();
311 Throwable cause = request.getAbortCause();
312 if (cause != null)
313 {
314 abort(exchange, cause);
315 LOG.debug("Aborted before processing {}: {}", exchange, cause);
316 }
317 else
318 {
319 LOG.debug("{} active", httpConnection);
320 if (!activeConnections.offer(httpConnection))
321 {
322 LOG.warn("{} active overflow");
323 }
324 if (dispatch)
325 {
326 client.getExecutor().execute(new Runnable()
327 {
328 @Override
329 public void run()
330 {
331 httpConnection.send(exchange);
332 }
333 });
334 }
335 else
336 {
337 httpConnection.send(exchange);
338 }
339 }
340 }
341 }
342
343 public void release(Connection connection)
344 {
345 LOG.debug("{} released", connection);
346 if (client.isRunning())
347 {
348 boolean removed = activeConnections.remove(connection);
349 if (removed)
350 process(connection, false);
351 else
352 LOG.debug("{} explicit", connection);
353 }
354 else
355 {
356 LOG.debug("{} is stopped", client);
357 remove(connection);
358 connection.close();
359 }
360 }
361
362 public void remove(Connection connection)
363 {
364 boolean removed = activeConnections.remove(connection);
365 removed |= idleConnections.remove(connection);
366 if (removed)
367 {
368 int open = connectionCount.decrementAndGet();
369 LOG.debug("Removed connection {} for {} - open: {}", connection, this, open);
370 }
371
372
373
374
375 if (!exchanges.isEmpty())
376 {
377 connection = acquire();
378 if (connection != null)
379 process(connection, false);
380 }
381 }
382
383 public void close()
384 {
385 for (Connection connection : idleConnections)
386 connection.close();
387 idleConnections.clear();
388
389
390 for (Connection connection : activeConnections)
391 connection.close();
392 activeConnections.clear();
393
394 abort(new AsynchronousCloseException());
395
396 connectionCount.set(0);
397
398 LOG.debug("Closed {}", this);
399 }
400
401 public boolean remove(HttpExchange exchange)
402 {
403 return exchanges.remove(exchange);
404 }
405
406 protected void abort(HttpExchange exchange, Throwable cause)
407 {
408 Request request = exchange.getRequest();
409 HttpResponse response = exchange.getResponse();
410 getRequestNotifier().notifyFailure(request, cause);
411 List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
412 getResponseNotifier().notifyFailure(listeners, response, cause);
413 getResponseNotifier().notifyComplete(listeners, new Result(request, cause, response, cause));
414 }
415
416 @Override
417 public String dump()
418 {
419 return ContainerLifeCycle.dump(this);
420 }
421
422 @Override
423 public void dump(Appendable out, String indent) throws IOException
424 {
425 ContainerLifeCycle.dumpObject(out, this + " - requests queued: " + exchanges.size());
426 List<String> connections = new ArrayList<>();
427 for (Connection connection : idleConnections)
428 connections.add(connection + " - IDLE");
429 for (Connection connection : activeConnections)
430 connections.add(connection + " - ACTIVE");
431 ContainerLifeCycle.dump(out, indent, connections);
432 }
433
434 @Override
435 public String toString()
436 {
437 return String.format("%s(%s://%s:%d)%s",
438 HttpDestination.class.getSimpleName(),
439 getScheme(),
440 getHost(),
441 getPort(),
442 proxyAddress == null ? "" : " via " + proxyAddress.getHost() + ":" + proxyAddress.getPort());
443 }
444
445
446
447
448
449
450
451 private class ProxyPromise implements Promise<Connection>
452 {
453 private final Promise<Connection> delegate;
454
455 private ProxyPromise(Promise<Connection> delegate)
456 {
457 this.delegate = delegate;
458 }
459
460 @Override
461 public void succeeded(Connection connection)
462 {
463 boolean tunnel = isProxied() &&
464 "https".equalsIgnoreCase(getScheme()) &&
465 client.getSslContextFactory() != null;
466 if (tunnel)
467 tunnel(connection);
468 else
469 delegate.succeeded(connection);
470 }
471
472 @Override
473 public void failed(Throwable x)
474 {
475 delegate.failed(x);
476 }
477
478 private void tunnel(final Connection connection)
479 {
480 String target = address.getHost() + ":" + address.getPort();
481 Request connect = client.newRequest(proxyAddress.getHost(), proxyAddress.getPort())
482 .scheme(HttpScheme.HTTP.asString())
483 .method(HttpMethod.CONNECT)
484 .path(target)
485 .header(HttpHeader.HOST.asString(), target)
486 .timeout(client.getConnectTimeout(), TimeUnit.MILLISECONDS);
487 connection.send(connect, new Response.CompleteListener()
488 {
489 @Override
490 public void onComplete(Result result)
491 {
492 if (result.isFailed())
493 {
494 failed(result.getFailure());
495 connection.close();
496 }
497 else
498 {
499 Response response = result.getResponse();
500 if (response.getStatus() == 200)
501 {
502 delegate.succeeded(connection);
503 }
504 else
505 {
506 failed(new HttpResponseException("Received " + response + " for " + result.getRequest(), response));
507 connection.close();
508 }
509 }
510 }
511 });
512 }
513 }
514 }