1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.nio.channels.AsynchronousCloseException;
24 import java.util.List;
25 import java.util.Queue;
26 import java.util.concurrent.RejectedExecutionException;
27
28 import org.eclipse.jetty.client.api.Connection;
29 import org.eclipse.jetty.client.api.Destination;
30 import org.eclipse.jetty.client.api.Response;
31 import org.eclipse.jetty.http.HttpField;
32 import org.eclipse.jetty.http.HttpHeader;
33 import org.eclipse.jetty.http.HttpScheme;
34 import org.eclipse.jetty.io.ClientConnectionFactory;
35 import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
36 import org.eclipse.jetty.util.BlockingArrayQueue;
37 import org.eclipse.jetty.util.Promise;
38 import org.eclipse.jetty.util.component.ContainerLifeCycle;
39 import org.eclipse.jetty.util.component.Dumpable;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42
43 public abstract class HttpDestination implements Destination, Closeable, Dumpable
44 {
45 protected static final Logger LOG = Log.getLogger(HttpDestination.class);
46
47 private final HttpClient client;
48 private final Origin origin;
49 private final Queue<HttpExchange> exchanges;
50 private final RequestNotifier requestNotifier;
51 private final ResponseNotifier responseNotifier;
52 private final ProxyConfiguration.Proxy proxy;
53 private final ClientConnectionFactory connectionFactory;
54 private final HttpField hostField;
55
56 public HttpDestination(HttpClient client, Origin origin)
57 {
58 this.client = client;
59 this.origin = origin;
60
61 this.exchanges = new BlockingArrayQueue<>(client.getMaxRequestsQueuedPerDestination());
62
63 this.requestNotifier = new RequestNotifier(client);
64 this.responseNotifier = new ResponseNotifier();
65
66 ProxyConfiguration proxyConfig = client.getProxyConfiguration();
67 proxy = proxyConfig.match(origin);
68 ClientConnectionFactory connectionFactory = client.getTransport();
69 if (proxy != null)
70 {
71 connectionFactory = proxy.newClientConnectionFactory(connectionFactory);
72 }
73 else
74 {
75 if (HttpScheme.HTTPS.is(getScheme()))
76 connectionFactory = newSslClientConnectionFactory(connectionFactory);
77 }
78 this.connectionFactory = connectionFactory;
79
80 String host = getHost();
81 if (!client.isDefaultPort(getScheme(), getPort()))
82 host += ":" + getPort();
83 hostField = new HttpField(HttpHeader.HOST, host);
84 }
85
86 protected ClientConnectionFactory newSslClientConnectionFactory(ClientConnectionFactory connectionFactory)
87 {
88 return new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory);
89 }
90
91 public HttpClient getHttpClient()
92 {
93 return client;
94 }
95
96 public Origin getOrigin()
97 {
98 return origin;
99 }
100
101 public Queue<HttpExchange> getHttpExchanges()
102 {
103 return exchanges;
104 }
105
106 public RequestNotifier getRequestNotifier()
107 {
108 return requestNotifier;
109 }
110
111 public ResponseNotifier getResponseNotifier()
112 {
113 return responseNotifier;
114 }
115
116 public ProxyConfiguration.Proxy getProxy()
117 {
118 return proxy;
119 }
120
121 public ClientConnectionFactory getClientConnectionFactory()
122 {
123 return connectionFactory;
124 }
125
126 @Override
127 public String getScheme()
128 {
129 return origin.getScheme();
130 }
131
132 @Override
133 public String getHost()
134 {
135
136
137 return origin.getAddress().getHost();
138 }
139
140 @Override
141 public int getPort()
142 {
143 return origin.getAddress().getPort();
144 }
145
146 public Origin.Address getConnectAddress()
147 {
148 return proxy == null ? origin.getAddress() : proxy.getAddress();
149 }
150
151 public HttpField getHostField()
152 {
153 return hostField;
154 }
155
156 protected void send(HttpRequest request, List<Response.ResponseListener> listeners)
157 {
158 if (!getScheme().equals(request.getScheme()))
159 throw new IllegalArgumentException("Invalid request scheme " + request.getScheme() + " for destination " + this);
160 if (!getHost().equals(request.getHost()))
161 throw new IllegalArgumentException("Invalid request host " + request.getHost() + " for destination " + this);
162 int port = request.getPort();
163 if (port >= 0 && getPort() != port)
164 throw new IllegalArgumentException("Invalid request port " + port + " for destination " + this);
165
166 HttpExchange exchange = new HttpExchange(this, request, listeners);
167
168 if (client.isRunning())
169 {
170 if (exchanges.offer(exchange))
171 {
172 if (!client.isRunning() && exchanges.remove(exchange))
173 {
174 request.abort(new RejectedExecutionException(client + " is stopping"));
175 }
176 else
177 {
178 LOG.debug("Queued {}", request);
179 requestNotifier.notifyQueued(request);
180 send();
181 }
182 }
183 else
184 {
185 LOG.debug("Max queue size {} exceeded by {}", client.getMaxRequestsQueuedPerDestination(), request);
186 request.abort(new RejectedExecutionException("Max requests per destination " + client.getMaxRequestsQueuedPerDestination() + " exceeded for " + this));
187 }
188 }
189 else
190 {
191 request.abort(new RejectedExecutionException(client + " is stopped"));
192 }
193 }
194
195 protected abstract void send();
196
197 public void newConnection(Promise<Connection> promise)
198 {
199 createConnection(promise);
200 }
201
202 protected void createConnection(Promise<Connection> promise)
203 {
204 client.newConnection(this, promise);
205 }
206
207 public boolean remove(HttpExchange exchange)
208 {
209 return exchanges.remove(exchange);
210 }
211
212 public void close()
213 {
214 abort(new AsynchronousCloseException());
215 LOG.debug("Closed {}", this);
216 }
217
218 public void release(Connection connection)
219 {
220 }
221
222 public void close(Connection connection)
223 {
224 }
225
226
227
228
229
230
231 public void abort(Throwable cause)
232 {
233 HttpExchange exchange;
234
235 while ((exchange = exchanges.peek()) != null)
236 exchange.getRequest().abort(cause);
237 }
238
239 @Override
240 public String dump()
241 {
242 return ContainerLifeCycle.dump(this);
243 }
244
245 @Override
246 public void dump(Appendable out, String indent) throws IOException
247 {
248 ContainerLifeCycle.dumpObject(out, this + " - requests queued: " + exchanges.size());
249 }
250
251 public String asString()
252 {
253 return origin.asString();
254 }
255
256 @Override
257 public String toString()
258 {
259 return String.format("%s(%s)%s",
260 HttpDestination.class.getSimpleName(),
261 asString(),
262 proxy == null ? "" : " via " + proxy);
263 }
264 }