1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.http2.client;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.nio.channels.SelectionKey;
24 import java.nio.channels.SocketChannel;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.concurrent.Executor;
30
31 import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory;
32 import org.eclipse.jetty.http2.BufferingFlowControlStrategy;
33 import org.eclipse.jetty.http2.FlowControlStrategy;
34 import org.eclipse.jetty.http2.api.Session;
35 import org.eclipse.jetty.io.ByteBufferPool;
36 import org.eclipse.jetty.io.ClientConnectionFactory;
37 import org.eclipse.jetty.io.Connection;
38 import org.eclipse.jetty.io.EndPoint;
39 import org.eclipse.jetty.io.ManagedSelector;
40 import org.eclipse.jetty.io.MappedByteBufferPool;
41 import org.eclipse.jetty.io.SelectChannelEndPoint;
42 import org.eclipse.jetty.io.SelectorManager;
43 import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
44 import org.eclipse.jetty.util.Promise;
45 import org.eclipse.jetty.util.annotation.ManagedAttribute;
46 import org.eclipse.jetty.util.annotation.ManagedObject;
47 import org.eclipse.jetty.util.component.ContainerLifeCycle;
48 import org.eclipse.jetty.util.ssl.SslContextFactory;
49 import org.eclipse.jetty.util.thread.ExecutionStrategy;
50 import org.eclipse.jetty.util.thread.QueuedThreadPool;
51 import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
52 import org.eclipse.jetty.util.thread.Scheduler;
53 import org.eclipse.jetty.util.thread.strategy.ProduceConsume;
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115 @ManagedObject
116 public class HTTP2Client extends ContainerLifeCycle
117 {
118 private Executor executor;
119 private Scheduler scheduler;
120 private ByteBufferPool bufferPool;
121 private ClientConnectionFactory connectionFactory;
122 private SelectorManager selector;
123 private int selectors = 1;
124 private long idleTimeout = 30000;
125 private long connectTimeout = 10000;
126 private int inputBufferSize = 8192;
127 private List<String> protocols = Arrays.asList("h2", "h2-17", "h2-16", "h2-15", "h2-14");
128 private int initialSessionRecvWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
129 private int initialStreamRecvWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
130 private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
131 private ExecutionStrategy.Factory executionStrategyFactory = new ProduceConsume.Factory();
132
133 @Override
134 protected void doStart() throws Exception
135 {
136 if (executor == null)
137 setExecutor(new QueuedThreadPool());
138
139 if (scheduler == null)
140 setScheduler(new ScheduledExecutorScheduler());
141
142 if (bufferPool == null)
143 setByteBufferPool(new MappedByteBufferPool());
144
145 if (connectionFactory == null)
146 {
147 HTTP2ClientConnectionFactory h2 = new HTTP2ClientConnectionFactory();
148 setClientConnectionFactory((endPoint, context) ->
149 {
150 ClientConnectionFactory factory = h2;
151 SslContextFactory sslContextFactory = (SslContextFactory)context.get(SslClientConnectionFactory.SSL_CONTEXT_FACTORY_CONTEXT_KEY);
152 if (sslContextFactory != null)
153 {
154 ALPNClientConnectionFactory alpn = new ALPNClientConnectionFactory(getExecutor(), h2, getProtocols());
155 factory = new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), alpn);
156 }
157 return factory.newConnection(endPoint, context);
158 });
159 }
160
161 if (selector == null)
162 {
163 selector = newSelectorManager();
164 addBean(selector);
165 }
166 selector.setConnectTimeout(getConnectTimeout());
167
168 super.doStart();
169 }
170
171 protected SelectorManager newSelectorManager()
172 {
173 return new ClientSelectorManager(getExecutor(), getScheduler(), getSelectors());
174 }
175
176 public Executor getExecutor()
177 {
178 return executor;
179 }
180
181 public void setExecutor(Executor executor)
182 {
183 this.updateBean(this.executor, executor);
184 this.executor = executor;
185 }
186
187 public Scheduler getScheduler()
188 {
189 return scheduler;
190 }
191
192 public void setScheduler(Scheduler scheduler)
193 {
194 this.updateBean(this.scheduler, scheduler);
195 this.scheduler = scheduler;
196 }
197
198 public ByteBufferPool getByteBufferPool()
199 {
200 return bufferPool;
201 }
202
203 public void setByteBufferPool(ByteBufferPool bufferPool)
204 {
205 this.updateBean(this.bufferPool, bufferPool);
206 this.bufferPool = bufferPool;
207 }
208
209 public ClientConnectionFactory getClientConnectionFactory()
210 {
211 return connectionFactory;
212 }
213
214 public void setClientConnectionFactory(ClientConnectionFactory connectionFactory)
215 {
216 this.updateBean(this.connectionFactory, connectionFactory);
217 this.connectionFactory = connectionFactory;
218 }
219
220 public FlowControlStrategy.Factory getFlowControlStrategyFactory()
221 {
222 return flowControlStrategyFactory;
223 }
224
225 public void setFlowControlStrategyFactory(FlowControlStrategy.Factory flowControlStrategyFactory)
226 {
227 this.flowControlStrategyFactory = flowControlStrategyFactory;
228 }
229
230 public ExecutionStrategy.Factory getExecutionStrategyFactory()
231 {
232 return executionStrategyFactory;
233 }
234
235 public void setExecutionStrategyFactory(ExecutionStrategy.Factory executionStrategyFactory)
236 {
237 this.executionStrategyFactory = executionStrategyFactory;
238 }
239
240 @ManagedAttribute("The number of selectors")
241 public int getSelectors()
242 {
243 return selectors;
244 }
245
246 public void setSelectors(int selectors)
247 {
248 this.selectors = selectors;
249 }
250
251 @ManagedAttribute("The idle timeout in milliseconds")
252 public long getIdleTimeout()
253 {
254 return idleTimeout;
255 }
256
257 public void setIdleTimeout(long idleTimeout)
258 {
259 this.idleTimeout = idleTimeout;
260 }
261
262 @ManagedAttribute("The connect timeout in milliseconds")
263 public long getConnectTimeout()
264 {
265 return connectTimeout;
266 }
267
268 public void setConnectTimeout(long connectTimeout)
269 {
270 this.connectTimeout = connectTimeout;
271 SelectorManager selector = this.selector;
272 if (selector != null)
273 selector.setConnectTimeout(connectTimeout);
274 }
275
276 @ManagedAttribute("The size of the buffer used to read from the network")
277 public int getInputBufferSize()
278 {
279 return inputBufferSize;
280 }
281
282 public void setInputBufferSize(int inputBufferSize)
283 {
284 this.inputBufferSize = inputBufferSize;
285 }
286
287 @ManagedAttribute("The ALPN protocol list")
288 public List<String> getProtocols()
289 {
290 return protocols;
291 }
292
293 public void setProtocols(List<String> protocols)
294 {
295 this.protocols = protocols;
296 }
297
298 @ManagedAttribute("The initial size of session's flow control receive window")
299 public int getInitialSessionRecvWindow()
300 {
301 return initialSessionRecvWindow;
302 }
303
304 public void setInitialSessionRecvWindow(int initialSessionRecvWindow)
305 {
306 this.initialSessionRecvWindow = initialSessionRecvWindow;
307 }
308
309 @ManagedAttribute("The initial size of stream's flow control receive window")
310 public int getInitialStreamRecvWindow()
311 {
312 return initialStreamRecvWindow;
313 }
314
315 public void setInitialStreamRecvWindow(int initialStreamRecvWindow)
316 {
317 this.initialStreamRecvWindow = initialStreamRecvWindow;
318 }
319
320 public void connect(InetSocketAddress address, Session.Listener listener, Promise<Session> promise)
321 {
322 connect(null, address, listener, promise);
323 }
324
325 public void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise)
326 {
327 connect(sslContextFactory, address, listener, promise, null);
328 }
329
330 public void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
331 {
332 try
333 {
334 SocketChannel channel = SocketChannel.open();
335 configure(channel);
336 channel.configureBlocking(false);
337 context = contextFrom(sslContextFactory, address, listener, promise, context);
338 if (channel.connect(address))
339 selector.accept(channel, context);
340 else
341 selector.connect(channel, context);
342 }
343 catch (Throwable x)
344 {
345 promise.failed(x);
346 }
347 }
348
349 public void accept(SslContextFactory sslContextFactory, SocketChannel channel, Session.Listener listener, Promise<Session> promise)
350 {
351 try
352 {
353 if (!channel.isConnected())
354 throw new IllegalStateException("SocketChannel must be connected");
355 channel.configureBlocking(false);
356 Map<String, Object> context = contextFrom(sslContextFactory, (InetSocketAddress)channel.getRemoteAddress(), listener, promise, null);
357 selector.accept(channel, context);
358 }
359 catch (Throwable x)
360 {
361 promise.failed(x);
362 }
363 }
364
365 private Map<String, Object> contextFrom(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
366 {
367 if (context == null)
368 context = new HashMap<>();
369 context.put(HTTP2ClientConnectionFactory.CLIENT_CONTEXT_KEY, this);
370 context.put(HTTP2ClientConnectionFactory.SESSION_LISTENER_CONTEXT_KEY, listener);
371 context.put(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY, promise);
372 if (sslContextFactory != null)
373 context.put(SslClientConnectionFactory.SSL_CONTEXT_FACTORY_CONTEXT_KEY, sslContextFactory);
374 context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, address.getHostString());
375 context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, address.getPort());
376 context.putIfAbsent(ClientConnectionFactory.CONNECTOR_CONTEXT_KEY, this);
377 return context;
378 }
379
380 protected void configure(SocketChannel channel) throws IOException
381 {
382 channel.socket().setTcpNoDelay(true);
383 }
384
385 private class ClientSelectorManager extends SelectorManager
386 {
387 private ClientSelectorManager(Executor executor, Scheduler scheduler, int selectors)
388 {
389 super(executor, scheduler, selectors);
390 }
391
392 @Override
393 protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
394 {
395 return new SelectChannelEndPoint(channel, selector, selectionKey, getScheduler(), getIdleTimeout());
396 }
397
398 @Override
399 public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException
400 {
401 @SuppressWarnings("unchecked")
402 Map<String, Object> context = (Map<String, Object>)attachment;
403 context.put(HTTP2ClientConnectionFactory.BYTE_BUFFER_POOL_CONTEXT_KEY, getByteBufferPool());
404 context.put(HTTP2ClientConnectionFactory.EXECUTOR_CONTEXT_KEY, getExecutor());
405 context.put(HTTP2ClientConnectionFactory.SCHEDULER_CONTEXT_KEY, getScheduler());
406 return getClientConnectionFactory().newConnection(endpoint, context);
407 }
408
409 @Override
410 protected void connectionFailed(SocketChannel channel, Throwable failure, Object attachment)
411 {
412 @SuppressWarnings("unchecked")
413 Map<String, Object> context = (Map<String, Object>)attachment;
414 if (LOG.isDebugEnabled())
415 {
416 Object host = context.get(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY);
417 Object port = context.get(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY);
418 LOG.debug("Could not connect to {}:{}", host, port);
419 }
420 @SuppressWarnings("unchecked")
421 Promise<Session> promise = (Promise<Session>)context.get(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY);
422 promise.failed(failure);
423 }
424 }
425 }