View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.spdy.client;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.net.SocketAddress;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.SocketChannel;
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.HashMap;
29  import java.util.Map;
30  import java.util.Queue;
31  import java.util.concurrent.ConcurrentLinkedQueue;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.Executor;
34  
35  import org.eclipse.jetty.io.ByteBufferPool;
36  import org.eclipse.jetty.io.ClientConnectionFactory;
37  import org.eclipse.jetty.io.Connection;
38  import org.eclipse.jetty.io.EndPoint;
39  import org.eclipse.jetty.io.MappedByteBufferPool;
40  import org.eclipse.jetty.io.NegotiatingClientConnectionFactory;
41  import org.eclipse.jetty.io.SelectChannelEndPoint;
42  import org.eclipse.jetty.io.SelectorManager;
43  import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
44  import org.eclipse.jetty.spdy.FlowControlStrategy;
45  import org.eclipse.jetty.spdy.api.GoAwayInfo;
46  import org.eclipse.jetty.spdy.api.Session;
47  import org.eclipse.jetty.spdy.api.SessionFrameListener;
48  import org.eclipse.jetty.util.Callback;
49  import org.eclipse.jetty.util.FuturePromise;
50  import org.eclipse.jetty.util.Promise;
51  import org.eclipse.jetty.util.component.ContainerLifeCycle;
52  import org.eclipse.jetty.util.ssl.SslContextFactory;
53  import org.eclipse.jetty.util.thread.QueuedThreadPool;
54  import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
55  import org.eclipse.jetty.util.thread.Scheduler;
56  
/**
 * A {@link SPDYClient} allows applications to connect to one or more SPDY servers,
 * obtaining {@link Session} objects that can be used to send/receive SPDY frames.
 * <p/>
 * {@link SPDYClient} instances are created through a {@link Factory}, which must be
 * started before any of its clients can connect:
 * <pre>
 * SPDYClient.Factory factory = new SPDYClient.Factory();
 * factory.start();
 * SPDYClient client = factory.newSPDYClient(SPDY.V3);
 * </pre>
 * and then used to connect to the server:
 * <pre>
 * FuturePromise&lt;Session&gt; promise = new FuturePromise&lt;&gt;();
 * client.connect(new InetSocketAddress("server.com", port), null, promise);
 * Session session = promise.get();
 * </pre>
 */
73  public class SPDYClient
74  {
75      private final short version;
76      private final Factory factory;
77      private volatile SocketAddress bindAddress;
78      private volatile long idleTimeout = -1;
79      private volatile int initialWindowSize;
80      private volatile boolean dispatchIO;
81      private volatile ClientConnectionFactory connectionFactory;
82  
83      protected SPDYClient(short version, Factory factory)
84      {
85          this.version = version;
86          this.factory = factory;
87          setInitialWindowSize(65536);
88          setDispatchIO(true);
89      }
90  
91      public short getVersion()
92      {
93          return version;
94      }
95  
96      public Factory getFactory()
97      {
98          return factory;
99      }
100 
101     /**
102      * Equivalent to:
103      * <pre>
104      * Future&lt;Session&gt; promise = new FuturePromise&lt;&gt;();
105      * connect(address, listener, promise);
106      * </pre>
107      *
108      * @param address  the address to connect to
109      * @param listener the session listener that will be notified of session events
110      * @return a {@link Session} when connected
111      */
112     public Session connect(SocketAddress address, SessionFrameListener listener) throws ExecutionException, InterruptedException
113     {
114         FuturePromise<Session> promise = new FuturePromise<>();
115         connect(address, listener, promise);
116         return promise.get();
117     }
118 
119     /**
120      * Equivalent to:
121      * <pre>
122      * connect(address, listener, promise, null);
123      * </pre>
124      *
125      * @param address  the address to connect to
126      * @param listener the session listener that will be notified of session events
127      * @param promise  the promise notified of connection success/failure
128      */
129     public void connect(SocketAddress address, SessionFrameListener listener, Promise<Session> promise)
130     {
131         connect(address, listener, promise, new HashMap<String, Object>());
132     }
133 
134     /**
135      * Connects to the given {@code address}, binding the given {@code listener} to session events,
136      * and notified the given {@code promise} of the connect result.
137      * <p/>
138      * If the connect operation is successful, the {@code promise} will be invoked with the {@link Session}
139      * object that applications can use to perform SPDY requests.
140      *
141      * @param address  the address to connect to
142      * @param listener the session listener that will be notified of session events
143      * @param promise  the promise notified of connection success/failure
144      * @param context  a context object passed to the {@link #getClientConnectionFactory() ConnectionFactory}
145      *                 for the creation of the connection
146      */
147     public void connect(final SocketAddress address, final SessionFrameListener listener, final Promise<Session> promise, Map<String, Object> context)
148     {
149         if (!factory.isStarted())
150             throw new IllegalStateException(Factory.class.getSimpleName() + " is not started");
151 
152         try
153         {
154             SocketChannel channel = SocketChannel.open();
155             if (bindAddress != null)
156                 channel.bind(bindAddress);
157             configure(channel);
158             channel.configureBlocking(false);
159 
160             context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, ((InetSocketAddress)address).getHostString());
161             context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, ((InetSocketAddress)address).getPort());
162             context.put(SPDYClientConnectionFactory.SPDY_CLIENT_CONTEXT_KEY, this);
163             context.put(SPDYClientConnectionFactory.SPDY_SESSION_LISTENER_CONTEXT_KEY, listener);
164             context.put(SPDYClientConnectionFactory.SPDY_SESSION_PROMISE_CONTEXT_KEY, promise);
165 
166             if (channel.connect(address))
167                 factory.selector.accept(channel, context);
168             else
169                 factory.selector.connect(channel, context);
170         }
171         catch (IOException x)
172         {
173             promise.failed(x);
174         }
175     }
176 
177     protected void configure(SocketChannel channel) throws IOException
178     {
179         channel.socket().setTcpNoDelay(true);
180     }
181 
182     /**
183      * @return the address to bind the socket channel to
184      * @see #setBindAddress(SocketAddress)
185      */
186     public SocketAddress getBindAddress()
187     {
188         return bindAddress;
189     }
190 
191     /**
192      * @param bindAddress the address to bind the socket channel to
193      * @see #getBindAddress()
194      */
195     public void setBindAddress(SocketAddress bindAddress)
196     {
197         this.bindAddress = bindAddress;
198     }
199 
200     public long getIdleTimeout()
201     {
202         return idleTimeout;
203     }
204 
205     public void setIdleTimeout(long idleTimeout)
206     {
207         this.idleTimeout = idleTimeout;
208     }
209 
210     public int getInitialWindowSize()
211     {
212         return initialWindowSize;
213     }
214 
215     public void setInitialWindowSize(int initialWindowSize)
216     {
217         this.initialWindowSize = initialWindowSize;
218     }
219 
220     public boolean isDispatchIO()
221     {
222         return dispatchIO;
223     }
224 
225     public void setDispatchIO(boolean dispatchIO)
226     {
227         this.dispatchIO = dispatchIO;
228     }
229 
230     public ClientConnectionFactory getClientConnectionFactory()
231     {
232         return connectionFactory;
233     }
234 
235     public void setClientConnectionFactory(ClientConnectionFactory connectionFactory)
236     {
237         this.connectionFactory = connectionFactory;
238     }
239 
240     protected FlowControlStrategy newFlowControlStrategy()
241     {
242         return FlowControlStrategyFactory.newFlowControlStrategy(version);
243     }
244 
245     public static class Factory extends ContainerLifeCycle
246     {
247         private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
248         private final Scheduler scheduler;
249         private final Executor executor;
250         private final ByteBufferPool bufferPool;
251         private final SslContextFactory sslContextFactory;
252         private final SelectorManager selector;
253         private final long idleTimeout;
254         private long connectTimeout;
255 
256         public Factory()
257         {
258             this(null, null);
259         }
260 
261         public Factory(SslContextFactory sslContextFactory)
262         {
263             this(null, null, sslContextFactory);
264         }
265 
266         public Factory(Executor executor)
267         {
268             this(executor, null);
269         }
270 
271         public Factory(Executor executor, Scheduler scheduler)
272         {
273             this(executor, scheduler, null);
274         }
275 
276         public Factory(Executor executor, Scheduler scheduler, SslContextFactory sslContextFactory)
277         {
278             this(executor, scheduler, sslContextFactory, 30000);
279         }
280 
281         public Factory(Executor executor, Scheduler scheduler, SslContextFactory sslContextFactory, long idleTimeout)
282         {
283             this(executor, scheduler, null, sslContextFactory, idleTimeout);
284         }
285 
286         public Factory(Executor executor, Scheduler scheduler, ByteBufferPool bufferPool, SslContextFactory sslContextFactory, long idleTimeout)
287         {
288             this.idleTimeout = idleTimeout;
289             setConnectTimeout(15000);
290 
291             if (executor == null)
292                 executor = new QueuedThreadPool();
293             this.executor = executor;
294             addBean(executor);
295 
296             if (scheduler == null)
297                 scheduler = new ScheduledExecutorScheduler();
298             this.scheduler = scheduler;
299             addBean(scheduler);
300 
301             if (bufferPool == null)
302                 bufferPool = new MappedByteBufferPool();
303             this.bufferPool = bufferPool;
304             addBean(bufferPool);
305 
306             this.sslContextFactory = sslContextFactory;
307             if (sslContextFactory != null)
308                 addBean(sslContextFactory);
309 
310             selector = new ClientSelectorManager(executor, scheduler);
311             selector.setConnectTimeout(getConnectTimeout());
312             addBean(selector);
313         }
314 
315         public ByteBufferPool getByteBufferPool()
316         {
317             return bufferPool;
318         }
319 
320         public Scheduler getScheduler()
321         {
322             return scheduler;
323         }
324 
325         public Executor getExecutor()
326         {
327             return executor;
328         }
329 
330         public SslContextFactory getSslContextFactory()
331         {
332             return sslContextFactory;
333         }
334 
335         public long getConnectTimeout()
336         {
337             return connectTimeout;
338         }
339 
340         public void setConnectTimeout(long connectTimeout)
341         {
342             this.connectTimeout = connectTimeout;
343         }
344 
345         public SPDYClient newSPDYClient(short version)
346         {
347             return newSPDYClient(version, new NPNClientConnectionFactory(getExecutor(), new SPDYClientConnectionFactory(), "spdy/" + version));
348         }
349 
350         public SPDYClient newSPDYClient(short version, NegotiatingClientConnectionFactory negotiatingFactory)
351         {
352             SPDYClient client = new SPDYClient(version, this);
353 
354             ClientConnectionFactory connectionFactory = negotiatingFactory.getClientConnectionFactory();
355             if (sslContextFactory != null)
356                 connectionFactory = new SslClientConnectionFactory(getSslContextFactory(), getByteBufferPool(), getExecutor(), negotiatingFactory);
357 
358             client.setClientConnectionFactory(connectionFactory);
359             return client;
360         }
361 
362         @Override
363         protected void doStop() throws Exception
364         {
365             closeConnections();
366             super.doStop();
367         }
368 
369         boolean sessionOpened(Session session)
370         {
371             // Add sessions only if the factory is not stopping
372             return isRunning() && sessions.offer(session);
373         }
374 
375         boolean sessionClosed(Session session)
376         {
377             // Remove sessions only if the factory is not stopping
378             // to avoid concurrent removes during iterations
379             return isRunning() && sessions.remove(session);
380         }
381 
382         private void closeConnections()
383         {
384             for (Session session : sessions)
385                 session.goAway(new GoAwayInfo(), Callback.Adapter.INSTANCE);
386             sessions.clear();
387         }
388 
389         public Collection<Session> getSessions()
390         {
391             return Collections.unmodifiableCollection(sessions);
392         }
393 
394         @Override
395         protected void dumpThis(Appendable out) throws IOException
396         {
397             super.dumpThis(out);
398             dump(out, "", sessions);
399         }
400 
401         private class ClientSelectorManager extends SelectorManager
402         {
403             private ClientSelectorManager(Executor executor, Scheduler scheduler)
404             {
405                 super(executor, scheduler);
406             }
407 
408             @Override
409             protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
410             {
411                 @SuppressWarnings("unchecked")
412                 Map<String, Object> context = (Map<String, Object>)key.attachment();
413                 SPDYClient client = (SPDYClient)context.get(SPDYClientConnectionFactory.SPDY_CLIENT_CONTEXT_KEY);
414                 long clientIdleTimeout = client.getIdleTimeout();
415                 if (clientIdleTimeout < 0)
416                     clientIdleTimeout = idleTimeout;
417                 return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), clientIdleTimeout);
418             }
419 
420             @Override
421             public Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException
422             {
423                 @SuppressWarnings("unchecked")
424                 Map<String, Object> context = (Map<String, Object>)attachment;
425                 try
426                 {
427                     SPDYClient client = (SPDYClient)context.get(SPDYClientConnectionFactory.SPDY_CLIENT_CONTEXT_KEY);
428                     return client.getClientConnectionFactory().newConnection(endPoint, context);
429                 }
430                 catch (Throwable x)
431                 {
432                     @SuppressWarnings("unchecked")
433                     Promise<Session> promise = (Promise<Session>)context.get(SPDYClientConnectionFactory.SPDY_SESSION_PROMISE_CONTEXT_KEY);
434                     promise.failed(x);
435                     throw x;
436                 }
437             }
438         }
439     }
440 }