View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.IOException;
22  import java.nio.channels.AsynchronousCloseException;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.Queue;
26  import java.util.concurrent.BlockingQueue;
27  import java.util.concurrent.RejectedExecutionException;
28  import java.util.concurrent.TimeUnit;
29  import java.util.concurrent.atomic.AtomicInteger;
30  
31  import org.eclipse.jetty.client.api.Connection;
32  import org.eclipse.jetty.client.api.Destination;
33  import org.eclipse.jetty.client.api.ProxyConfiguration;
34  import org.eclipse.jetty.client.api.Request;
35  import org.eclipse.jetty.client.api.Response;
36  import org.eclipse.jetty.client.api.Result;
37  import org.eclipse.jetty.http.HttpField;
38  import org.eclipse.jetty.http.HttpHeader;
39  import org.eclipse.jetty.http.HttpMethod;
40  import org.eclipse.jetty.http.HttpScheme;
41  import org.eclipse.jetty.util.BlockingArrayQueue;
42  import org.eclipse.jetty.util.Promise;
43  import org.eclipse.jetty.util.component.ContainerLifeCycle;
44  import org.eclipse.jetty.util.component.Dumpable;
45  import org.eclipse.jetty.util.log.Log;
46  import org.eclipse.jetty.util.log.Logger;
47  
48  public class HttpDestination implements Destination, AutoCloseable, Dumpable
49  {
50      private static final Logger LOG = Log.getLogger(HttpDestination.class);
51  
52      private final AtomicInteger connectionCount = new AtomicInteger();
53      private final HttpClient client;
54      private final String scheme;
55      private final String host;
56      private final Address address;
57      private final Queue<HttpExchange> exchanges;
58      private final BlockingQueue<Connection> idleConnections;
59      private final BlockingQueue<Connection> activeConnections;
60      private final RequestNotifier requestNotifier;
61      private final ResponseNotifier responseNotifier;
62      private final Address proxyAddress;
63      private final HttpField hostField;
64  
65      public HttpDestination(HttpClient client, String scheme, String host, int port)
66      {
67          this.client = client;
68          this.scheme = scheme;
69          this.host = host;
70          this.address = new Address(host, port);
71  
72          int maxRequestsQueued = client.getMaxRequestsQueuedPerDestination();
73          int capacity = Math.min(32, maxRequestsQueued);
74          this.exchanges = new BlockingArrayQueue<>(capacity, capacity, maxRequestsQueued);
75  
76          int maxConnections = client.getMaxConnectionsPerDestination();
77          capacity = Math.min(8, maxConnections);
78          this.idleConnections = new BlockingArrayQueue<>(capacity, capacity, maxConnections);
79          this.activeConnections = new BlockingArrayQueue<>(capacity, capacity, maxConnections);
80  
81          this.requestNotifier = new RequestNotifier(client);
82          this.responseNotifier = new ResponseNotifier(client);
83  
84          ProxyConfiguration proxyConfig = client.getProxyConfiguration();
85          proxyAddress = proxyConfig != null && proxyConfig.matches(host, port) ?
86                  new Address(proxyConfig.getHost(), proxyConfig.getPort()) : null;
87  
88          hostField = new HttpField(HttpHeader.HOST, host + ":" + port);
89      }
90  
91      protected BlockingQueue<Connection> getIdleConnections()
92      {
93          return idleConnections;
94      }
95  
96      protected BlockingQueue<Connection> getActiveConnections()
97      {
98          return activeConnections;
99      }
100 
101     public RequestNotifier getRequestNotifier()
102     {
103         return requestNotifier;
104     }
105 
106     public ResponseNotifier getResponseNotifier()
107     {
108         return responseNotifier;
109     }
110 
111     @Override
112     public String getScheme()
113     {
114         return scheme;
115     }
116 
117     @Override
118     public String getHost()
119     {
120         // InetSocketAddress.getHostString() transforms the host string
121         // in case of IPv6 addresses, so we return the original host string
122         return host;
123     }
124 
125     @Override
126     public int getPort()
127     {
128         return address.getPort();
129     }
130 
131     public Address getConnectAddress()
132     {
133         return isProxied() ? proxyAddress : address;
134     }
135 
136     public boolean isProxied()
137     {
138         return proxyAddress != null;
139     }
140 
141     public HttpField getHostField()
142     {
143         return hostField;
144     }
145 
146     public void send(Request request, List<Response.ResponseListener> listeners)
147     {
148         if (!scheme.equals(request.getScheme()))
149             throw new IllegalArgumentException("Invalid request scheme " + request.getScheme() + " for destination " + this);
150         if (!getHost().equals(request.getHost()))
151             throw new IllegalArgumentException("Invalid request host " + request.getHost() + " for destination " + this);
152         int port = request.getPort();
153         if (port >= 0 && getPort() != port)
154             throw new IllegalArgumentException("Invalid request port " + port + " for destination " + this);
155 
156         HttpConversation conversation = client.getConversation(request.getConversationID(), true);
157         HttpExchange exchange = new HttpExchange(conversation, this, request, listeners);
158 
159         if (client.isRunning())
160         {
161             if (exchanges.offer(exchange))
162             {
163                 if (!client.isRunning() && exchanges.remove(exchange))
164                 {
165                     throw new RejectedExecutionException(client + " is stopping");
166                 }
167                 else
168                 {
169                     LOG.debug("Queued {}", request);
170                     requestNotifier.notifyQueued(request);
171                     Connection connection = acquire();
172                     if (connection != null)
173                         process(connection, false);
174                 }
175             }
176             else
177             {
178                 throw new RejectedExecutionException("Max requests per destination " + client.getMaxRequestsQueuedPerDestination() + " exceeded");
179             }
180         }
181         else
182         {
183             throw new RejectedExecutionException(client + " is stopped");
184         }
185     }
186 
187     public void newConnection(Promise<Connection> promise)
188     {
189         createConnection(new ProxyPromise(promise));
190     }
191 
192     protected void createConnection(Promise<Connection> promise)
193     {
194         client.newConnection(this, promise);
195     }
196 
197     protected Connection acquire()
198     {
199         Connection result = idleConnections.poll();
200         if (result != null)
201             return result;
202 
203         final int maxConnections = client.getMaxConnectionsPerDestination();
204         while (true)
205         {
206             int current = connectionCount.get();
207             final int next = current + 1;
208 
209             if (next > maxConnections)
210             {
211                 LOG.debug("Max connections {} reached for {}", current, this);
212                 // Try again the idle connections
213                 return idleConnections.poll();
214             }
215 
216             if (connectionCount.compareAndSet(current, next))
217             {
218                 LOG.debug("Creating connection {}/{} for {}", next, maxConnections, this);
219 
220                 // This is the promise that is being called when a connection (eventually proxied) succeeds or fails.
221                 Promise<Connection> promise = new Promise<Connection>()
222                 {
223                     @Override
224                     public void succeeded(Connection connection)
225                     {
226                         process(connection, true);
227                     }
228 
229                     @Override
230                     public void failed(final Throwable x)
231                     {
232                         client.getExecutor().execute(new Runnable()
233                         {
234                             @Override
235                             public void run()
236                             {
237                                 abort(x);
238                             }
239                         });
240                     }
241                 };
242 
243                 // Create a new connection, and pass a ProxyPromise to establish a proxy tunnel, if needed.
244                 // Differently from the case where the connection is created explicitly by applications, here
245                 // we need to do a bit more logging and keep track of the connection count in case of failures.
246                 createConnection(new ProxyPromise(promise)
247                 {
248                     @Override
249                     public void succeeded(Connection connection)
250                     {
251                         LOG.debug("Created connection {}/{} {} for {}", next, maxConnections, connection, HttpDestination.this);
252                         super.succeeded(connection);
253                     }
254 
255                     @Override
256                     public void failed(Throwable x)
257                     {
258                         LOG.debug("Connection failed {} for {}", x, HttpDestination.this);
259                         connectionCount.decrementAndGet();
260                         super.failed(x);
261                     }
262                 });
263 
264                 // Try again the idle connections
265                 return idleConnections.poll();
266             }
267         }
268     }
269 
270     private void abort(Throwable cause)
271     {
272         HttpExchange exchange;
273         while ((exchange = exchanges.poll()) != null)
274             abort(exchange, cause);
275     }
276 
277     /**
278      * <p>Processes a new connection making it idle or active depending on whether requests are waiting to be sent.</p>
279      * <p>A new connection is created when a request needs to be executed; it is possible that the request that
280      * triggered the request creation is executed by another connection that was just released, so the new connection
281      * may become idle.</p>
282      * <p>If a request is waiting to be executed, it will be dequeued and executed by the new connection.</p>
283      *
284      * @param connection the new connection
285      * @param dispatch whether to dispatch the processing to another thread
286      */
287     protected void process(Connection connection, boolean dispatch)
288     {
289         // Ugly cast, but lack of generic reification forces it
290         final HttpConnection httpConnection = (HttpConnection)connection;
291 
292         final HttpExchange exchange = exchanges.poll();
293         if (exchange == null)
294         {
295             LOG.debug("{} idle", httpConnection);
296             if (!idleConnections.offer(httpConnection))
297             {
298                 LOG.debug("{} idle overflow");
299                 httpConnection.close();
300             }
301             if (!client.isRunning())
302             {
303                 LOG.debug("{} is stopping", client);
304                 remove(httpConnection);
305                 httpConnection.close();
306             }
307         }
308         else
309         {
310             final Request request = exchange.getRequest();
311             Throwable cause = request.getAbortCause();
312             if (cause != null)
313             {
314                 abort(exchange, cause);
315                 LOG.debug("Aborted before processing {}: {}", exchange, cause);
316             }
317             else
318             {
319                 LOG.debug("{} active", httpConnection);
320                 if (!activeConnections.offer(httpConnection))
321                 {
322                     LOG.warn("{} active overflow");
323                 }
324                 if (dispatch)
325                 {
326                     client.getExecutor().execute(new Runnable()
327                     {
328                         @Override
329                         public void run()
330                         {
331                             httpConnection.send(exchange);
332                         }
333                     });
334                 }
335                 else
336                 {
337                     httpConnection.send(exchange);
338                 }
339             }
340         }
341     }
342 
343     public void release(Connection connection)
344     {
345         LOG.debug("{} released", connection);
346         if (client.isRunning())
347         {
348             boolean removed = activeConnections.remove(connection);
349             if (removed)
350                 process(connection, false);
351             else
352                 LOG.debug("{} explicit", connection);
353         }
354         else
355         {
356             LOG.debug("{} is stopped", client);
357             remove(connection);
358             connection.close();
359         }
360     }
361 
362     public void remove(Connection connection)
363     {
364         boolean removed = activeConnections.remove(connection);
365         removed |= idleConnections.remove(connection);
366         if (removed)
367         {
368             int open = connectionCount.decrementAndGet();
369             LOG.debug("Removed connection {} for {} - open: {}", connection, this, open);
370         }
371 
372         // We need to execute queued requests even if this connection failed.
373         // We may create a connection that is not needed, but it will eventually
374         // idle timeout, so no worries
375         if (!exchanges.isEmpty())
376         {
377             connection = acquire();
378             if (connection != null)
379                 process(connection, false);
380         }
381     }
382 
383     public void close()
384     {
385         for (Connection connection : idleConnections)
386             connection.close();
387         idleConnections.clear();
388 
389         // A bit drastic, but we cannot wait for all requests to complete
390         for (Connection connection : activeConnections)
391             connection.close();
392         activeConnections.clear();
393 
394         abort(new AsynchronousCloseException());
395 
396         connectionCount.set(0);
397 
398         LOG.debug("Closed {}", this);
399     }
400 
401     public boolean remove(HttpExchange exchange)
402     {
403         return exchanges.remove(exchange);
404     }
405 
406     protected void abort(HttpExchange exchange, Throwable cause)
407     {
408         Request request = exchange.getRequest();
409         HttpResponse response = exchange.getResponse();
410         getRequestNotifier().notifyFailure(request, cause);
411         List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
412         getResponseNotifier().notifyFailure(listeners, response, cause);
413         getResponseNotifier().notifyComplete(listeners, new Result(request, cause, response, cause));
414     }
415 
416     @Override
417     public String dump()
418     {
419         return ContainerLifeCycle.dump(this);
420     }
421 
422     @Override
423     public void dump(Appendable out, String indent) throws IOException
424     {
425         ContainerLifeCycle.dumpObject(out, this + " - requests queued: " + exchanges.size());
426         List<String> connections = new ArrayList<>();
427         for (Connection connection : idleConnections)
428             connections.add(connection + " - IDLE");
429         for (Connection connection : activeConnections)
430             connections.add(connection + " - ACTIVE");
431         ContainerLifeCycle.dump(out, indent, connections);
432     }
433 
434     @Override
435     public String toString()
436     {
437         return String.format("%s(%s://%s:%d)%s",
438                 HttpDestination.class.getSimpleName(),
439                 getScheme(),
440                 getHost(),
441                 getPort(),
442                 proxyAddress == null ? "" : " via " + proxyAddress.getHost() + ":" + proxyAddress.getPort());
443     }
444 
445     /**
446      * Decides whether to establish a proxy tunnel using HTTP CONNECT.
447      * It is implemented as a promise because it needs to establish the tunnel
448      * when the TCP connection is succeeded, and needs to notify another
449      * promise when the tunnel is established (or failed).
450      */
451     private class ProxyPromise implements Promise<Connection>
452     {
453         private final Promise<Connection> delegate;
454 
455         private ProxyPromise(Promise<Connection> delegate)
456         {
457             this.delegate = delegate;
458         }
459 
460         @Override
461         public void succeeded(Connection connection)
462         {
463             boolean tunnel = isProxied() &&
464                     "https".equalsIgnoreCase(getScheme()) &&
465                     client.getSslContextFactory() != null;
466             if (tunnel)
467                 tunnel(connection);
468             else
469                 delegate.succeeded(connection);
470         }
471 
472         @Override
473         public void failed(Throwable x)
474         {
475             delegate.failed(x);
476         }
477 
478         private void tunnel(final Connection connection)
479         {
480             String target = address.getHost() + ":" + address.getPort();
481             Request connect = client.newRequest(proxyAddress.getHost(), proxyAddress.getPort())
482                     .scheme(HttpScheme.HTTP.asString())
483                     .method(HttpMethod.CONNECT)
484                     .path(target)
485                     .header(HttpHeader.HOST.asString(), target)
486                     .timeout(client.getConnectTimeout(), TimeUnit.MILLISECONDS);
487             connection.send(connect, new Response.CompleteListener()
488             {
489                 @Override
490                 public void onComplete(Result result)
491                 {
492                     if (result.isFailed())
493                     {
494                         failed(result.getFailure());
495                         connection.close();
496                     }
497                     else
498                     {
499                         Response response = result.getResponse();
500                         if (response.getStatus() == 200)
501                         {
502                             delegate.succeeded(connection);
503                         }
504                         else
505                         {
506                             failed(new HttpResponseException("Received " + response + " for " + result.getRequest(), response));
507                             connection.close();
508                         }
509                     }
510                 }
511             });
512         }
513     }
514 }