1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.nio.channels.AsynchronousCloseException;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.Queue;
27 import java.util.concurrent.RejectedExecutionException;
28
29 import org.eclipse.jetty.client.api.Connection;
30 import org.eclipse.jetty.client.api.Destination;
31 import org.eclipse.jetty.client.api.Response;
32 import org.eclipse.jetty.http.HttpField;
33 import org.eclipse.jetty.http.HttpHeader;
34 import org.eclipse.jetty.http.HttpScheme;
35 import org.eclipse.jetty.io.ClientConnectionFactory;
36 import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
37 import org.eclipse.jetty.util.BlockingArrayQueue;
38 import org.eclipse.jetty.util.Promise;
39 import org.eclipse.jetty.util.annotation.ManagedAttribute;
40 import org.eclipse.jetty.util.annotation.ManagedObject;
41 import org.eclipse.jetty.util.component.ContainerLifeCycle;
42 import org.eclipse.jetty.util.component.Dumpable;
43 import org.eclipse.jetty.util.log.Log;
44 import org.eclipse.jetty.util.log.Logger;
45
46 @ManagedObject
47 public abstract class HttpDestination extends ContainerLifeCycle implements Destination, Closeable, Dumpable
48 {
49 protected static final Logger LOG = Log.getLogger(HttpDestination.class);
50
51 private final HttpClient client;
52 private final Origin origin;
53 private final Queue<HttpExchange> exchanges;
54 private final RequestNotifier requestNotifier;
55 private final ResponseNotifier responseNotifier;
56 private final ProxyConfiguration.Proxy proxy;
57 private final ClientConnectionFactory connectionFactory;
58 private final HttpField hostField;
59
60 public HttpDestination(HttpClient client, Origin origin)
61 {
62 this.client = client;
63 this.origin = origin;
64
65 this.exchanges = newExchangeQueue(client);
66
67 this.requestNotifier = new RequestNotifier(client);
68 this.responseNotifier = new ResponseNotifier();
69
70 ProxyConfiguration proxyConfig = client.getProxyConfiguration();
71 proxy = proxyConfig.match(origin);
72 ClientConnectionFactory connectionFactory = client.getTransport();
73 if (proxy != null)
74 {
75 connectionFactory = proxy.newClientConnectionFactory(connectionFactory);
76 }
77 else
78 {
79 if (HttpScheme.HTTPS.is(getScheme()))
80 connectionFactory = newSslClientConnectionFactory(connectionFactory);
81 }
82 this.connectionFactory = connectionFactory;
83
84 String host = getHost();
85 if (!client.isDefaultPort(getScheme(), getPort()))
86 host += ":" + getPort();
87 hostField = new HttpField(HttpHeader.HOST, host);
88 }
89
90 protected Queue<HttpExchange> newExchangeQueue(HttpClient client)
91 {
92 return new BlockingArrayQueue<>(client.getMaxRequestsQueuedPerDestination());
93 }
94
95 protected ClientConnectionFactory newSslClientConnectionFactory(ClientConnectionFactory connectionFactory)
96 {
97 return new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory);
98 }
99
100 public HttpClient getHttpClient()
101 {
102 return client;
103 }
104
105 public Origin getOrigin()
106 {
107 return origin;
108 }
109
110 public Queue<HttpExchange> getHttpExchanges()
111 {
112 return exchanges;
113 }
114
115 public RequestNotifier getRequestNotifier()
116 {
117 return requestNotifier;
118 }
119
120 public ResponseNotifier getResponseNotifier()
121 {
122 return responseNotifier;
123 }
124
125 public ProxyConfiguration.Proxy getProxy()
126 {
127 return proxy;
128 }
129
130 public ClientConnectionFactory getClientConnectionFactory()
131 {
132 return connectionFactory;
133 }
134
135 @Override
136 @ManagedAttribute(value = "The destination scheme", readonly = true)
137 public String getScheme()
138 {
139 return origin.getScheme();
140 }
141
142 @Override
143 @ManagedAttribute(value = "The destination host", readonly = true)
144 public String getHost()
145 {
146
147
148 return origin.getAddress().getHost();
149 }
150
151 @Override
152 @ManagedAttribute(value = "The destination port", readonly = true)
153 public int getPort()
154 {
155 return origin.getAddress().getPort();
156 }
157
158 @ManagedAttribute(value = "The number of queued requests", readonly = true)
159 public int getQueuedRequestCount()
160 {
161 return exchanges.size();
162 }
163
164 public Origin.Address getConnectAddress()
165 {
166 return proxy == null ? origin.getAddress() : proxy.getAddress();
167 }
168
169 public HttpField getHostField()
170 {
171 return hostField;
172 }
173
174 protected void send(HttpRequest request, List<Response.ResponseListener> listeners)
175 {
176 if (!getScheme().equalsIgnoreCase(request.getScheme()))
177 throw new IllegalArgumentException("Invalid request scheme " + request.getScheme() + " for destination " + this);
178 if (!getHost().equalsIgnoreCase(request.getHost()))
179 throw new IllegalArgumentException("Invalid request host " + request.getHost() + " for destination " + this);
180 int port = request.getPort();
181 if (port >= 0 && getPort() != port)
182 throw new IllegalArgumentException("Invalid request port " + port + " for destination " + this);
183
184 HttpExchange exchange = new HttpExchange(this, request, listeners);
185
186 if (client.isRunning())
187 {
188 if (enqueue(exchanges, exchange))
189 {
190 if (!client.isRunning() && exchanges.remove(exchange))
191 {
192 request.abort(new RejectedExecutionException(client + " is stopping"));
193 }
194 else
195 {
196 if (LOG.isDebugEnabled())
197 LOG.debug("Queued {} for {}", request, this);
198 requestNotifier.notifyQueued(request);
199 send();
200 }
201 }
202 else
203 {
204 if (LOG.isDebugEnabled())
205 LOG.debug("Max queue size {} exceeded by {} for {}", client.getMaxRequestsQueuedPerDestination(), request, this);
206 request.abort(new RejectedExecutionException("Max requests per destination " + client.getMaxRequestsQueuedPerDestination() + " exceeded for " + this));
207 }
208 }
209 else
210 {
211 request.abort(new RejectedExecutionException(client + " is stopped"));
212 }
213 }
214
215 protected boolean enqueue(Queue<HttpExchange> queue, HttpExchange exchange)
216 {
217 return queue.offer(exchange);
218 }
219
220 public abstract void send();
221
222 public void newConnection(Promise<Connection> promise)
223 {
224 createConnection(promise);
225 }
226
227 protected void createConnection(Promise<Connection> promise)
228 {
229 client.newConnection(this, promise);
230 }
231
232 public boolean remove(HttpExchange exchange)
233 {
234 return exchanges.remove(exchange);
235 }
236
237 public void close()
238 {
239 abort(new AsynchronousCloseException());
240 if (LOG.isDebugEnabled())
241 LOG.debug("Closed {}", this);
242 }
243
244 public void release(Connection connection)
245 {
246 }
247
248 public void close(Connection connection)
249 {
250 }
251
252
253
254
255
256
257 public void abort(Throwable cause)
258 {
259
260
261
262
263 for (HttpExchange exchange : new ArrayList<>(exchanges))
264 exchange.getRequest().abort(cause);
265 }
266
267 @Override
268 public String dump()
269 {
270 return ContainerLifeCycle.dump(this);
271 }
272
273 @Override
274 public void dump(Appendable out, String indent) throws IOException
275 {
276 ContainerLifeCycle.dumpObject(out, toString());
277 }
278
279 public String asString()
280 {
281 return origin.asString();
282 }
283
284 @Override
285 public String toString()
286 {
287 return String.format("%s[%s]%x%s,queue=%d",
288 HttpDestination.class.getSimpleName(),
289 asString(),
290 hashCode(),
291 proxy == null ? "" : "(via " + proxy + ")",
292 exchanges.size());
293 }
294 }