View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.http2.client;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.nio.channels.SelectionKey;
24  import java.nio.channels.SocketChannel;
25  import java.util.Arrays;
26  import java.util.HashMap;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.concurrent.Executor;
30  
31  import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory;
32  import org.eclipse.jetty.http2.BufferingFlowControlStrategy;
33  import org.eclipse.jetty.http2.FlowControlStrategy;
34  import org.eclipse.jetty.http2.api.Session;
35  import org.eclipse.jetty.io.ByteBufferPool;
36  import org.eclipse.jetty.io.ClientConnectionFactory;
37  import org.eclipse.jetty.io.Connection;
38  import org.eclipse.jetty.io.EndPoint;
39  import org.eclipse.jetty.io.ManagedSelector;
40  import org.eclipse.jetty.io.MappedByteBufferPool;
41  import org.eclipse.jetty.io.SelectChannelEndPoint;
42  import org.eclipse.jetty.io.SelectorManager;
43  import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
44  import org.eclipse.jetty.util.Promise;
45  import org.eclipse.jetty.util.annotation.ManagedAttribute;
46  import org.eclipse.jetty.util.annotation.ManagedObject;
47  import org.eclipse.jetty.util.component.ContainerLifeCycle;
48  import org.eclipse.jetty.util.ssl.SslContextFactory;
49  import org.eclipse.jetty.util.thread.ExecutionStrategy;
50  import org.eclipse.jetty.util.thread.QueuedThreadPool;
51  import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
52  import org.eclipse.jetty.util.thread.Scheduler;
53  import org.eclipse.jetty.util.thread.strategy.ProduceConsume;
54  
55  /**
56   * <p>{@link HTTP2Client} provides an asynchronous, non-blocking implementation
57   * to send HTTP/2 frames to a server.</p>
58   * <p>Typical usage:</p>
59   * <pre>
60   * // Create and start HTTP2Client.
61   * HTTP2Client client = new HTTP2Client();
62   * SslContextFactory sslContextFactory = new SslContextFactory();
63   * client.addBean(sslContextFactory);
64   * client.start();
65   *
66   * // Connect to host.
67   * String host = "webtide.com";
68   * int port = 443;
69   *
70   * FuturePromise&lt;Session&gt; sessionPromise = new FuturePromise&lt;&gt;();
71   * client.connect(sslContextFactory, new InetSocketAddress(host, port), new ServerSessionListener.Adapter(), sessionPromise);
72   *
73   * // Obtain the client Session object.
74   * Session session = sessionPromise.get(5, TimeUnit.SECONDS);
75   *
76   * // Prepare the HTTP request headers.
77   * HttpFields requestFields = new HttpFields();
78   * requestFields.put("User-Agent", client.getClass().getName() + "/" + Jetty.VERSION);
79   * // Prepare the HTTP request object.
80   * MetaData.Request request = new MetaData.Request("PUT", new HttpURI("https://" + host + ":" + port + "/"), HttpVersion.HTTP_2, requestFields);
81   * // Create the HTTP/2 HEADERS frame representing the HTTP request.
82   * HeadersFrame headersFrame = new HeadersFrame(request, null, false);
83   *
84   * // Prepare the listener to receive the HTTP response frames.
85   * Stream.Listener responseListener = new new Stream.Listener.Adapter()
86   * {
87   *      &#64;Override
88   *      public void onHeaders(Stream stream, HeadersFrame frame)
89   *      {
90   *          System.err.println(frame);
91   *      }
92   *
93   *      &#64;Override
94   *      public void onData(Stream stream, DataFrame frame, Callback callback)
95   *      {
96   *          System.err.println(frame);
97   *          callback.succeeded();
98   *      }
99   * };
100  *
101  * // Send the HEADERS frame to create a stream.
102  * FuturePromise&lt;Stream&gt; streamPromise = new FuturePromise&lt;&gt;();
103  * session.newStream(headersFrame, streamPromise, responseListener);
104  * Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
105  *
106  * // Use the Stream object to send request content, if any, using a DATA frame.
107  * ByteBuffer content = ...;
108  * DataFrame requestContent = new DataFrame(stream.getId(), content, true);
109  * stream.data(requestContent, Callback.Adapter.INSTANCE);
110  *
111  * // When done, stop the client.
112  * client.stop();
113  * </pre>
114  */
115 @ManagedObject
116 public class HTTP2Client extends ContainerLifeCycle
117 {
118     private Executor executor;
119     private Scheduler scheduler;
120     private ByteBufferPool bufferPool;
121     private ClientConnectionFactory connectionFactory;
122     private SelectorManager selector;
123     private int selectors = 1;
124     private long idleTimeout = 30000;
125     private long connectTimeout = 10000;
126     private int inputBufferSize = 8192;
127     private List<String> protocols = Arrays.asList("h2", "h2-17", "h2-16", "h2-15", "h2-14");
128     private int initialSessionRecvWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
129     private int initialStreamRecvWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
130     private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
131     private ExecutionStrategy.Factory executionStrategyFactory = new ProduceConsume.Factory();
132 
133     @Override
134     protected void doStart() throws Exception
135     {
136         if (executor == null)
137             setExecutor(new QueuedThreadPool());
138 
139         if (scheduler == null)
140             setScheduler(new ScheduledExecutorScheduler());
141 
142         if (bufferPool == null)
143             setByteBufferPool(new MappedByteBufferPool());
144 
145         if (connectionFactory == null)
146         {
147             HTTP2ClientConnectionFactory h2 = new HTTP2ClientConnectionFactory();
148             setClientConnectionFactory((endPoint, context) ->
149             {
150                 ClientConnectionFactory factory = h2;
151                 SslContextFactory sslContextFactory = (SslContextFactory)context.get(SslClientConnectionFactory.SSL_CONTEXT_FACTORY_CONTEXT_KEY);
152                 if (sslContextFactory != null)
153                 {
154                     ALPNClientConnectionFactory alpn = new ALPNClientConnectionFactory(getExecutor(), h2, getProtocols());
155                     factory = new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), alpn);
156                 }
157                 return factory.newConnection(endPoint, context);
158             });
159         }
160 
161         if (selector == null)
162         {
163             selector = newSelectorManager();
164             addBean(selector);
165         }
166         selector.setConnectTimeout(getConnectTimeout());
167 
168         super.doStart();
169     }
170 
171     protected SelectorManager newSelectorManager()
172     {
173         return new ClientSelectorManager(getExecutor(), getScheduler(), getSelectors());
174     }
175 
176     public Executor getExecutor()
177     {
178         return executor;
179     }
180 
181     public void setExecutor(Executor executor)
182     {
183         this.updateBean(this.executor, executor);
184         this.executor = executor;
185     }
186 
187     public Scheduler getScheduler()
188     {
189         return scheduler;
190     }
191 
192     public void setScheduler(Scheduler scheduler)
193     {
194         this.updateBean(this.scheduler, scheduler);
195         this.scheduler = scheduler;
196     }
197 
198     public ByteBufferPool getByteBufferPool()
199     {
200         return bufferPool;
201     }
202 
203     public void setByteBufferPool(ByteBufferPool bufferPool)
204     {
205         this.updateBean(this.bufferPool, bufferPool);
206         this.bufferPool = bufferPool;
207     }
208 
209     public ClientConnectionFactory getClientConnectionFactory()
210     {
211         return connectionFactory;
212     }
213 
214     public void setClientConnectionFactory(ClientConnectionFactory connectionFactory)
215     {
216         this.updateBean(this.connectionFactory, connectionFactory);
217         this.connectionFactory = connectionFactory;
218     }
219 
220     public FlowControlStrategy.Factory getFlowControlStrategyFactory()
221     {
222         return flowControlStrategyFactory;
223     }
224 
225     public void setFlowControlStrategyFactory(FlowControlStrategy.Factory flowControlStrategyFactory)
226     {
227         this.flowControlStrategyFactory = flowControlStrategyFactory;
228     }
229 
230     public ExecutionStrategy.Factory getExecutionStrategyFactory()
231     {
232         return executionStrategyFactory;
233     }
234 
235     public void setExecutionStrategyFactory(ExecutionStrategy.Factory executionStrategyFactory)
236     {
237         this.executionStrategyFactory = executionStrategyFactory;
238     }
239 
240     @ManagedAttribute("The number of selectors")
241     public int getSelectors()
242     {
243         return selectors;
244     }
245 
246     public void setSelectors(int selectors)
247     {
248         this.selectors = selectors;
249     }
250 
251     @ManagedAttribute("The idle timeout in milliseconds")
252     public long getIdleTimeout()
253     {
254         return idleTimeout;
255     }
256 
257     public void setIdleTimeout(long idleTimeout)
258     {
259         this.idleTimeout = idleTimeout;
260     }
261 
262     @ManagedAttribute("The connect timeout in milliseconds")
263     public long getConnectTimeout()
264     {
265         return connectTimeout;
266     }
267 
268     public void setConnectTimeout(long connectTimeout)
269     {
270         this.connectTimeout = connectTimeout;
271         SelectorManager selector = this.selector;
272         if (selector != null)
273             selector.setConnectTimeout(connectTimeout);
274     }
275 
276     @ManagedAttribute("The size of the buffer used to read from the network")
277     public int getInputBufferSize()
278     {
279         return inputBufferSize;
280     }
281 
282     public void setInputBufferSize(int inputBufferSize)
283     {
284         this.inputBufferSize = inputBufferSize;
285     }
286 
287     @ManagedAttribute("The ALPN protocol list")
288     public List<String> getProtocols()
289     {
290         return protocols;
291     }
292 
293     public void setProtocols(List<String> protocols)
294     {
295         this.protocols = protocols;
296     }
297 
298     @ManagedAttribute("The initial size of session's flow control receive window")
299     public int getInitialSessionRecvWindow()
300     {
301         return initialSessionRecvWindow;
302     }
303 
304     public void setInitialSessionRecvWindow(int initialSessionRecvWindow)
305     {
306         this.initialSessionRecvWindow = initialSessionRecvWindow;
307     }
308 
309     @ManagedAttribute("The initial size of stream's flow control receive window")
310     public int getInitialStreamRecvWindow()
311     {
312         return initialStreamRecvWindow;
313     }
314 
315     public void setInitialStreamRecvWindow(int initialStreamRecvWindow)
316     {
317         this.initialStreamRecvWindow = initialStreamRecvWindow;
318     }
319 
320     public void connect(InetSocketAddress address, Session.Listener listener, Promise<Session> promise)
321     {
322         connect(null, address, listener, promise);
323     }
324 
325     public void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise)
326     {
327         connect(sslContextFactory, address, listener, promise, null);
328     }
329 
330     public void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
331     {
332         try
333         {
334             SocketChannel channel = SocketChannel.open();
335             configure(channel);
336             channel.configureBlocking(false);
337             context = contextFrom(sslContextFactory, address, listener, promise, context);
338             if (channel.connect(address))
339                 selector.accept(channel, context);
340             else
341                 selector.connect(channel, context);
342         }
343         catch (Throwable x)
344         {
345             promise.failed(x);
346         }
347     }
348 
349     public void accept(SslContextFactory sslContextFactory, SocketChannel channel, Session.Listener listener, Promise<Session> promise)
350     {
351         try
352         {
353             if (!channel.isConnected())
354                 throw new IllegalStateException("SocketChannel must be connected");
355             channel.configureBlocking(false);
356             Map<String, Object> context = contextFrom(sslContextFactory, (InetSocketAddress)channel.getRemoteAddress(), listener, promise, null);
357             selector.accept(channel, context);
358         }
359         catch (Throwable x)
360         {
361             promise.failed(x);
362         }
363     }
364 
365     private Map<String, Object> contextFrom(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
366     {
367         if (context == null)
368             context = new HashMap<>();
369         context.put(HTTP2ClientConnectionFactory.CLIENT_CONTEXT_KEY, this);
370         context.put(HTTP2ClientConnectionFactory.SESSION_LISTENER_CONTEXT_KEY, listener);
371         context.put(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY, promise);
372         if (sslContextFactory != null)
373             context.put(SslClientConnectionFactory.SSL_CONTEXT_FACTORY_CONTEXT_KEY, sslContextFactory);
374         context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, address.getHostString());
375         context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, address.getPort());
376         context.putIfAbsent(ClientConnectionFactory.CONNECTOR_CONTEXT_KEY, this);
377         return context;
378     }
379 
380     protected void configure(SocketChannel channel) throws IOException
381     {
382         channel.socket().setTcpNoDelay(true);
383     }
384 
385     private class ClientSelectorManager extends SelectorManager
386     {
387         private ClientSelectorManager(Executor executor, Scheduler scheduler, int selectors)
388         {
389             super(executor, scheduler, selectors);
390         }
391 
392         @Override
393         protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
394         {
395             return new SelectChannelEndPoint(channel, selector, selectionKey, getScheduler(), getIdleTimeout());
396         }
397 
398         @Override
399         public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException
400         {
401             @SuppressWarnings("unchecked")
402             Map<String, Object> context = (Map<String, Object>)attachment;
403             context.put(HTTP2ClientConnectionFactory.BYTE_BUFFER_POOL_CONTEXT_KEY, getByteBufferPool());
404             context.put(HTTP2ClientConnectionFactory.EXECUTOR_CONTEXT_KEY, getExecutor());
405             context.put(HTTP2ClientConnectionFactory.SCHEDULER_CONTEXT_KEY, getScheduler());
406             return getClientConnectionFactory().newConnection(endpoint, context);
407         }
408 
409         @Override
410         protected void connectionFailed(SocketChannel channel, Throwable failure, Object attachment)
411         {
412             @SuppressWarnings("unchecked")
413             Map<String, Object> context = (Map<String, Object>)attachment;
414             if (LOG.isDebugEnabled())
415             {
416                 Object host = context.get(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY);
417                 Object port = context.get(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY);
418                 LOG.debug("Could not connect to {}:{}", host, port);
419             }
420             @SuppressWarnings("unchecked")
421             Promise<Session> promise = (Promise<Session>)context.get(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY);
422             promise.failed(failure);
423         }
424     }
425 }