View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.http2.client;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.nio.channels.SelectionKey;
24  import java.nio.channels.SocketChannel;
25  import java.util.Arrays;
26  import java.util.HashMap;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.concurrent.Executor;
30  
31  import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory;
32  import org.eclipse.jetty.http2.BufferingFlowControlStrategy;
33  import org.eclipse.jetty.http2.FlowControlStrategy;
34  import org.eclipse.jetty.http2.api.Session;
35  import org.eclipse.jetty.io.ByteBufferPool;
36  import org.eclipse.jetty.io.ClientConnectionFactory;
37  import org.eclipse.jetty.io.Connection;
38  import org.eclipse.jetty.io.EndPoint;
39  import org.eclipse.jetty.io.ManagedSelector;
40  import org.eclipse.jetty.io.MappedByteBufferPool;
41  import org.eclipse.jetty.io.SelectChannelEndPoint;
42  import org.eclipse.jetty.io.SelectorManager;
43  import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
44  import org.eclipse.jetty.util.Promise;
45  import org.eclipse.jetty.util.annotation.ManagedAttribute;
46  import org.eclipse.jetty.util.annotation.ManagedObject;
47  import org.eclipse.jetty.util.component.ContainerLifeCycle;
48  import org.eclipse.jetty.util.ssl.SslContextFactory;
49  import org.eclipse.jetty.util.thread.QueuedThreadPool;
50  import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
51  import org.eclipse.jetty.util.thread.Scheduler;
52  
53  /**
54   * <p>{@link HTTP2Client} provides an asynchronous, non-blocking implementation
55   * to send HTTP/2 frames to a server.</p>
56   * <p>Typical usage:</p>
57   * <pre>
58   * // Create and start HTTP2Client.
59   * HTTP2Client client = new HTTP2Client();
60   * SslContextFactory sslContextFactory = new SslContextFactory();
61   * client.addBean(sslContextFactory);
62   * client.start();
63   *
64   * // Connect to host.
65   * String host = "webtide.com";
66   * int port = 443;
67   *
68   * FuturePromise&lt;Session&gt; sessionPromise = new FuturePromise&lt;&gt;();
69   * client.connect(sslContextFactory, new InetSocketAddress(host, port), new ServerSessionListener.Adapter(), sessionPromise);
70   *
71   * // Obtain the client Session object.
72   * Session session = sessionPromise.get(5, TimeUnit.SECONDS);
73   *
74   * // Prepare the HTTP request headers.
75   * HttpFields requestFields = new HttpFields();
76   * requestFields.put("User-Agent", client.getClass().getName() + "/" + Jetty.VERSION);
77   * // Prepare the HTTP request object.
78   * MetaData.Request request = new MetaData.Request("PUT", new HttpURI("https://" + host + ":" + port + "/"), HttpVersion.HTTP_2, requestFields);
79   * // Create the HTTP/2 HEADERS frame representing the HTTP request.
80   * HeadersFrame headersFrame = new HeadersFrame(request, null, false);
81   *
82   * // Prepare the listener to receive the HTTP response frames.
83   * Stream.Listener responseListener = new new Stream.Listener.Adapter()
84   * {
85   *      &#64;Override
86   *      public void onHeaders(Stream stream, HeadersFrame frame)
87   *      {
88   *          System.err.println(frame);
89   *      }
90   *
91   *      &#64;Override
92   *      public void onData(Stream stream, DataFrame frame, Callback callback)
93   *      {
94   *          System.err.println(frame);
95   *          callback.succeeded();
96   *      }
97   * };
98   *
99   * // Send the HEADERS frame to create a stream.
100  * FuturePromise&lt;Stream&gt; streamPromise = new FuturePromise&lt;&gt;();
101  * session.newStream(headersFrame, streamPromise, responseListener);
102  * Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
103  *
104  * // Use the Stream object to send request content, if any, using a DATA frame.
105  * ByteBuffer content = ...;
106  * DataFrame requestContent = new DataFrame(stream.getId(), content, true);
107  * stream.data(requestContent, Callback.Adapter.INSTANCE);
108  *
109  * // When done, stop the client.
110  * client.stop();
111  * </pre>
112  */
113 @ManagedObject
114 public class HTTP2Client extends ContainerLifeCycle
115 {
116     private Executor executor;
117     private Scheduler scheduler;
118     private ByteBufferPool bufferPool;
119     private ClientConnectionFactory connectionFactory;
120     private SelectorManager selector;
121     private int selectors = 1;
122     private long idleTimeout = 30000;
123     private long connectTimeout = 10000;
124     private int inputBufferSize = 8192;
125     private List<String> protocols = Arrays.asList("h2", "h2-17", "h2-16", "h2-15", "h2-14");
126     private int initialSessionRecvWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
127     private int initialStreamRecvWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
128     private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
129 
130     @Override
131     protected void doStart() throws Exception
132     {
133         if (executor == null)
134             setExecutor(new QueuedThreadPool());
135 
136         if (scheduler == null)
137             setScheduler(new ScheduledExecutorScheduler());
138 
139         if (bufferPool == null)
140             setByteBufferPool(new MappedByteBufferPool());
141 
142         if (connectionFactory == null)
143         {
144             HTTP2ClientConnectionFactory h2 = new HTTP2ClientConnectionFactory();
145             setClientConnectionFactory((endPoint, context) ->
146             {
147                 ClientConnectionFactory factory = h2;
148                 SslContextFactory sslContextFactory = (SslContextFactory)context.get(SslClientConnectionFactory.SSL_CONTEXT_FACTORY_CONTEXT_KEY);
149                 if (sslContextFactory != null)
150                 {
151                     ALPNClientConnectionFactory alpn = new ALPNClientConnectionFactory(getExecutor(), h2, getProtocols());
152                     factory = new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), alpn);
153                 }
154                 return factory.newConnection(endPoint, context);
155             });
156         }
157 
158         if (selector == null)
159         {
160             selector = newSelectorManager();
161             addBean(selector);
162         }
163         selector.setConnectTimeout(getConnectTimeout());
164 
165         super.doStart();
166     }
167 
168     protected SelectorManager newSelectorManager()
169     {
170         return new ClientSelectorManager(getExecutor(), getScheduler(), getSelectors());
171     }
172 
173     public Executor getExecutor()
174     {
175         return executor;
176     }
177 
178     public void setExecutor(Executor executor)
179     {
180         this.updateBean(this.executor, executor);
181         this.executor = executor;
182     }
183 
184     public Scheduler getScheduler()
185     {
186         return scheduler;
187     }
188 
189     public void setScheduler(Scheduler scheduler)
190     {
191         this.updateBean(this.scheduler, scheduler);
192         this.scheduler = scheduler;
193     }
194 
195     public ByteBufferPool getByteBufferPool()
196     {
197         return bufferPool;
198     }
199 
200     public void setByteBufferPool(ByteBufferPool bufferPool)
201     {
202         this.updateBean(this.bufferPool, bufferPool);
203         this.bufferPool = bufferPool;
204     }
205 
206     public ClientConnectionFactory getClientConnectionFactory()
207     {
208         return connectionFactory;
209     }
210 
211     public void setClientConnectionFactory(ClientConnectionFactory connectionFactory)
212     {
213         this.updateBean(this.connectionFactory, connectionFactory);
214         this.connectionFactory = connectionFactory;
215     }
216 
217     public FlowControlStrategy.Factory getFlowControlStrategyFactory()
218     {
219         return flowControlStrategyFactory;
220     }
221 
222     public void setFlowControlStrategyFactory(FlowControlStrategy.Factory flowControlStrategyFactory)
223     {
224         this.flowControlStrategyFactory = flowControlStrategyFactory;
225     }
226 
227     @ManagedAttribute("The number of selectors")
228     public int getSelectors()
229     {
230         return selectors;
231     }
232 
233     public void setSelectors(int selectors)
234     {
235         this.selectors = selectors;
236     }
237 
238     @ManagedAttribute("The idle timeout in milliseconds")
239     public long getIdleTimeout()
240     {
241         return idleTimeout;
242     }
243 
244     public void setIdleTimeout(long idleTimeout)
245     {
246         this.idleTimeout = idleTimeout;
247     }
248 
249     @ManagedAttribute("The connect timeout in milliseconds")
250     public long getConnectTimeout()
251     {
252         return connectTimeout;
253     }
254 
255     public void setConnectTimeout(long connectTimeout)
256     {
257         this.connectTimeout = connectTimeout;
258         SelectorManager selector = this.selector;
259         if (selector != null)
260             selector.setConnectTimeout(connectTimeout);
261     }
262 
263     @ManagedAttribute("The size of the buffer used to read from the network")
264     public int getInputBufferSize()
265     {
266         return inputBufferSize;
267     }
268 
269     public void setInputBufferSize(int inputBufferSize)
270     {
271         this.inputBufferSize = inputBufferSize;
272     }
273 
274     @ManagedAttribute("The ALPN protocol list")
275     public List<String> getProtocols()
276     {
277         return protocols;
278     }
279 
280     public void setProtocols(List<String> protocols)
281     {
282         this.protocols = protocols;
283     }
284 
285     @ManagedAttribute("The initial size of session's flow control receive window")
286     public int getInitialSessionRecvWindow()
287     {
288         return initialSessionRecvWindow;
289     }
290 
291     public void setInitialSessionRecvWindow(int initialSessionRecvWindow)
292     {
293         this.initialSessionRecvWindow = initialSessionRecvWindow;
294     }
295 
296     @ManagedAttribute("The initial size of stream's flow control receive window")
297     public int getInitialStreamRecvWindow()
298     {
299         return initialStreamRecvWindow;
300     }
301 
302     public void setInitialStreamRecvWindow(int initialStreamRecvWindow)
303     {
304         this.initialStreamRecvWindow = initialStreamRecvWindow;
305     }
306 
307     public void connect(InetSocketAddress address, Session.Listener listener, Promise<Session> promise)
308     {
309         connect(null, address, listener, promise);
310     }
311 
312     public void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise)
313     {
314         connect(sslContextFactory, address, listener, promise, null);
315     }
316 
317     public void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
318     {
319         try
320         {
321             SocketChannel channel = SocketChannel.open();
322             configure(channel);
323             channel.configureBlocking(false);
324             context = contextFrom(sslContextFactory, address, listener, promise, context);
325             if (channel.connect(address))
326                 selector.accept(channel, context);
327             else
328                 selector.connect(channel, context);
329         }
330         catch (Throwable x)
331         {
332             promise.failed(x);
333         }
334     }
335 
336     public void accept(SslContextFactory sslContextFactory, SocketChannel channel, Session.Listener listener, Promise<Session> promise)
337     {
338         try
339         {
340             if (!channel.isConnected())
341                 throw new IllegalStateException("SocketChannel must be connected");
342             channel.configureBlocking(false);
343             Map<String, Object> context = contextFrom(sslContextFactory, (InetSocketAddress)channel.getRemoteAddress(), listener, promise, null);
344             selector.accept(channel, context);
345         }
346         catch (Throwable x)
347         {
348             promise.failed(x);
349         }
350     }
351 
352     private Map<String, Object> contextFrom(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
353     {
354         if (context == null)
355             context = new HashMap<>();
356         context.put(HTTP2ClientConnectionFactory.CLIENT_CONTEXT_KEY, this);
357         context.put(HTTP2ClientConnectionFactory.SESSION_LISTENER_CONTEXT_KEY, listener);
358         context.put(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY, promise);
359         if (sslContextFactory != null)
360             context.put(SslClientConnectionFactory.SSL_CONTEXT_FACTORY_CONTEXT_KEY, sslContextFactory);
361         context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, address.getHostString());
362         context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, address.getPort());
363         return context;
364     }
365 
366     protected void configure(SocketChannel channel) throws IOException
367     {
368         channel.socket().setTcpNoDelay(true);
369     }
370 
371     private class ClientSelectorManager extends SelectorManager
372     {
373         private ClientSelectorManager(Executor executor, Scheduler scheduler, int selectors)
374         {
375             super(executor, scheduler, selectors);
376         }
377 
378         @Override
379         protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
380         {
381             return new SelectChannelEndPoint(channel, selector, selectionKey, getScheduler(), getIdleTimeout());
382         }
383 
384         @Override
385         public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException
386         {
387             @SuppressWarnings("unchecked")
388             Map<String, Object> context = (Map<String, Object>)attachment;
389             context.put(HTTP2ClientConnectionFactory.BYTE_BUFFER_POOL_CONTEXT_KEY, getByteBufferPool());
390             context.put(HTTP2ClientConnectionFactory.EXECUTOR_CONTEXT_KEY, getExecutor());
391             context.put(HTTP2ClientConnectionFactory.SCHEDULER_CONTEXT_KEY, getScheduler());
392             return getClientConnectionFactory().newConnection(endpoint, context);
393         }
394 
395         @Override
396         protected void connectionFailed(SocketChannel channel, Throwable failure, Object attachment)
397         {
398             @SuppressWarnings("unchecked")
399             Map<String, Object> context = (Map<String, Object>)attachment;
400             if (LOG.isDebugEnabled())
401             {
402                 Object host = context.get(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY);
403                 Object port = context.get(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY);
404                 LOG.debug("Could not connect to {}:{}", host, port);
405             }
406             @SuppressWarnings("unchecked")
407             Promise<Session> promise = (Promise<Session>)context.get(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY);
408             promise.failed(failure);
409         }
410     }
411 }