View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.spdy.client;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.net.SocketAddress;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.SocketChannel;
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.HashMap;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Queue;
32  import java.util.concurrent.ConcurrentLinkedQueue;
33  import java.util.concurrent.ExecutionException;
34  import java.util.concurrent.Executor;
35  
36  import org.eclipse.jetty.io.ByteBufferPool;
37  import org.eclipse.jetty.io.ClientConnectionFactory;
38  import org.eclipse.jetty.io.Connection;
39  import org.eclipse.jetty.io.EndPoint;
40  import org.eclipse.jetty.io.MappedByteBufferPool;
41  import org.eclipse.jetty.io.SelectChannelEndPoint;
42  import org.eclipse.jetty.io.SelectorManager;
43  import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
44  import org.eclipse.jetty.spdy.FlowControlStrategy;
45  import org.eclipse.jetty.spdy.api.GoAwayInfo;
46  import org.eclipse.jetty.spdy.api.Session;
47  import org.eclipse.jetty.spdy.api.SessionFrameListener;
48  import org.eclipse.jetty.util.Callback;
49  import org.eclipse.jetty.util.FuturePromise;
50  import org.eclipse.jetty.util.Promise;
51  import org.eclipse.jetty.util.component.ContainerLifeCycle;
52  import org.eclipse.jetty.util.ssl.SslContextFactory;
53  import org.eclipse.jetty.util.thread.QueuedThreadPool;
54  import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
55  import org.eclipse.jetty.util.thread.Scheduler;
56  
57  /**
58   * A {@link SPDYClient} allows applications to connect to one or more SPDY servers,
59   * obtaining {@link Session} objects that can be used to send/receive SPDY frames.
60   * <p/>
61   * {@link SPDYClient} instances are created through a {@link Factory}:
62   * <pre>
63   * SPDYClient.Factory factory = new SPDYClient.Factory();
64   * SPDYClient client = factory.newSPDYClient(SPDY.V3);
65   * </pre>
66   * and then used to connect to the server:
67   * <pre>
68   * FuturePromise&lt;Session&gt; promise = new FuturePromise&lt;&gt;();
69   * client.connect("server.com", null, promise);
70   * Session session = promise.get();
71   * </pre>
72   */
73  public class SPDYClient
74  {
75      private final short version;
76      private final Factory factory;
77      private volatile SocketAddress bindAddress;
78      private volatile long idleTimeout = -1;
79      private volatile int initialWindowSize;
80      private volatile boolean dispatchIO;
81      private volatile ClientConnectionFactory connectionFactory;
82  
83      protected SPDYClient(short version, Factory factory)
84      {
85          this.version = version;
86          this.factory = factory;
87          setInitialWindowSize(65536);
88          setDispatchIO(true);
89          ClientConnectionFactory connectionFactory = new SPDYClientConnectionFactory();
90          if (factory.sslContextFactory != null)
91              connectionFactory = new SslClientConnectionFactory(factory.getSslContextFactory(), factory.getByteBufferPool(), factory.getExecutor(), new NPNClientConnectionFactory(this, connectionFactory));
92          setClientConnectionFactory(connectionFactory);
93      }
94  
95      public short getVersion()
96      {
97          return version;
98      }
99  
100     public Factory getFactory()
101     {
102         return factory;
103     }
104 
105     /**
106      * Equivalent to:
107      * <pre>
108      * Future&lt;Session&gt; promise = new FuturePromise&lt;&gt;();
109      * connect(address, listener, promise);
110      * </pre>
111      *
112      * @param address  the address to connect to
113      * @param listener the session listener that will be notified of session events
114      * @return a {@link Session} when connected
115      */
116     public Session connect(SocketAddress address, SessionFrameListener listener) throws ExecutionException, InterruptedException
117     {
118         FuturePromise<Session> promise = new FuturePromise<>();
119         connect(address, listener, promise);
120         return promise.get();
121     }
122 
123     /**
124      * Equivalent to:
125      * <pre>
126      * connect(address, listener, promise, null);
127      * </pre>
128      *
129      * @param address  the address to connect to
130      * @param listener the session listener that will be notified of session events
131      * @param promise  the promise notified of connection success/failure
132      */
133     public void connect(SocketAddress address, SessionFrameListener listener, Promise<Session> promise)
134     {
135         connect(address, listener, promise, new HashMap<String, Object>());
136     }
137 
138     /**
139      * Connects to the given {@code address}, binding the given {@code listener} to session events,
140      * and notified the given {@code promise} of the connect result.
141      * <p/>
142      * If the connect operation is successful, the {@code promise} will be invoked with the {@link Session}
143      * object that applications can use to perform SPDY requests.
144      *
145      * @param address  the address to connect to
146      * @param listener the session listener that will be notified of session events
147      * @param promise  the promise notified of connection success/failure
148      * @param context  a context object passed to the {@link #getClientConnectionFactory() ConnectionFactory}
149      *                 for the creation of the connection
150      */
151     public void connect(final SocketAddress address, final SessionFrameListener listener, final Promise<Session> promise, Map<String, Object> context)
152     {
153         if (!factory.isStarted())
154             throw new IllegalStateException(Factory.class.getSimpleName() + " is not started");
155 
156         try
157         {
158             SocketChannel channel = SocketChannel.open();
159             if (bindAddress != null)
160                 channel.bind(bindAddress);
161             configure(channel);
162             channel.configureBlocking(false);
163             channel.connect(address);
164 
165             context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, ((InetSocketAddress)address).getHostString());
166             context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, ((InetSocketAddress)address).getPort());
167             context.put(SPDYClientConnectionFactory.SPDY_CLIENT_CONTEXT_KEY, this);
168             context.put(SPDYClientConnectionFactory.SPDY_SESSION_LISTENER_CONTEXT_KEY, listener);
169             context.put(SPDYClientConnectionFactory.SPDY_SESSION_PROMISE_CONTEXT_KEY, promise);
170 
171             factory.selector.connect(channel, context);
172         }
173         catch (IOException x)
174         {
175             promise.failed(x);
176         }
177     }
178 
179     protected void configure(SocketChannel channel) throws IOException
180     {
181         channel.socket().setTcpNoDelay(true);
182     }
183 
184     /**
185      * @return the address to bind the socket channel to
186      * @see #setBindAddress(SocketAddress)
187      */
188     public SocketAddress getBindAddress()
189     {
190         return bindAddress;
191     }
192 
193     /**
194      * @param bindAddress the address to bind the socket channel to
195      * @see #getBindAddress()
196      */
197     public void setBindAddress(SocketAddress bindAddress)
198     {
199         this.bindAddress = bindAddress;
200     }
201 
202     public long getIdleTimeout()
203     {
204         return idleTimeout;
205     }
206 
207     public void setIdleTimeout(long idleTimeout)
208     {
209         this.idleTimeout = idleTimeout;
210     }
211 
212     public int getInitialWindowSize()
213     {
214         return initialWindowSize;
215     }
216 
217     public void setInitialWindowSize(int initialWindowSize)
218     {
219         this.initialWindowSize = initialWindowSize;
220     }
221 
222     public boolean isDispatchIO()
223     {
224         return dispatchIO;
225     }
226 
227     public void setDispatchIO(boolean dispatchIO)
228     {
229         this.dispatchIO = dispatchIO;
230     }
231 
232     public ClientConnectionFactory getClientConnectionFactory()
233     {
234         return connectionFactory;
235     }
236 
237     public void setClientConnectionFactory(ClientConnectionFactory connectionFactory)
238     {
239         this.connectionFactory = connectionFactory;
240     }
241 
242     protected String selectProtocol(List<String> serverProtocols)
243     {
244         String protocol = "spdy/" + version;
245         for (String serverProtocol : serverProtocols)
246         {
247             if (serverProtocol.equals(protocol))
248                 return protocol;
249         }
250         return null;
251     }
252 
253     protected FlowControlStrategy newFlowControlStrategy()
254     {
255         return FlowControlStrategyFactory.newFlowControlStrategy(version);
256     }
257 
258     public static class Factory extends ContainerLifeCycle
259     {
260         private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
261         private final ByteBufferPool bufferPool = new MappedByteBufferPool();
262         private final Scheduler scheduler;
263         private final Executor executor;
264         private final SslContextFactory sslContextFactory;
265         private final SelectorManager selector;
266         private final long idleTimeout;
267         private long connectTimeout;
268 
269         public Factory()
270         {
271             this(null, null);
272         }
273 
274         public Factory(SslContextFactory sslContextFactory)
275         {
276             this(null, null, sslContextFactory);
277         }
278 
279         public Factory(Executor executor)
280         {
281             this(executor, null);
282         }
283 
284         public Factory(Executor executor, Scheduler scheduler)
285         {
286             this(executor, scheduler, null);
287         }
288 
289         public Factory(Executor executor, Scheduler scheduler, SslContextFactory sslContextFactory)
290         {
291             this(executor, scheduler, sslContextFactory, 30000);
292         }
293 
294         public Factory(Executor executor, Scheduler scheduler, SslContextFactory sslContextFactory, long idleTimeout)
295         {
296             this.idleTimeout = idleTimeout;
297             setConnectTimeout(15000);
298 
299             if (executor == null)
300                 executor = new QueuedThreadPool();
301             this.executor = executor;
302             addBean(executor);
303 
304             if (scheduler == null)
305                 scheduler = new ScheduledExecutorScheduler();
306             this.scheduler = scheduler;
307             addBean(scheduler);
308 
309             this.sslContextFactory = sslContextFactory;
310             if (sslContextFactory != null)
311                 addBean(sslContextFactory);
312 
313             selector = new ClientSelectorManager(executor, scheduler);
314             selector.setConnectTimeout(getConnectTimeout());
315             addBean(selector);
316         }
317 
318         public ByteBufferPool getByteBufferPool()
319         {
320             return bufferPool;
321         }
322 
323         public Scheduler getScheduler()
324         {
325             return scheduler;
326         }
327 
328         public Executor getExecutor()
329         {
330             return executor;
331         }
332 
333         public SslContextFactory getSslContextFactory()
334         {
335             return sslContextFactory;
336         }
337 
338         public long getConnectTimeout()
339         {
340             return connectTimeout;
341         }
342 
343         public void setConnectTimeout(long connectTimeout)
344         {
345             this.connectTimeout = connectTimeout;
346         }
347 
348         public SPDYClient newSPDYClient(short version)
349         {
350             return new SPDYClient(version, this);
351         }
352 
353         @Override
354         protected void doStop() throws Exception
355         {
356             closeConnections();
357             super.doStop();
358         }
359 
360         boolean sessionOpened(Session session)
361         {
362             // Add sessions only if the factory is not stopping
363             return isRunning() && sessions.offer(session);
364         }
365 
366         boolean sessionClosed(Session session)
367         {
368             // Remove sessions only if the factory is not stopping
369             // to avoid concurrent removes during iterations
370             return isRunning() && sessions.remove(session);
371         }
372 
373         private void closeConnections()
374         {
375             for (Session session : sessions)
376                 session.goAway(new GoAwayInfo(), new Callback.Adapter());
377             sessions.clear();
378         }
379 
380         public Collection<Session> getSessions()
381         {
382             return Collections.unmodifiableCollection(sessions);
383         }
384 
385         private class ClientSelectorManager extends SelectorManager
386         {
387             private ClientSelectorManager(Executor executor, Scheduler scheduler)
388             {
389                 super(executor, scheduler);
390             }
391 
392             @Override
393             protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
394             {
395                 @SuppressWarnings("unchecked")
396                 Map<String, Object> context = (Map<String, Object>)key.attachment();
397                 SPDYClient client = (SPDYClient)context.get(SPDYClientConnectionFactory.SPDY_CLIENT_CONTEXT_KEY);
398                 long clientIdleTimeout = client.getIdleTimeout();
399                 if (clientIdleTimeout < 0)
400                     clientIdleTimeout = idleTimeout;
401                 return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), clientIdleTimeout);
402             }
403 
404             @Override
405             public Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException
406             {
407                 @SuppressWarnings("unchecked")
408                 Map<String, Object> context = (Map<String, Object>)attachment;
409                 try
410                 {
411                     SPDYClient client = (SPDYClient)context.get(SPDYClientConnectionFactory.SPDY_CLIENT_CONTEXT_KEY);
412                     return client.getClientConnectionFactory().newConnection(endPoint, context);
413                 }
414                 catch (Throwable x)
415                 {
416                     @SuppressWarnings("unchecked")
417                     Promise<Session> promise = (Promise<Session>)context.get(SPDYClientConnectionFactory.SPDY_SESSION_PROMISE_CONTEXT_KEY);
418                     promise.failed(x);
419                     throw x;
420                 }
421             }
422         }
423     }
424 }