View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.spdy.client;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.net.SocketAddress;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.SocketChannel;
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.HashMap;
29  import java.util.Map;
30  import java.util.Queue;
31  import java.util.concurrent.ConcurrentLinkedQueue;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.Executor;
34  
35  import org.eclipse.jetty.io.ByteBufferPool;
36  import org.eclipse.jetty.io.ClientConnectionFactory;
37  import org.eclipse.jetty.io.Connection;
38  import org.eclipse.jetty.io.EndPoint;
39  import org.eclipse.jetty.io.MappedByteBufferPool;
40  import org.eclipse.jetty.io.SelectChannelEndPoint;
41  import org.eclipse.jetty.io.SelectorManager;
42  import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
43  import org.eclipse.jetty.spdy.FlowControlStrategy;
44  import org.eclipse.jetty.spdy.api.GoAwayInfo;
45  import org.eclipse.jetty.spdy.api.Session;
46  import org.eclipse.jetty.spdy.api.SessionFrameListener;
47  import org.eclipse.jetty.util.Callback;
48  import org.eclipse.jetty.util.FuturePromise;
49  import org.eclipse.jetty.util.Promise;
50  import org.eclipse.jetty.util.component.ContainerLifeCycle;
51  import org.eclipse.jetty.util.ssl.SslContextFactory;
52  import org.eclipse.jetty.util.thread.QueuedThreadPool;
53  import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
54  import org.eclipse.jetty.util.thread.Scheduler;
55  
56  /**
57   * A {@link SPDYClient} allows applications to connect to one or more SPDY servers,
58   * obtaining {@link Session} objects that can be used to send/receive SPDY frames.
59   * <p/>
60   * {@link SPDYClient} instances are created through a {@link Factory}:
61   * <pre>
62   * SPDYClient.Factory factory = new SPDYClient.Factory();
63   * SPDYClient client = factory.newSPDYClient(SPDY.V3);
64   * </pre>
65   * and then used to connect to the server:
66   * <pre>
67   * FuturePromise&lt;Session&gt; promise = new FuturePromise&lt;&gt;();
68   * client.connect("server.com", null, promise);
69   * Session session = promise.get();
70   * </pre>
71   */
72  public class SPDYClient
73  {
74      private final short version;
75      private final Factory factory;
76      private volatile SocketAddress bindAddress;
77      private volatile long idleTimeout = -1;
78      private volatile int initialWindowSize;
79      private volatile boolean dispatchIO;
80      private volatile ClientConnectionFactory connectionFactory;
81  
82      protected SPDYClient(short version, Factory factory)
83      {
84          this.version = version;
85          this.factory = factory;
86          setInitialWindowSize(65536);
87          setDispatchIO(true);
88      }
89  
90      public short getVersion()
91      {
92          return version;
93      }
94  
95      public Factory getFactory()
96      {
97          return factory;
98      }
99  
100     /**
101      * Equivalent to:
102      * <pre>
103      * Future&lt;Session&gt; promise = new FuturePromise&lt;&gt;();
104      * connect(address, listener, promise);
105      * </pre>
106      *
107      * @param address  the address to connect to
108      * @param listener the session listener that will be notified of session events
109      * @return a {@link Session} when connected
110      */
111     public Session connect(SocketAddress address, SessionFrameListener listener) throws ExecutionException, InterruptedException
112     {
113         FuturePromise<Session> promise = new FuturePromise<>();
114         connect(address, listener, promise);
115         return promise.get();
116     }
117 
118     /**
119      * Equivalent to:
120      * <pre>
121      * connect(address, listener, promise, null);
122      * </pre>
123      *
124      * @param address  the address to connect to
125      * @param listener the session listener that will be notified of session events
126      * @param promise  the promise notified of connection success/failure
127      */
128     public void connect(SocketAddress address, SessionFrameListener listener, Promise<Session> promise)
129     {
130         connect(address, listener, promise, new HashMap<String, Object>());
131     }
132 
133     /**
134      * Connects to the given {@code address}, binding the given {@code listener} to session events,
135      * and notified the given {@code promise} of the connect result.
136      * <p/>
137      * If the connect operation is successful, the {@code promise} will be invoked with the {@link Session}
138      * object that applications can use to perform SPDY requests.
139      *
140      * @param address  the address to connect to
141      * @param listener the session listener that will be notified of session events
142      * @param promise  the promise notified of connection success/failure
143      * @param context  a context object passed to the {@link #getClientConnectionFactory() ConnectionFactory}
144      *                 for the creation of the connection
145      */
146     public void connect(final SocketAddress address, final SessionFrameListener listener, final Promise<Session> promise, Map<String, Object> context)
147     {
148         if (!factory.isStarted())
149             throw new IllegalStateException(Factory.class.getSimpleName() + " is not started");
150 
151         try
152         {
153             SocketChannel channel = SocketChannel.open();
154             if (bindAddress != null)
155                 channel.bind(bindAddress);
156             configure(channel);
157             channel.configureBlocking(false);
158             channel.connect(address);
159 
160             context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, ((InetSocketAddress)address).getHostString());
161             context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, ((InetSocketAddress)address).getPort());
162             context.put(SPDYClientConnectionFactory.SPDY_CLIENT_CONTEXT_KEY, this);
163             context.put(SPDYClientConnectionFactory.SPDY_SESSION_LISTENER_CONTEXT_KEY, listener);
164             context.put(SPDYClientConnectionFactory.SPDY_SESSION_PROMISE_CONTEXT_KEY, promise);
165 
166             factory.selector.connect(channel, context);
167         }
168         catch (IOException x)
169         {
170             promise.failed(x);
171         }
172     }
173 
174     protected void configure(SocketChannel channel) throws IOException
175     {
176         channel.socket().setTcpNoDelay(true);
177     }
178 
179     /**
180      * @return the address to bind the socket channel to
181      * @see #setBindAddress(SocketAddress)
182      */
183     public SocketAddress getBindAddress()
184     {
185         return bindAddress;
186     }
187 
188     /**
189      * @param bindAddress the address to bind the socket channel to
190      * @see #getBindAddress()
191      */
192     public void setBindAddress(SocketAddress bindAddress)
193     {
194         this.bindAddress = bindAddress;
195     }
196 
197     public long getIdleTimeout()
198     {
199         return idleTimeout;
200     }
201 
202     public void setIdleTimeout(long idleTimeout)
203     {
204         this.idleTimeout = idleTimeout;
205     }
206 
207     public int getInitialWindowSize()
208     {
209         return initialWindowSize;
210     }
211 
212     public void setInitialWindowSize(int initialWindowSize)
213     {
214         this.initialWindowSize = initialWindowSize;
215     }
216 
217     public boolean isDispatchIO()
218     {
219         return dispatchIO;
220     }
221 
222     public void setDispatchIO(boolean dispatchIO)
223     {
224         this.dispatchIO = dispatchIO;
225     }
226 
227     public ClientConnectionFactory getClientConnectionFactory()
228     {
229         return connectionFactory;
230     }
231 
232     public void setClientConnectionFactory(ClientConnectionFactory connectionFactory)
233     {
234         this.connectionFactory = connectionFactory;
235     }
236 
237     protected FlowControlStrategy newFlowControlStrategy()
238     {
239         return FlowControlStrategyFactory.newFlowControlStrategy(version);
240     }
241 
242     public static class Factory extends ContainerLifeCycle
243     {
244         private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
245         private final Scheduler scheduler;
246         private final Executor executor;
247         private final ByteBufferPool bufferPool;
248         private final SslContextFactory sslContextFactory;
249         private final SelectorManager selector;
250         private final long idleTimeout;
251         private long connectTimeout;
252 
253         public Factory()
254         {
255             this(null, null);
256         }
257 
258         public Factory(SslContextFactory sslContextFactory)
259         {
260             this(null, null, sslContextFactory);
261         }
262 
263         public Factory(Executor executor)
264         {
265             this(executor, null);
266         }
267 
268         public Factory(Executor executor, Scheduler scheduler)
269         {
270             this(executor, scheduler, null);
271         }
272 
273         public Factory(Executor executor, Scheduler scheduler, SslContextFactory sslContextFactory)
274         {
275             this(executor, scheduler, sslContextFactory, 30000);
276         }
277 
278         public Factory(Executor executor, Scheduler scheduler, SslContextFactory sslContextFactory, long idleTimeout)
279         {
280             this(executor, scheduler, null, sslContextFactory, idleTimeout);
281         }
282 
283         public Factory(Executor executor, Scheduler scheduler, ByteBufferPool bufferPool, SslContextFactory sslContextFactory, long idleTimeout)
284         {
285             this.idleTimeout = idleTimeout;
286             setConnectTimeout(15000);
287 
288             if (executor == null)
289                 executor = new QueuedThreadPool();
290             this.executor = executor;
291             addBean(executor);
292 
293             if (scheduler == null)
294                 scheduler = new ScheduledExecutorScheduler();
295             this.scheduler = scheduler;
296             addBean(scheduler);
297 
298             if (bufferPool == null)
299                 bufferPool = new MappedByteBufferPool();
300             this.bufferPool = bufferPool;
301             addBean(bufferPool);
302 
303             this.sslContextFactory = sslContextFactory;
304             if (sslContextFactory != null)
305                 addBean(sslContextFactory);
306 
307             selector = new ClientSelectorManager(executor, scheduler);
308             selector.setConnectTimeout(getConnectTimeout());
309             addBean(selector);
310         }
311 
312         public ByteBufferPool getByteBufferPool()
313         {
314             return bufferPool;
315         }
316 
317         public Scheduler getScheduler()
318         {
319             return scheduler;
320         }
321 
322         public Executor getExecutor()
323         {
324             return executor;
325         }
326 
327         public SslContextFactory getSslContextFactory()
328         {
329             return sslContextFactory;
330         }
331 
332         public long getConnectTimeout()
333         {
334             return connectTimeout;
335         }
336 
337         public void setConnectTimeout(long connectTimeout)
338         {
339             this.connectTimeout = connectTimeout;
340         }
341 
342         public SPDYClient newSPDYClient(short version)
343         {
344             return newSPDYClient(version, new NPNClientConnectionFactory(getExecutor(), new SPDYClientConnectionFactory(), "spdy/" + version));
345         }
346 
347         public SPDYClient newSPDYClient(short version, NegotiatingClientConnectionFactory negotiatingFactory)
348         {
349             SPDYClient client = new SPDYClient(version, this);
350 
351             ClientConnectionFactory connectionFactory = negotiatingFactory.getClientConnectionFactory();
352             if (sslContextFactory != null)
353                 connectionFactory = new SslClientConnectionFactory(getSslContextFactory(), getByteBufferPool(), getExecutor(), negotiatingFactory);
354 
355             client.setClientConnectionFactory(connectionFactory);
356             return client;
357         }
358 
359         @Override
360         protected void doStop() throws Exception
361         {
362             closeConnections();
363             super.doStop();
364         }
365 
366         boolean sessionOpened(Session session)
367         {
368             // Add sessions only if the factory is not stopping
369             return isRunning() && sessions.offer(session);
370         }
371 
372         boolean sessionClosed(Session session)
373         {
374             // Remove sessions only if the factory is not stopping
375             // to avoid concurrent removes during iterations
376             return isRunning() && sessions.remove(session);
377         }
378 
379         private void closeConnections()
380         {
381             for (Session session : sessions)
382                 session.goAway(new GoAwayInfo(), new Callback.Adapter());
383             sessions.clear();
384         }
385 
386         public Collection<Session> getSessions()
387         {
388             return Collections.unmodifiableCollection(sessions);
389         }
390 
391         @Override
392         protected void dumpThis(Appendable out) throws IOException
393         {
394             super.dumpThis(out);
395             dump(out, "", sessions);
396         }
397 
398         private class ClientSelectorManager extends SelectorManager
399         {
400             private ClientSelectorManager(Executor executor, Scheduler scheduler)
401             {
402                 super(executor, scheduler);
403             }
404 
405             @Override
406             protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
407             {
408                 @SuppressWarnings("unchecked")
409                 Map<String, Object> context = (Map<String, Object>)key.attachment();
410                 SPDYClient client = (SPDYClient)context.get(SPDYClientConnectionFactory.SPDY_CLIENT_CONTEXT_KEY);
411                 long clientIdleTimeout = client.getIdleTimeout();
412                 if (clientIdleTimeout < 0)
413                     clientIdleTimeout = idleTimeout;
414                 return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), clientIdleTimeout);
415             }
416 
417             @Override
418             public Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException
419             {
420                 @SuppressWarnings("unchecked")
421                 Map<String, Object> context = (Map<String, Object>)attachment;
422                 try
423                 {
424                     SPDYClient client = (SPDYClient)context.get(SPDYClientConnectionFactory.SPDY_CLIENT_CONTEXT_KEY);
425                     return client.getClientConnectionFactory().newConnection(endPoint, context);
426                 }
427                 catch (Throwable x)
428                 {
429                     @SuppressWarnings("unchecked")
430                     Promise<Session> promise = (Promise<Session>)context.get(SPDYClientConnectionFactory.SPDY_SESSION_PROMISE_CONTEXT_KEY);
431                     promise.failed(x);
432                     throw x;
433                 }
434             }
435         }
436     }
437 }