View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.nio.channels.AsynchronousCloseException;
24  import java.util.List;
25  import java.util.Queue;
26  import java.util.concurrent.RejectedExecutionException;
27  
28  import org.eclipse.jetty.client.api.Connection;
29  import org.eclipse.jetty.client.api.Destination;
30  import org.eclipse.jetty.client.api.Response;
31  import org.eclipse.jetty.http.HttpField;
32  import org.eclipse.jetty.http.HttpHeader;
33  import org.eclipse.jetty.http.HttpScheme;
34  import org.eclipse.jetty.io.ClientConnectionFactory;
35  import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
36  import org.eclipse.jetty.util.BlockingArrayQueue;
37  import org.eclipse.jetty.util.Promise;
38  import org.eclipse.jetty.util.component.ContainerLifeCycle;
39  import org.eclipse.jetty.util.component.Dumpable;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.log.Logger;
42  
43  public abstract class HttpDestination implements Destination, Closeable, Dumpable
44  {
45      protected static final Logger LOG = Log.getLogger(HttpDestination.class);
46  
47      private final HttpClient client;
48      private final Origin origin;
49      private final Queue<HttpExchange> exchanges;
50      private final RequestNotifier requestNotifier;
51      private final ResponseNotifier responseNotifier;
52      private final ProxyConfiguration.Proxy proxy;
53      private final ClientConnectionFactory connectionFactory;
54      private final HttpField hostField;
55  
56      public HttpDestination(HttpClient client, Origin origin)
57      {
58          this.client = client;
59          this.origin = origin;
60  
61          this.exchanges = new BlockingArrayQueue<>(client.getMaxRequestsQueuedPerDestination());
62  
63          this.requestNotifier = new RequestNotifier(client);
64          this.responseNotifier = new ResponseNotifier();
65  
66          ProxyConfiguration proxyConfig = client.getProxyConfiguration();
67          proxy = proxyConfig.match(origin);
68          ClientConnectionFactory connectionFactory = client.getTransport();
69          if (proxy != null)
70          {
71              connectionFactory = proxy.newClientConnectionFactory(connectionFactory);
72          }
73          else
74          {
75              if (HttpScheme.HTTPS.is(getScheme()))
76                  connectionFactory = newSslClientConnectionFactory(connectionFactory);
77          }
78          this.connectionFactory = connectionFactory;
79  
80          String host = getHost();
81          if (!client.isDefaultPort(getScheme(), getPort()))
82              host += ":" + getPort();
83          hostField = new HttpField(HttpHeader.HOST, host);
84      }
85  
86      protected ClientConnectionFactory newSslClientConnectionFactory(ClientConnectionFactory connectionFactory)
87      {
88          return new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory);
89      }
90  
91      public HttpClient getHttpClient()
92      {
93          return client;
94      }
95  
96      public Origin getOrigin()
97      {
98          return origin;
99      }
100 
101     public Queue<HttpExchange> getHttpExchanges()
102     {
103         return exchanges;
104     }
105 
106     public RequestNotifier getRequestNotifier()
107     {
108         return requestNotifier;
109     }
110 
111     public ResponseNotifier getResponseNotifier()
112     {
113         return responseNotifier;
114     }
115 
116     public ProxyConfiguration.Proxy getProxy()
117     {
118         return proxy;
119     }
120 
121     public ClientConnectionFactory getClientConnectionFactory()
122     {
123         return connectionFactory;
124     }
125 
126     @Override
127     public String getScheme()
128     {
129         return origin.getScheme();
130     }
131 
132     @Override
133     public String getHost()
134     {
135         // InetSocketAddress.getHostString() transforms the host string
136         // in case of IPv6 addresses, so we return the original host string
137         return origin.getAddress().getHost();
138     }
139 
140     @Override
141     public int getPort()
142     {
143         return origin.getAddress().getPort();
144     }
145 
146     public Origin.Address getConnectAddress()
147     {
148         return proxy == null ? origin.getAddress() : proxy.getAddress();
149     }
150 
151     public HttpField getHostField()
152     {
153         return hostField;
154     }
155 
156     protected void send(HttpRequest request, List<Response.ResponseListener> listeners)
157     {
158         if (!getScheme().equals(request.getScheme()))
159             throw new IllegalArgumentException("Invalid request scheme " + request.getScheme() + " for destination " + this);
160         if (!getHost().equals(request.getHost()))
161             throw new IllegalArgumentException("Invalid request host " + request.getHost() + " for destination " + this);
162         int port = request.getPort();
163         if (port >= 0 && getPort() != port)
164             throw new IllegalArgumentException("Invalid request port " + port + " for destination " + this);
165 
166         HttpExchange exchange = new HttpExchange(this, request, listeners);
167 
168         if (client.isRunning())
169         {
170             if (exchanges.offer(exchange))
171             {
172                 if (!client.isRunning() && exchanges.remove(exchange))
173                 {
174                     request.abort(new RejectedExecutionException(client + " is stopping"));
175                 }
176                 else
177                 {
178                     if (LOG.isDebugEnabled())
179                         LOG.debug("Queued {} for {}", request, this);
180                     requestNotifier.notifyQueued(request);
181                     send();
182                 }
183             }
184             else
185             {
186                 if (LOG.isDebugEnabled())
187                     LOG.debug("Max queue size {} exceeded by {} for {}", client.getMaxRequestsQueuedPerDestination(), request, this);
188                 request.abort(new RejectedExecutionException("Max requests per destination " + client.getMaxRequestsQueuedPerDestination() + " exceeded for " + this));
189             }
190         }
191         else
192         {
193             request.abort(new RejectedExecutionException(client + " is stopped"));
194         }
195     }
196 
197     protected abstract void send();
198 
199     public void newConnection(Promise<Connection> promise)
200     {
201         createConnection(promise);
202     }
203 
204     protected void createConnection(Promise<Connection> promise)
205     {
206         client.newConnection(this, promise);
207     }
208 
209     public boolean remove(HttpExchange exchange)
210     {
211         return exchanges.remove(exchange);
212     }
213 
214     public void close()
215     {
216         abort(new AsynchronousCloseException());
217         if (LOG.isDebugEnabled())
218             LOG.debug("Closed {}", this);
219     }
220 
221     public void release(Connection connection)
222     {
223     }
224 
225     public void close(Connection connection)
226     {
227     }
228 
229     /**
230      * Aborts all the {@link HttpExchange}s queued in this destination.
231      *
232      * @param cause the abort cause
233      */
234     public void abort(Throwable cause)
235     {
236         HttpExchange exchange;
237         // Just peek(), the abort() will remove it from the queue.
238         while ((exchange = exchanges.peek()) != null)
239             exchange.getRequest().abort(cause);
240     }
241 
242     @Override
243     public String dump()
244     {
245         return ContainerLifeCycle.dump(this);
246     }
247 
248     @Override
249     public void dump(Appendable out, String indent) throws IOException
250     {
251         ContainerLifeCycle.dumpObject(out, this + " - requests queued: " + exchanges.size());
252     }
253 
254     public String asString()
255     {
256         return origin.asString();
257     }
258 
259     @Override
260     public String toString()
261     {
262         return String.format("%s[%s]%s,queue=%d",
263                 HttpDestination.class.getSimpleName(),
264                 asString(),
265                 proxy == null ? "" : "(via " + proxy + ")",
266                 exchanges.size());
267     }
268 }