View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.net.URI;
24  import java.nio.channels.AsynchronousCloseException;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.Queue;
28  import java.util.concurrent.BlockingQueue;
29  import java.util.concurrent.RejectedExecutionException;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.atomic.AtomicInteger;
32  
33  import org.eclipse.jetty.client.api.Connection;
34  import org.eclipse.jetty.client.api.Destination;
35  import org.eclipse.jetty.client.api.ProxyConfiguration;
36  import org.eclipse.jetty.client.api.Request;
37  import org.eclipse.jetty.client.api.Response;
38  import org.eclipse.jetty.client.api.Result;
39  import org.eclipse.jetty.http.HttpField;
40  import org.eclipse.jetty.http.HttpHeader;
41  import org.eclipse.jetty.http.HttpMethod;
42  import org.eclipse.jetty.http.HttpScheme;
43  import org.eclipse.jetty.util.BlockingArrayQueue;
44  import org.eclipse.jetty.util.Promise;
45  import org.eclipse.jetty.util.component.ContainerLifeCycle;
46  import org.eclipse.jetty.util.component.Dumpable;
47  import org.eclipse.jetty.util.log.Log;
48  import org.eclipse.jetty.util.log.Logger;
49  import org.eclipse.jetty.util.ssl.SslContextFactory;
50  
51  public class HttpDestination implements Destination, Closeable, Dumpable
52  {
53      private static final Logger LOG = Log.getLogger(HttpDestination.class);
54  
55      private final AtomicInteger connectionCount = new AtomicInteger();
56      private final HttpClient client;
57      private final String scheme;
58      private final String host;
59      private final Address address;
60      private final Queue<HttpExchange> exchanges;
61      private final BlockingQueue<Connection> idleConnections;
62      private final BlockingQueue<Connection> activeConnections;
63      private final RequestNotifier requestNotifier;
64      private final ResponseNotifier responseNotifier;
65      private final Address proxyAddress;
66      private final HttpField hostField;
67  
68      public HttpDestination(HttpClient client, String scheme, String host, int port)
69      {
70          this.client = client;
71          this.scheme = scheme;
72          this.host = host;
73          this.address = new Address(host, port);
74  
75          int maxRequestsQueued = client.getMaxRequestsQueuedPerDestination();
76          int capacity = Math.min(32, maxRequestsQueued);
77          this.exchanges = new BlockingArrayQueue<>(capacity, capacity, maxRequestsQueued);
78  
79          int maxConnections = client.getMaxConnectionsPerDestination();
80          capacity = Math.min(8, maxConnections);
81          this.idleConnections = new BlockingArrayQueue<>(capacity, capacity, maxConnections);
82          this.activeConnections = new BlockingArrayQueue<>(capacity, capacity, maxConnections);
83  
84          this.requestNotifier = new RequestNotifier(client);
85          this.responseNotifier = new ResponseNotifier(client);
86  
87          ProxyConfiguration proxyConfig = client.getProxyConfiguration();
88          proxyAddress = proxyConfig != null && proxyConfig.matches(host, port) ?
89                  new Address(proxyConfig.getHost(), proxyConfig.getPort()) : null;
90  
91          if (!client.isDefaultPort(scheme, port))
92              host += ":" + port;
93          hostField = new HttpField(HttpHeader.HOST, host);
94      }
95  
96      protected BlockingQueue<Connection> getIdleConnections()
97      {
98          return idleConnections;
99      }
100 
101     protected BlockingQueue<Connection> getActiveConnections()
102     {
103         return activeConnections;
104     }
105 
106     public RequestNotifier getRequestNotifier()
107     {
108         return requestNotifier;
109     }
110 
111     public ResponseNotifier getResponseNotifier()
112     {
113         return responseNotifier;
114     }
115 
116     @Override
117     public String getScheme()
118     {
119         return scheme;
120     }
121 
122     @Override
123     public String getHost()
124     {
125         // InetSocketAddress.getHostString() transforms the host string
126         // in case of IPv6 addresses, so we return the original host string
127         return host;
128     }
129 
130     @Override
131     public int getPort()
132     {
133         return address.getPort();
134     }
135 
136     public Address getConnectAddress()
137     {
138         return isProxied() ? proxyAddress : address;
139     }
140 
141     public boolean isProxied()
142     {
143         return proxyAddress != null;
144     }
145 
146     public URI getProxyURI()
147     {
148         ProxyConfiguration proxyConfiguration = client.getProxyConfiguration();
149         String uri = getScheme() + "://" + proxyConfiguration.getHost();
150         if (!client.isDefaultPort(getScheme(), proxyConfiguration.getPort()))
151             uri += ":" + proxyConfiguration.getPort();
152         return URI.create(uri);
153     }
154 
155     public HttpField getHostField()
156     {
157         return hostField;
158     }
159 
160     public void send(Request request, List<Response.ResponseListener> listeners)
161     {
162         if (!scheme.equals(request.getScheme()))
163             throw new IllegalArgumentException("Invalid request scheme " + request.getScheme() + " for destination " + this);
164         if (!getHost().equals(request.getHost()))
165             throw new IllegalArgumentException("Invalid request host " + request.getHost() + " for destination " + this);
166         int port = request.getPort();
167         if (port >= 0 && getPort() != port)
168             throw new IllegalArgumentException("Invalid request port " + port + " for destination " + this);
169 
170         HttpConversation conversation = client.getConversation(request.getConversationID(), true);
171         HttpExchange exchange = new HttpExchange(conversation, this, request, listeners);
172 
173         if (client.isRunning())
174         {
175             if (exchanges.offer(exchange))
176             {
177                 if (!client.isRunning() && exchanges.remove(exchange))
178                 {
179                     throw new RejectedExecutionException(client + " is stopping");
180                 }
181                 else
182                 {
183                     LOG.debug("Queued {}", request);
184                     requestNotifier.notifyQueued(request);
185                     Connection connection = acquire();
186                     if (connection != null)
187                         process(connection, false);
188                 }
189             }
190             else
191             {
192                 LOG.debug("Max queued exceeded {}", request);
193                 abort(exchange, new RejectedExecutionException("Max requests per destination " + client.getMaxRequestsQueuedPerDestination() + " exceeded for " + this));
194             }
195         }
196         else
197         {
198             throw new RejectedExecutionException(client + " is stopped");
199         }
200     }
201 
202     public void newConnection(Promise<Connection> promise)
203     {
204         createConnection(new ProxyPromise(promise));
205     }
206 
207     protected void createConnection(Promise<Connection> promise)
208     {
209         client.newConnection(this, promise);
210     }
211 
212     protected Connection acquire()
213     {
214         Connection result = idleConnections.poll();
215         if (result != null)
216             return result;
217 
218         final int maxConnections = client.getMaxConnectionsPerDestination();
219         while (true)
220         {
221             int current = connectionCount.get();
222             final int next = current + 1;
223 
224             if (next > maxConnections)
225             {
226                 LOG.debug("Max connections per destination {} exceeded for {}", current, this);
227                 // Try again the idle connections
228                 return idleConnections.poll();
229             }
230 
231             if (connectionCount.compareAndSet(current, next))
232             {
233                 LOG.debug("Creating connection {}/{} for {}", next, maxConnections, this);
234 
235                 // This is the promise that is being called when a connection (eventually proxied) succeeds or fails.
236                 Promise<Connection> promise = new Promise<Connection>()
237                 {
238                     @Override
239                     public void succeeded(Connection connection)
240                     {
241                         process(connection, true);
242                     }
243 
244                     @Override
245                     public void failed(final Throwable x)
246                     {
247                         client.getExecutor().execute(new Runnable()
248                         {
249                             @Override
250                             public void run()
251                             {
252                                 abort(x);
253                             }
254                         });
255                     }
256                 };
257 
258                 // Create a new connection, and pass a ProxyPromise to establish a proxy tunnel, if needed.
259                 // Differently from the case where the connection is created explicitly by applications, here
260                 // we need to do a bit more logging and keep track of the connection count in case of failures.
261                 createConnection(new ProxyPromise(promise)
262                 {
263                     @Override
264                     public void succeeded(Connection connection)
265                     {
266                         LOG.debug("Created connection {}/{} {} for {}", next, maxConnections, connection, HttpDestination.this);
267                         super.succeeded(connection);
268                     }
269 
270                     @Override
271                     public void failed(Throwable x)
272                     {
273                         LOG.debug("Connection failed {} for {}", x, HttpDestination.this);
274                         connectionCount.decrementAndGet();
275                         super.failed(x);
276                     }
277                 });
278 
279                 // Try again the idle connections
280                 return idleConnections.poll();
281             }
282         }
283     }
284 
285     private void abort(Throwable cause)
286     {
287         HttpExchange exchange;
288         while ((exchange = exchanges.poll()) != null)
289             abort(exchange, cause);
290     }
291 
292     /**
293      * <p>Processes a new connection making it idle or active depending on whether requests are waiting to be sent.</p>
294      * <p>A new connection is created when a request needs to be executed; it is possible that the request that
295      * triggered the request creation is executed by another connection that was just released, so the new connection
296      * may become idle.</p>
297      * <p>If a request is waiting to be executed, it will be dequeued and executed by the new connection.</p>
298      *
299      * @param connection the new connection
300      * @param dispatch whether to dispatch the processing to another thread
301      */
302     protected void process(Connection connection, boolean dispatch)
303     {
304         // Ugly cast, but lack of generic reification forces it
305         final HttpConnection httpConnection = (HttpConnection)connection;
306 
307         final HttpExchange exchange = exchanges.poll();
308         if (exchange == null)
309         {
310             LOG.debug("{} idle", httpConnection);
311             if (!idleConnections.offer(httpConnection))
312             {
313                 LOG.debug("{} idle overflow");
314                 httpConnection.close();
315             }
316             if (!client.isRunning())
317             {
318                 LOG.debug("{} is stopping", client);
319                 remove(httpConnection);
320                 httpConnection.close();
321             }
322         }
323         else
324         {
325             final Request request = exchange.getRequest();
326             Throwable cause = request.getAbortCause();
327             if (cause != null)
328             {
329                 abort(exchange, cause);
330                 LOG.debug("Aborted before processing {}: {}", exchange, cause);
331             }
332             else
333             {
334                 LOG.debug("{} active", httpConnection);
335                 if (!activeConnections.offer(httpConnection))
336                 {
337                     LOG.warn("{} active overflow");
338                 }
339                 if (dispatch)
340                 {
341                     client.getExecutor().execute(new Runnable()
342                     {
343                         @Override
344                         public void run()
345                         {
346                             httpConnection.send(exchange);
347                         }
348                     });
349                 }
350                 else
351                 {
352                     httpConnection.send(exchange);
353                 }
354             }
355         }
356     }
357 
358     public void release(Connection connection)
359     {
360         LOG.debug("{} released", connection);
361         if (client.isRunning())
362         {
363             boolean removed = activeConnections.remove(connection);
364             if (removed)
365                 process(connection, false);
366             else
367                 LOG.debug("{} explicit", connection);
368         }
369         else
370         {
371             LOG.debug("{} is stopped", client);
372             remove(connection);
373             connection.close();
374         }
375     }
376 
377     public void remove(Connection connection)
378     {
379         boolean removed = activeConnections.remove(connection);
380         removed |= idleConnections.remove(connection);
381         if (removed)
382         {
383             int open = connectionCount.decrementAndGet();
384             LOG.debug("Removed connection {} for {} - open: {}", connection, this, open);
385         }
386 
387         // We need to execute queued requests even if this connection failed.
388         // We may create a connection that is not needed, but it will eventually
389         // idle timeout, so no worries
390         if (!exchanges.isEmpty())
391         {
392             connection = acquire();
393             if (connection != null)
394                 process(connection, false);
395         }
396     }
397 
398     public void close()
399     {
400         for (Connection connection : idleConnections)
401             connection.close();
402         idleConnections.clear();
403 
404         // A bit drastic, but we cannot wait for all requests to complete
405         for (Connection connection : activeConnections)
406             connection.close();
407         activeConnections.clear();
408 
409         abort(new AsynchronousCloseException());
410 
411         connectionCount.set(0);
412 
413         LOG.debug("Closed {}", this);
414     }
415 
416     public boolean remove(HttpExchange exchange)
417     {
418         return exchanges.remove(exchange);
419     }
420 
421     protected void abort(HttpExchange exchange, Throwable cause)
422     {
423         Request request = exchange.getRequest();
424         HttpResponse response = exchange.getResponse();
425         getRequestNotifier().notifyFailure(request, cause);
426         List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
427         getResponseNotifier().notifyFailure(listeners, response, cause);
428         getResponseNotifier().notifyComplete(listeners, new Result(request, cause, response, cause));
429     }
430 
431     protected void tunnelSucceeded(Connection connection, Promise<Connection> promise)
432     {
433         // Wrap the connection with TLS
434         Connection tunnel = client.tunnel(connection);
435         promise.succeeded(tunnel);
436     }
437 
438     protected void tunnelFailed(Connection connection, Promise<Connection> promise, Throwable failure)
439     {
440         promise.failed(failure);
441         connection.close();
442     }
443 
444     @Override
445     public String dump()
446     {
447         return ContainerLifeCycle.dump(this);
448     }
449 
450     @Override
451     public void dump(Appendable out, String indent) throws IOException
452     {
453         ContainerLifeCycle.dumpObject(out, this + " - requests queued: " + exchanges.size());
454         List<String> connections = new ArrayList<>();
455         for (Connection connection : idleConnections)
456             connections.add(connection + " - IDLE");
457         for (Connection connection : activeConnections)
458             connections.add(connection + " - ACTIVE");
459         ContainerLifeCycle.dump(out, indent, connections);
460     }
461 
462     @Override
463     public String toString()
464     {
465         return String.format("%s(%s://%s:%d)%s",
466                 HttpDestination.class.getSimpleName(),
467                 getScheme(),
468                 getHost(),
469                 getPort(),
470                 proxyAddress == null ? "" : " via " + proxyAddress.getHost() + ":" + proxyAddress.getPort());
471     }
472 
473     /**
474      * Decides whether to establish a proxy tunnel using HTTP CONNECT.
475      * It is implemented as a promise because it needs to establish the tunnel
476      * when the TCP connection is succeeded, and needs to notify another
477      * promise when the tunnel is established (or failed).
478      */
479     private class ProxyPromise implements Promise<Connection>
480     {
481         private final Promise<Connection> delegate;
482 
483         private ProxyPromise(Promise<Connection> delegate)
484         {
485             this.delegate = delegate;
486         }
487 
488         @Override
489         public void succeeded(Connection connection)
490         {
491             if (isProxied() && HttpScheme.HTTPS.is(getScheme()))
492             {
493                 if (client.getSslContextFactory() != null)
494                 {
495                     tunnel(connection);
496                 }
497                 else
498                 {
499                     String message = String.format("Cannot perform requests over SSL, no %s in %s",
500                             SslContextFactory.class.getSimpleName(), HttpClient.class.getSimpleName());
501                     delegate.failed(new IllegalStateException(message));
502                 }
503             }
504             else
505             {
506                 delegate.succeeded(connection);
507             }
508         }
509 
510         @Override
511         public void failed(Throwable x)
512         {
513             delegate.failed(x);
514         }
515 
516         private void tunnel(final Connection connection)
517         {
518             String target = address.getHost() + ":" + address.getPort();
519             Request connect = client.newRequest(proxyAddress.getHost(), proxyAddress.getPort())
520                     .scheme(HttpScheme.HTTP.asString())
521                     .method(HttpMethod.CONNECT)
522                     .path(target)
523                     .header(HttpHeader.HOST, target)
524                     .timeout(client.getConnectTimeout(), TimeUnit.MILLISECONDS);
525             connection.send(connect, new Response.CompleteListener()
526             {
527                 @Override
528                 public void onComplete(Result result)
529                 {
530                     if (result.isFailed())
531                     {
532                         tunnelFailed(connection, delegate, result.getFailure());
533                     }
534                     else
535                     {
536                         Response response = result.getResponse();
537                         if (response.getStatus() == 200)
538                         {
539                             tunnelSucceeded(connection, delegate);
540                         }
541                         else
542                         {
543                             tunnelFailed(connection, delegate, new HttpResponseException("Received " + response + " for " + result.getRequest(), response));
544                         }
545                     }
546                 }
547             });
548         }
549     }
550 }