1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.http2.client;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.nio.channels.SelectionKey;
24 import java.nio.channels.SocketChannel;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.concurrent.Executor;
30
31 import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory;
32 import org.eclipse.jetty.http2.BufferingFlowControlStrategy;
33 import org.eclipse.jetty.http2.FlowControlStrategy;
34 import org.eclipse.jetty.http2.api.Session;
35 import org.eclipse.jetty.io.ByteBufferPool;
36 import org.eclipse.jetty.io.ClientConnectionFactory;
37 import org.eclipse.jetty.io.Connection;
38 import org.eclipse.jetty.io.EndPoint;
39 import org.eclipse.jetty.io.ManagedSelector;
40 import org.eclipse.jetty.io.MappedByteBufferPool;
41 import org.eclipse.jetty.io.SelectChannelEndPoint;
42 import org.eclipse.jetty.io.SelectorManager;
43 import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
44 import org.eclipse.jetty.util.Promise;
45 import org.eclipse.jetty.util.annotation.ManagedAttribute;
46 import org.eclipse.jetty.util.annotation.ManagedObject;
47 import org.eclipse.jetty.util.component.ContainerLifeCycle;
48 import org.eclipse.jetty.util.ssl.SslContextFactory;
49 import org.eclipse.jetty.util.thread.QueuedThreadPool;
50 import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
51 import org.eclipse.jetty.util.thread.Scheduler;
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113 @ManagedObject
114 public class HTTP2Client extends ContainerLifeCycle
115 {
116 private Executor executor;
117 private Scheduler scheduler;
118 private ByteBufferPool bufferPool;
119 private ClientConnectionFactory connectionFactory;
120 private SelectorManager selector;
121 private int selectors = 1;
122 private long idleTimeout = 30000;
123 private long connectTimeout = 10000;
124 private int inputBufferSize = 8192;
125 private List<String> protocols = Arrays.asList("h2", "h2-17", "h2-16", "h2-15", "h2-14");
126 private int initialSessionRecvWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
127 private int initialStreamRecvWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
128 private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
129
130 @Override
131 protected void doStart() throws Exception
132 {
133 if (executor == null)
134 setExecutor(new QueuedThreadPool());
135
136 if (scheduler == null)
137 setScheduler(new ScheduledExecutorScheduler());
138
139 if (bufferPool == null)
140 setByteBufferPool(new MappedByteBufferPool());
141
142 if (connectionFactory == null)
143 {
144 HTTP2ClientConnectionFactory h2 = new HTTP2ClientConnectionFactory();
145 setClientConnectionFactory((endPoint, context) ->
146 {
147 ClientConnectionFactory factory = h2;
148 SslContextFactory sslContextFactory = (SslContextFactory)context.get(SslClientConnectionFactory.SSL_CONTEXT_FACTORY_CONTEXT_KEY);
149 if (sslContextFactory != null)
150 {
151 ALPNClientConnectionFactory alpn = new ALPNClientConnectionFactory(getExecutor(), h2, getProtocols());
152 factory = new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), alpn);
153 }
154 return factory.newConnection(endPoint, context);
155 });
156 }
157
158 if (selector == null)
159 {
160 selector = newSelectorManager();
161 addBean(selector);
162 }
163 selector.setConnectTimeout(getConnectTimeout());
164
165 super.doStart();
166 }
167
168 protected SelectorManager newSelectorManager()
169 {
170 return new ClientSelectorManager(getExecutor(), getScheduler(), getSelectors());
171 }
172
173 public Executor getExecutor()
174 {
175 return executor;
176 }
177
178 public void setExecutor(Executor executor)
179 {
180 this.updateBean(this.executor, executor);
181 this.executor = executor;
182 }
183
184 public Scheduler getScheduler()
185 {
186 return scheduler;
187 }
188
189 public void setScheduler(Scheduler scheduler)
190 {
191 this.updateBean(this.scheduler, scheduler);
192 this.scheduler = scheduler;
193 }
194
195 public ByteBufferPool getByteBufferPool()
196 {
197 return bufferPool;
198 }
199
200 public void setByteBufferPool(ByteBufferPool bufferPool)
201 {
202 this.updateBean(this.bufferPool, bufferPool);
203 this.bufferPool = bufferPool;
204 }
205
206 public ClientConnectionFactory getClientConnectionFactory()
207 {
208 return connectionFactory;
209 }
210
211 public void setClientConnectionFactory(ClientConnectionFactory connectionFactory)
212 {
213 this.updateBean(this.connectionFactory, connectionFactory);
214 this.connectionFactory = connectionFactory;
215 }
216
217 public FlowControlStrategy.Factory getFlowControlStrategyFactory()
218 {
219 return flowControlStrategyFactory;
220 }
221
222 public void setFlowControlStrategyFactory(FlowControlStrategy.Factory flowControlStrategyFactory)
223 {
224 this.flowControlStrategyFactory = flowControlStrategyFactory;
225 }
226
227 @ManagedAttribute("The number of selectors")
228 public int getSelectors()
229 {
230 return selectors;
231 }
232
233 public void setSelectors(int selectors)
234 {
235 this.selectors = selectors;
236 }
237
238 @ManagedAttribute("The idle timeout in milliseconds")
239 public long getIdleTimeout()
240 {
241 return idleTimeout;
242 }
243
244 public void setIdleTimeout(long idleTimeout)
245 {
246 this.idleTimeout = idleTimeout;
247 }
248
249 @ManagedAttribute("The connect timeout in milliseconds")
250 public long getConnectTimeout()
251 {
252 return connectTimeout;
253 }
254
255 public void setConnectTimeout(long connectTimeout)
256 {
257 this.connectTimeout = connectTimeout;
258 SelectorManager selector = this.selector;
259 if (selector != null)
260 selector.setConnectTimeout(connectTimeout);
261 }
262
263 @ManagedAttribute("The size of the buffer used to read from the network")
264 public int getInputBufferSize()
265 {
266 return inputBufferSize;
267 }
268
269 public void setInputBufferSize(int inputBufferSize)
270 {
271 this.inputBufferSize = inputBufferSize;
272 }
273
274 @ManagedAttribute("The ALPN protocol list")
275 public List<String> getProtocols()
276 {
277 return protocols;
278 }
279
280 public void setProtocols(List<String> protocols)
281 {
282 this.protocols = protocols;
283 }
284
285 @ManagedAttribute("The initial size of session's flow control receive window")
286 public int getInitialSessionRecvWindow()
287 {
288 return initialSessionRecvWindow;
289 }
290
291 public void setInitialSessionRecvWindow(int initialSessionRecvWindow)
292 {
293 this.initialSessionRecvWindow = initialSessionRecvWindow;
294 }
295
296 @ManagedAttribute("The initial size of stream's flow control receive window")
297 public int getInitialStreamRecvWindow()
298 {
299 return initialStreamRecvWindow;
300 }
301
302 public void setInitialStreamRecvWindow(int initialStreamRecvWindow)
303 {
304 this.initialStreamRecvWindow = initialStreamRecvWindow;
305 }
306
307 public void connect(InetSocketAddress address, Session.Listener listener, Promise<Session> promise)
308 {
309 connect(null, address, listener, promise);
310 }
311
312 public void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise)
313 {
314 connect(sslContextFactory, address, listener, promise, null);
315 }
316
317 public void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
318 {
319 try
320 {
321 SocketChannel channel = SocketChannel.open();
322 configure(channel);
323 channel.configureBlocking(false);
324 context = contextFrom(sslContextFactory, address, listener, promise, context);
325 if (channel.connect(address))
326 selector.accept(channel, context);
327 else
328 selector.connect(channel, context);
329 }
330 catch (Throwable x)
331 {
332 promise.failed(x);
333 }
334 }
335
336 public void accept(SslContextFactory sslContextFactory, SocketChannel channel, Session.Listener listener, Promise<Session> promise)
337 {
338 try
339 {
340 if (!channel.isConnected())
341 throw new IllegalStateException("SocketChannel must be connected");
342 channel.configureBlocking(false);
343 Map<String, Object> context = contextFrom(sslContextFactory, (InetSocketAddress)channel.getRemoteAddress(), listener, promise, null);
344 selector.accept(channel, context);
345 }
346 catch (Throwable x)
347 {
348 promise.failed(x);
349 }
350 }
351
352 private Map<String, Object> contextFrom(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
353 {
354 if (context == null)
355 context = new HashMap<>();
356 context.put(HTTP2ClientConnectionFactory.CLIENT_CONTEXT_KEY, this);
357 context.put(HTTP2ClientConnectionFactory.SESSION_LISTENER_CONTEXT_KEY, listener);
358 context.put(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY, promise);
359 if (sslContextFactory != null)
360 context.put(SslClientConnectionFactory.SSL_CONTEXT_FACTORY_CONTEXT_KEY, sslContextFactory);
361 context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, address.getHostString());
362 context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, address.getPort());
363 return context;
364 }
365
366 protected void configure(SocketChannel channel) throws IOException
367 {
368 channel.socket().setTcpNoDelay(true);
369 }
370
371 private class ClientSelectorManager extends SelectorManager
372 {
373 private ClientSelectorManager(Executor executor, Scheduler scheduler, int selectors)
374 {
375 super(executor, scheduler, selectors);
376 }
377
378 @Override
379 protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
380 {
381 return new SelectChannelEndPoint(channel, selector, selectionKey, getScheduler(), getIdleTimeout());
382 }
383
384 @Override
385 public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException
386 {
387 @SuppressWarnings("unchecked")
388 Map<String, Object> context = (Map<String, Object>)attachment;
389 context.put(HTTP2ClientConnectionFactory.BYTE_BUFFER_POOL_CONTEXT_KEY, getByteBufferPool());
390 context.put(HTTP2ClientConnectionFactory.EXECUTOR_CONTEXT_KEY, getExecutor());
391 context.put(HTTP2ClientConnectionFactory.SCHEDULER_CONTEXT_KEY, getScheduler());
392 return getClientConnectionFactory().newConnection(endpoint, context);
393 }
394
395 @Override
396 protected void connectionFailed(SocketChannel channel, Throwable failure, Object attachment)
397 {
398 @SuppressWarnings("unchecked")
399 Map<String, Object> context = (Map<String, Object>)attachment;
400 if (LOG.isDebugEnabled())
401 {
402 Object host = context.get(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY);
403 Object port = context.get(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY);
404 LOG.debug("Could not connect to {}:{}", host, port);
405 }
406 @SuppressWarnings("unchecked")
407 Promise<Session> promise = (Promise<Session>)context.get(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY);
408 promise.failed(failure);
409 }
410 }
411 }