1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.nio.channels.AsynchronousCloseException;
24 import java.util.List;
25 import java.util.Queue;
26 import java.util.concurrent.RejectedExecutionException;
27
28 import org.eclipse.jetty.client.api.Connection;
29 import org.eclipse.jetty.client.api.Destination;
30 import org.eclipse.jetty.client.api.Request;
31 import org.eclipse.jetty.client.api.Response;
32 import org.eclipse.jetty.client.api.Result;
33 import org.eclipse.jetty.http.HttpField;
34 import org.eclipse.jetty.http.HttpHeader;
35 import org.eclipse.jetty.http.HttpScheme;
36 import org.eclipse.jetty.io.ClientConnectionFactory;
37 import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
38 import org.eclipse.jetty.util.BlockingArrayQueue;
39 import org.eclipse.jetty.util.Promise;
40 import org.eclipse.jetty.util.component.ContainerLifeCycle;
41 import org.eclipse.jetty.util.component.Dumpable;
42 import org.eclipse.jetty.util.log.Log;
43 import org.eclipse.jetty.util.log.Logger;
44
45 public abstract class HttpDestination implements Destination, Closeable, Dumpable
46 {
47 protected static final Logger LOG = Log.getLogger(HttpDestination.class);
48
49 private final HttpClient client;
50 private final Origin origin;
51 private final Queue<HttpExchange> exchanges;
52 private final RequestNotifier requestNotifier;
53 private final ResponseNotifier responseNotifier;
54 private final ProxyConfiguration.Proxy proxy;
55 private final ClientConnectionFactory connectionFactory;
56 private final HttpField hostField;
57
58 public HttpDestination(HttpClient client, Origin origin)
59 {
60 this.client = client;
61 this.origin = origin;
62
63 this.exchanges = new BlockingArrayQueue<>(client.getMaxRequestsQueuedPerDestination());
64
65 this.requestNotifier = new RequestNotifier(client);
66 this.responseNotifier = new ResponseNotifier(client);
67
68 ProxyConfiguration proxyConfig = client.getProxyConfiguration();
69 proxy = proxyConfig.match(origin);
70 ClientConnectionFactory connectionFactory = client.getTransport();
71 if (proxy != null)
72 {
73 connectionFactory = proxy.newClientConnectionFactory(connectionFactory);
74 }
75 else
76 {
77 if (HttpScheme.HTTPS.is(getScheme()))
78 connectionFactory = newSslClientConnectionFactory(connectionFactory);
79 }
80 this.connectionFactory = connectionFactory;
81
82 String host = getHost();
83 if (!client.isDefaultPort(getScheme(), getPort()))
84 host += ":" + getPort();
85 hostField = new HttpField(HttpHeader.HOST, host);
86 }
87
88 protected ClientConnectionFactory newSslClientConnectionFactory(ClientConnectionFactory connectionFactory)
89 {
90 return new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory);
91 }
92
93 public HttpClient getHttpClient()
94 {
95 return client;
96 }
97
98 public Origin getOrigin()
99 {
100 return origin;
101 }
102
103 public Queue<HttpExchange> getHttpExchanges()
104 {
105 return exchanges;
106 }
107
108 public RequestNotifier getRequestNotifier()
109 {
110 return requestNotifier;
111 }
112
113 public ResponseNotifier getResponseNotifier()
114 {
115 return responseNotifier;
116 }
117
118 public ProxyConfiguration.Proxy getProxy()
119 {
120 return proxy;
121 }
122
123 public ClientConnectionFactory getClientConnectionFactory()
124 {
125 return connectionFactory;
126 }
127
128 @Override
129 public String getScheme()
130 {
131 return origin.getScheme();
132 }
133
134 @Override
135 public String getHost()
136 {
137
138
139 return origin.getAddress().getHost();
140 }
141
142 @Override
143 public int getPort()
144 {
145 return origin.getAddress().getPort();
146 }
147
148 public Origin.Address getConnectAddress()
149 {
150 return proxy == null ? origin.getAddress() : proxy.getAddress();
151 }
152
153 public HttpField getHostField()
154 {
155 return hostField;
156 }
157
158 protected void send(Request request, List<Response.ResponseListener> listeners)
159 {
160 if (!getScheme().equals(request.getScheme()))
161 throw new IllegalArgumentException("Invalid request scheme " + request.getScheme() + " for destination " + this);
162 if (!getHost().equals(request.getHost()))
163 throw new IllegalArgumentException("Invalid request host " + request.getHost() + " for destination " + this);
164 int port = request.getPort();
165 if (port >= 0 && getPort() != port)
166 throw new IllegalArgumentException("Invalid request port " + port + " for destination " + this);
167
168 HttpConversation conversation = client.getConversation(request.getConversationID(), true);
169 HttpExchange exchange = new HttpExchange(conversation, this, request, listeners);
170
171 if (client.isRunning())
172 {
173 if (exchanges.offer(exchange))
174 {
175 if (!client.isRunning() && exchanges.remove(exchange))
176 {
177 throw new RejectedExecutionException(client + " is stopping");
178 }
179 else
180 {
181 LOG.debug("Queued {}", request);
182 requestNotifier.notifyQueued(request);
183 send();
184 }
185 }
186 else
187 {
188 LOG.debug("Max queued exceeded {}", request);
189 abort(exchange, new RejectedExecutionException("Max requests per destination " + client.getMaxRequestsQueuedPerDestination() + " exceeded for " + this));
190 }
191 }
192 else
193 {
194 throw new RejectedExecutionException(client + " is stopped");
195 }
196 }
197
198 protected abstract void send();
199
200 public void newConnection(Promise<Connection> promise)
201 {
202 createConnection(promise);
203 }
204
205 protected void createConnection(Promise<Connection> promise)
206 {
207 client.newConnection(this, promise);
208 }
209
210 public boolean remove(HttpExchange exchange)
211 {
212 return exchanges.remove(exchange);
213 }
214
215 public void close()
216 {
217 abort(new AsynchronousCloseException());
218 LOG.debug("Closed {}", this);
219 }
220
221 public void close(Connection connection)
222 {
223 }
224
225
226
227
228
229
230
231 public void abort(Throwable cause)
232 {
233 HttpExchange exchange;
234 while ((exchange = exchanges.poll()) != null)
235 abort(exchange, cause);
236 }
237
238
239
240
241
242
243
244 protected void abort(HttpExchange exchange, Throwable cause)
245 {
246 Request request = exchange.getRequest();
247 HttpResponse response = exchange.getResponse();
248 getRequestNotifier().notifyFailure(request, cause);
249 List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
250 getResponseNotifier().notifyFailure(listeners, response, cause);
251 getResponseNotifier().notifyComplete(listeners, new Result(request, cause, response, cause));
252 }
253
254 @Override
255 public String dump()
256 {
257 return ContainerLifeCycle.dump(this);
258 }
259
260 @Override
261 public void dump(Appendable out, String indent) throws IOException
262 {
263 ContainerLifeCycle.dumpObject(out, this + " - requests queued: " + exchanges.size());
264 }
265
266 public String asString()
267 {
268 return origin.asString();
269 }
270
271 @Override
272 public String toString()
273 {
274 return String.format("%s(%s)%s",
275 HttpDestination.class.getSimpleName(),
276 asString(),
277 proxy == null ? "" : " via " + proxy);
278 }
279 }