1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.nio.channels.AsynchronousCloseException;
24 import java.util.List;
25 import java.util.Queue;
26 import java.util.concurrent.RejectedExecutionException;
27
28 import org.eclipse.jetty.client.api.Connection;
29 import org.eclipse.jetty.client.api.Destination;
30 import org.eclipse.jetty.client.api.Response;
31 import org.eclipse.jetty.http.HttpField;
32 import org.eclipse.jetty.http.HttpHeader;
33 import org.eclipse.jetty.http.HttpScheme;
34 import org.eclipse.jetty.io.ClientConnectionFactory;
35 import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
36 import org.eclipse.jetty.util.BlockingArrayQueue;
37 import org.eclipse.jetty.util.Promise;
38 import org.eclipse.jetty.util.component.ContainerLifeCycle;
39 import org.eclipse.jetty.util.component.Dumpable;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42
43 public abstract class HttpDestination implements Destination, Closeable, Dumpable
44 {
45 protected static final Logger LOG = Log.getLogger(HttpDestination.class);
46
47 private final HttpClient client;
48 private final Origin origin;
49 private final Queue<HttpExchange> exchanges;
50 private final RequestNotifier requestNotifier;
51 private final ResponseNotifier responseNotifier;
52 private final ProxyConfiguration.Proxy proxy;
53 private final ClientConnectionFactory connectionFactory;
54 private final HttpField hostField;
55
56 public HttpDestination(HttpClient client, Origin origin)
57 {
58 this.client = client;
59 this.origin = origin;
60
61 this.exchanges = new BlockingArrayQueue<>(client.getMaxRequestsQueuedPerDestination());
62
63 this.requestNotifier = new RequestNotifier(client);
64 this.responseNotifier = new ResponseNotifier();
65
66 ProxyConfiguration proxyConfig = client.getProxyConfiguration();
67 proxy = proxyConfig.match(origin);
68 ClientConnectionFactory connectionFactory = client.getTransport();
69 if (proxy != null)
70 {
71 connectionFactory = proxy.newClientConnectionFactory(connectionFactory);
72 }
73 else
74 {
75 if (HttpScheme.HTTPS.is(getScheme()))
76 connectionFactory = newSslClientConnectionFactory(connectionFactory);
77 }
78 this.connectionFactory = connectionFactory;
79
80 String host = getHost();
81 if (!client.isDefaultPort(getScheme(), getPort()))
82 host += ":" + getPort();
83 hostField = new HttpField(HttpHeader.HOST, host);
84 }
85
86 protected ClientConnectionFactory newSslClientConnectionFactory(ClientConnectionFactory connectionFactory)
87 {
88 return new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory);
89 }
90
91 public HttpClient getHttpClient()
92 {
93 return client;
94 }
95
96 public Origin getOrigin()
97 {
98 return origin;
99 }
100
101 public Queue<HttpExchange> getHttpExchanges()
102 {
103 return exchanges;
104 }
105
106 public RequestNotifier getRequestNotifier()
107 {
108 return requestNotifier;
109 }
110
111 public ResponseNotifier getResponseNotifier()
112 {
113 return responseNotifier;
114 }
115
116 public ProxyConfiguration.Proxy getProxy()
117 {
118 return proxy;
119 }
120
121 public ClientConnectionFactory getClientConnectionFactory()
122 {
123 return connectionFactory;
124 }
125
126 @Override
127 public String getScheme()
128 {
129 return origin.getScheme();
130 }
131
132 @Override
133 public String getHost()
134 {
135
136
137 return origin.getAddress().getHost();
138 }
139
140 @Override
141 public int getPort()
142 {
143 return origin.getAddress().getPort();
144 }
145
146 public Origin.Address getConnectAddress()
147 {
148 return proxy == null ? origin.getAddress() : proxy.getAddress();
149 }
150
151 public HttpField getHostField()
152 {
153 return hostField;
154 }
155
156 protected void send(HttpRequest request, List<Response.ResponseListener> listeners)
157 {
158 if (!getScheme().equals(request.getScheme()))
159 throw new IllegalArgumentException("Invalid request scheme " + request.getScheme() + " for destination " + this);
160 if (!getHost().equals(request.getHost()))
161 throw new IllegalArgumentException("Invalid request host " + request.getHost() + " for destination " + this);
162 int port = request.getPort();
163 if (port >= 0 && getPort() != port)
164 throw new IllegalArgumentException("Invalid request port " + port + " for destination " + this);
165
166 HttpExchange exchange = new HttpExchange(this, request, listeners);
167
168 if (client.isRunning())
169 {
170 if (exchanges.offer(exchange))
171 {
172 if (!client.isRunning() && exchanges.remove(exchange))
173 {
174 request.abort(new RejectedExecutionException(client + " is stopping"));
175 }
176 else
177 {
178 if (LOG.isDebugEnabled())
179 LOG.debug("Queued {} for {}", request, this);
180 requestNotifier.notifyQueued(request);
181 send();
182 }
183 }
184 else
185 {
186 if (LOG.isDebugEnabled())
187 LOG.debug("Max queue size {} exceeded by {} for {}", client.getMaxRequestsQueuedPerDestination(), request, this);
188 request.abort(new RejectedExecutionException("Max requests per destination " + client.getMaxRequestsQueuedPerDestination() + " exceeded for " + this));
189 }
190 }
191 else
192 {
193 request.abort(new RejectedExecutionException(client + " is stopped"));
194 }
195 }
196
197 protected abstract void send();
198
199 public void newConnection(Promise<Connection> promise)
200 {
201 createConnection(promise);
202 }
203
204 protected void createConnection(Promise<Connection> promise)
205 {
206 client.newConnection(this, promise);
207 }
208
209 public boolean remove(HttpExchange exchange)
210 {
211 return exchanges.remove(exchange);
212 }
213
214 public void close()
215 {
216 abort(new AsynchronousCloseException());
217 if (LOG.isDebugEnabled())
218 LOG.debug("Closed {}", this);
219 }
220
221 public void release(Connection connection)
222 {
223 }
224
225 public void close(Connection connection)
226 {
227 }
228
229
230
231
232
233
234 public void abort(Throwable cause)
235 {
236 HttpExchange exchange;
237
238 while ((exchange = exchanges.peek()) != null)
239 exchange.getRequest().abort(cause);
240 }
241
242 @Override
243 public String dump()
244 {
245 return ContainerLifeCycle.dump(this);
246 }
247
248 @Override
249 public void dump(Appendable out, String indent) throws IOException
250 {
251 ContainerLifeCycle.dumpObject(out, this + " - requests queued: " + exchanges.size());
252 }
253
254 public String asString()
255 {
256 return origin.asString();
257 }
258
259 @Override
260 public String toString()
261 {
262 return String.format("%s[%s]%s,queue=%d",
263 HttpDestination.class.getSimpleName(),
264 asString(),
265 proxy == null ? "" : "(via " + proxy + ")",
266 exchanges.size());
267 }
268 }