View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.nio.channels.AsynchronousCloseException;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.Queue;
27  import java.util.concurrent.RejectedExecutionException;
28  
29  import org.eclipse.jetty.client.api.Connection;
30  import org.eclipse.jetty.client.api.Destination;
31  import org.eclipse.jetty.client.api.Response;
32  import org.eclipse.jetty.http.HttpField;
33  import org.eclipse.jetty.http.HttpHeader;
34  import org.eclipse.jetty.http.HttpScheme;
35  import org.eclipse.jetty.io.ClientConnectionFactory;
36  import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
37  import org.eclipse.jetty.util.BlockingArrayQueue;
38  import org.eclipse.jetty.util.Promise;
39  import org.eclipse.jetty.util.annotation.ManagedAttribute;
40  import org.eclipse.jetty.util.annotation.ManagedObject;
41  import org.eclipse.jetty.util.component.ContainerLifeCycle;
42  import org.eclipse.jetty.util.component.Dumpable;
43  import org.eclipse.jetty.util.log.Log;
44  import org.eclipse.jetty.util.log.Logger;
45  
46  @ManagedObject
47  public abstract class HttpDestination extends ContainerLifeCycle implements Destination, Closeable, Dumpable
48  {
49      protected static final Logger LOG = Log.getLogger(HttpDestination.class);
50  
51      private final HttpClient client;
52      private final Origin origin;
53      private final Queue<HttpExchange> exchanges;
54      private final RequestNotifier requestNotifier;
55      private final ResponseNotifier responseNotifier;
56      private final ProxyConfiguration.Proxy proxy;
57      private final ClientConnectionFactory connectionFactory;
58      private final HttpField hostField;
59  
60      public HttpDestination(HttpClient client, Origin origin)
61      {
62          this.client = client;
63          this.origin = origin;
64  
65          this.exchanges = newExchangeQueue(client);
66  
67          this.requestNotifier = new RequestNotifier(client);
68          this.responseNotifier = new ResponseNotifier();
69  
70          ProxyConfiguration proxyConfig = client.getProxyConfiguration();
71          proxy = proxyConfig.match(origin);
72          ClientConnectionFactory connectionFactory = client.getTransport();
73          if (proxy != null)
74          {
75              connectionFactory = proxy.newClientConnectionFactory(connectionFactory);
76          }
77          else
78          {
79              if (HttpScheme.HTTPS.is(getScheme()))
80                  connectionFactory = newSslClientConnectionFactory(connectionFactory);
81          }
82          this.connectionFactory = connectionFactory;
83  
84          String host = getHost();
85          if (!client.isDefaultPort(getScheme(), getPort()))
86              host += ":" + getPort();
87          hostField = new HttpField(HttpHeader.HOST, host);
88      }
89  
90      protected Queue<HttpExchange> newExchangeQueue(HttpClient client)
91      {
92          return new BlockingArrayQueue<>(client.getMaxRequestsQueuedPerDestination());
93      }
94  
95      protected ClientConnectionFactory newSslClientConnectionFactory(ClientConnectionFactory connectionFactory)
96      {
97          return new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory);
98      }
99  
100     public HttpClient getHttpClient()
101     {
102         return client;
103     }
104 
105     public Origin getOrigin()
106     {
107         return origin;
108     }
109 
110     public Queue<HttpExchange> getHttpExchanges()
111     {
112         return exchanges;
113     }
114 
115     public RequestNotifier getRequestNotifier()
116     {
117         return requestNotifier;
118     }
119 
120     public ResponseNotifier getResponseNotifier()
121     {
122         return responseNotifier;
123     }
124 
125     public ProxyConfiguration.Proxy getProxy()
126     {
127         return proxy;
128     }
129 
130     public ClientConnectionFactory getClientConnectionFactory()
131     {
132         return connectionFactory;
133     }
134 
135     @Override
136     @ManagedAttribute(value = "The destination scheme", readonly = true)
137     public String getScheme()
138     {
139         return origin.getScheme();
140     }
141 
142     @Override
143     @ManagedAttribute(value = "The destination host", readonly = true)
144     public String getHost()
145     {
146         // InetSocketAddress.getHostString() transforms the host string
147         // in case of IPv6 addresses, so we return the original host string
148         return origin.getAddress().getHost();
149     }
150 
151     @Override
152     @ManagedAttribute(value = "The destination port", readonly = true)
153     public int getPort()
154     {
155         return origin.getAddress().getPort();
156     }
157 
158     @ManagedAttribute(value = "The number of queued requests", readonly = true)
159     public int getQueuedRequestCount()
160     {
161         return exchanges.size();
162     }
163 
164     public Origin.Address getConnectAddress()
165     {
166         return proxy == null ? origin.getAddress() : proxy.getAddress();
167     }
168 
169     public HttpField getHostField()
170     {
171         return hostField;
172     }
173 
174     protected void send(HttpRequest request, List<Response.ResponseListener> listeners)
175     {
176         if (!getScheme().equalsIgnoreCase(request.getScheme()))
177             throw new IllegalArgumentException("Invalid request scheme " + request.getScheme() + " for destination " + this);
178         if (!getHost().equalsIgnoreCase(request.getHost()))
179             throw new IllegalArgumentException("Invalid request host " + request.getHost() + " for destination " + this);
180         int port = request.getPort();
181         if (port >= 0 && getPort() != port)
182             throw new IllegalArgumentException("Invalid request port " + port + " for destination " + this);
183 
184         HttpExchange exchange = new HttpExchange(this, request, listeners);
185 
186         if (client.isRunning())
187         {
188             if (enqueue(exchanges, exchange))
189             {
190                 if (!client.isRunning() && exchanges.remove(exchange))
191                 {
192                     request.abort(new RejectedExecutionException(client + " is stopping"));
193                 }
194                 else
195                 {
196                     if (LOG.isDebugEnabled())
197                         LOG.debug("Queued {} for {}", request, this);
198                     requestNotifier.notifyQueued(request);
199                     send();
200                 }
201             }
202             else
203             {
204                 if (LOG.isDebugEnabled())
205                     LOG.debug("Max queue size {} exceeded by {} for {}", client.getMaxRequestsQueuedPerDestination(), request, this);
206                 request.abort(new RejectedExecutionException("Max requests per destination " + client.getMaxRequestsQueuedPerDestination() + " exceeded for " + this));
207             }
208         }
209         else
210         {
211             request.abort(new RejectedExecutionException(client + " is stopped"));
212         }
213     }
214 
215     protected boolean enqueue(Queue<HttpExchange> queue, HttpExchange exchange)
216     {
217         return queue.offer(exchange);
218     }
219 
220     public abstract void send();
221 
222     public void newConnection(Promise<Connection> promise)
223     {
224         createConnection(promise);
225     }
226 
227     protected void createConnection(Promise<Connection> promise)
228     {
229         client.newConnection(this, promise);
230     }
231 
232     public boolean remove(HttpExchange exchange)
233     {
234         return exchanges.remove(exchange);
235     }
236 
237     public void close()
238     {
239         abort(new AsynchronousCloseException());
240         if (LOG.isDebugEnabled())
241             LOG.debug("Closed {}", this);
242     }
243 
244     public void release(Connection connection)
245     {
246     }
247 
248     public void close(Connection connection)
249     {
250     }
251 
252     /**
253      * Aborts all the {@link HttpExchange}s queued in this destination.
254      *
255      * @param cause the abort cause
256      */
257     public void abort(Throwable cause)
258     {
259         // Copy the queue of exchanges and fail only those that are queued at this moment.
260         // The application may queue another request from the failure/complete listener
261         // and we don't want to fail it immediately as if it was queued before the failure.
262         // The call to Request.abort() will remove the exchange from the exchanges queue.
263         for (HttpExchange exchange : new ArrayList<>(exchanges))
264             exchange.getRequest().abort(cause);
265     }
266 
267     @Override
268     public String dump()
269     {
270         return ContainerLifeCycle.dump(this);
271     }
272 
273     @Override
274     public void dump(Appendable out, String indent) throws IOException
275     {
276         ContainerLifeCycle.dumpObject(out, toString());
277     }
278 
279     public String asString()
280     {
281         return origin.asString();
282     }
283 
284     @Override
285     public String toString()
286     {
287         return String.format("%s[%s]%x%s,queue=%d",
288                 HttpDestination.class.getSimpleName(),
289                 asString(),
290                 hashCode(),
291                 proxy == null ? "" : "(via " + proxy + ")",
292                 exchanges.size());
293     }
294 }