View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.nio.channels.AsynchronousCloseException;
24  import java.util.List;
25  import java.util.Queue;
26  import java.util.concurrent.RejectedExecutionException;
27  
28  import org.eclipse.jetty.client.api.Connection;
29  import org.eclipse.jetty.client.api.Destination;
30  import org.eclipse.jetty.client.api.Request;
31  import org.eclipse.jetty.client.api.Response;
32  import org.eclipse.jetty.client.api.Result;
33  import org.eclipse.jetty.http.HttpField;
34  import org.eclipse.jetty.http.HttpHeader;
35  import org.eclipse.jetty.http.HttpScheme;
36  import org.eclipse.jetty.io.ClientConnectionFactory;
37  import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
38  import org.eclipse.jetty.util.BlockingArrayQueue;
39  import org.eclipse.jetty.util.Promise;
40  import org.eclipse.jetty.util.component.ContainerLifeCycle;
41  import org.eclipse.jetty.util.component.Dumpable;
42  import org.eclipse.jetty.util.log.Log;
43  import org.eclipse.jetty.util.log.Logger;
44  
45  public abstract class HttpDestination implements Destination, Closeable, Dumpable
46  {
47      protected static final Logger LOG = Log.getLogger(HttpDestination.class);
48  
49      private final HttpClient client;
50      private final Origin origin;
51      private final Queue<HttpExchange> exchanges;
52      private final RequestNotifier requestNotifier;
53      private final ResponseNotifier responseNotifier;
54      private final ProxyConfiguration.Proxy proxy;
55      private final ClientConnectionFactory connectionFactory;
56      private final HttpField hostField;
57  
58      public HttpDestination(HttpClient client, Origin origin)
59      {
60          this.client = client;
61          this.origin = origin;
62  
63          this.exchanges = new BlockingArrayQueue<>(client.getMaxRequestsQueuedPerDestination());
64  
65          this.requestNotifier = new RequestNotifier(client);
66          this.responseNotifier = new ResponseNotifier(client);
67  
68          ProxyConfiguration proxyConfig = client.getProxyConfiguration();
69          proxy = proxyConfig.match(origin);
70          ClientConnectionFactory connectionFactory = client.getTransport();
71          if (proxy != null)
72          {
73              connectionFactory = proxy.newClientConnectionFactory(connectionFactory);
74          }
75          else
76          {
77              if (HttpScheme.HTTPS.is(getScheme()))
78                  connectionFactory = newSslClientConnectionFactory(connectionFactory);
79          }
80          this.connectionFactory = connectionFactory;
81  
82          String host = getHost();
83          if (!client.isDefaultPort(getScheme(), getPort()))
84              host += ":" + getPort();
85          hostField = new HttpField(HttpHeader.HOST, host);
86      }
87  
88      protected ClientConnectionFactory newSslClientConnectionFactory(ClientConnectionFactory connectionFactory)
89      {
90          return new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory);
91      }
92  
93      public HttpClient getHttpClient()
94      {
95          return client;
96      }
97  
98      public Origin getOrigin()
99      {
100         return origin;
101     }
102 
103     public Queue<HttpExchange> getHttpExchanges()
104     {
105         return exchanges;
106     }
107 
108     public RequestNotifier getRequestNotifier()
109     {
110         return requestNotifier;
111     }
112 
113     public ResponseNotifier getResponseNotifier()
114     {
115         return responseNotifier;
116     }
117 
118     public ProxyConfiguration.Proxy getProxy()
119     {
120         return proxy;
121     }
122 
123     public ClientConnectionFactory getClientConnectionFactory()
124     {
125         return connectionFactory;
126     }
127 
128     @Override
129     public String getScheme()
130     {
131         return origin.getScheme();
132     }
133 
134     @Override
135     public String getHost()
136     {
137         // InetSocketAddress.getHostString() transforms the host string
138         // in case of IPv6 addresses, so we return the original host string
139         return origin.getAddress().getHost();
140     }
141 
142     @Override
143     public int getPort()
144     {
145         return origin.getAddress().getPort();
146     }
147 
148     public Origin.Address getConnectAddress()
149     {
150         return proxy == null ? origin.getAddress() : proxy.getAddress();
151     }
152 
153     public HttpField getHostField()
154     {
155         return hostField;
156     }
157 
158     protected void send(Request request, List<Response.ResponseListener> listeners)
159     {
160         if (!getScheme().equals(request.getScheme()))
161             throw new IllegalArgumentException("Invalid request scheme " + request.getScheme() + " for destination " + this);
162         if (!getHost().equals(request.getHost()))
163             throw new IllegalArgumentException("Invalid request host " + request.getHost() + " for destination " + this);
164         int port = request.getPort();
165         if (port >= 0 && getPort() != port)
166             throw new IllegalArgumentException("Invalid request port " + port + " for destination " + this);
167 
168         HttpConversation conversation = client.getConversation(request.getConversationID(), true);
169         HttpExchange exchange = new HttpExchange(conversation, this, request, listeners);
170 
171         if (client.isRunning())
172         {
173             if (exchanges.offer(exchange))
174             {
175                 if (!client.isRunning() && exchanges.remove(exchange))
176                 {
177                     throw new RejectedExecutionException(client + " is stopping");
178                 }
179                 else
180                 {
181                     LOG.debug("Queued {}", request);
182                     requestNotifier.notifyQueued(request);
183                     send();
184                 }
185             }
186             else
187             {
188                 LOG.debug("Max queued exceeded {}", request);
189                 abort(exchange, new RejectedExecutionException("Max requests per destination " + client.getMaxRequestsQueuedPerDestination() + " exceeded for " + this));
190             }
191         }
192         else
193         {
194             throw new RejectedExecutionException(client + " is stopped");
195         }
196     }
197 
198     protected abstract void send();
199 
200     public void newConnection(Promise<Connection> promise)
201     {
202         createConnection(promise);
203     }
204 
205     protected void createConnection(Promise<Connection> promise)
206     {
207         client.newConnection(this, promise);
208     }
209 
210     public boolean remove(HttpExchange exchange)
211     {
212         return exchanges.remove(exchange);
213     }
214 
215     public void close()
216     {
217         abort(new AsynchronousCloseException());
218         LOG.debug("Closed {}", this);
219     }
220 
221     public void close(Connection connection)
222     {
223     }
224 
225     /**
226      * Aborts all the {@link HttpExchange}s queued in this destination.
227      *
228      * @param cause the abort cause
229      * @see #abort(HttpExchange, Throwable)
230      */
231     public void abort(Throwable cause)
232     {
233         HttpExchange exchange;
234         while ((exchange = exchanges.poll()) != null)
235             abort(exchange, cause);
236     }
237 
238     /**
239      * Aborts the given {@code exchange}, notifies listeners of the failure, and completes the exchange.
240      *
241      * @param exchange the {@link HttpExchange} to abort
242      * @param cause the abort cause
243      */
244     protected void abort(HttpExchange exchange, Throwable cause)
245     {
246         Request request = exchange.getRequest();
247         HttpResponse response = exchange.getResponse();
248         getRequestNotifier().notifyFailure(request, cause);
249         List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
250         getResponseNotifier().notifyFailure(listeners, response, cause);
251         getResponseNotifier().notifyComplete(listeners, new Result(request, cause, response, cause));
252     }
253 
254     @Override
255     public String dump()
256     {
257         return ContainerLifeCycle.dump(this);
258     }
259 
260     @Override
261     public void dump(Appendable out, String indent) throws IOException
262     {
263         ContainerLifeCycle.dumpObject(out, this + " - requests queued: " + exchanges.size());
264     }
265 
266     public String asString()
267     {
268         return origin.asString();
269     }
270 
271     @Override
272     public String toString()
273     {
274         return String.format("%s(%s)%s",
275                 HttpDestination.class.getSimpleName(),
276                 asString(),
277                 proxy == null ? "" : " via " + proxy);
278     }
279 }