View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  
20  package org.eclipse.jetty.spdy;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.nio.channels.SelectionKey;
26  import java.nio.channels.SocketChannel;
27  import java.util.Collection;
28  import java.util.Collections;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Queue;
32  import java.util.concurrent.ConcurrentHashMap;
33  import java.util.concurrent.ConcurrentLinkedQueue;
34  import java.util.concurrent.Executor;
35  import java.util.concurrent.Executors;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.RejectedExecutionException;
38  import java.util.concurrent.ScheduledExecutorService;
39  import javax.net.ssl.SSLEngine;
40  import javax.net.ssl.SSLException;
41  
42  import org.eclipse.jetty.io.AsyncEndPoint;
43  import org.eclipse.jetty.io.ConnectedEndPoint;
44  import org.eclipse.jetty.io.Connection;
45  import org.eclipse.jetty.io.nio.AsyncConnection;
46  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
47  import org.eclipse.jetty.io.nio.SelectorManager;
48  import org.eclipse.jetty.io.nio.SslConnection;
49  import org.eclipse.jetty.npn.NextProtoNego;
50  import org.eclipse.jetty.spdy.api.Session;
51  import org.eclipse.jetty.spdy.api.SessionFrameListener;
52  import org.eclipse.jetty.spdy.generator.Generator;
53  import org.eclipse.jetty.spdy.parser.Parser;
54  import org.eclipse.jetty.util.component.AggregateLifeCycle;
55  import org.eclipse.jetty.util.ssl.SslContextFactory;
56  import org.eclipse.jetty.util.thread.QueuedThreadPool;
57  
58  public class SPDYClient
59  {
60      private final Map<String, AsyncConnectionFactory> factories = new ConcurrentHashMap<>();
61      private final short version;
62      private final Factory factory;
63      private SocketAddress bindAddress;
64      private long maxIdleTime = -1;
65      private volatile int initialWindowSize = 65536;
66  
67      protected SPDYClient(short version, Factory factory)
68      {
69          this.version = version;
70          this.factory = factory;
71      }
72  
73      /**
74       * @return the address to bind the socket channel to
75       * @see #setBindAddress(SocketAddress)
76       */
77      public SocketAddress getBindAddress()
78      {
79          return bindAddress;
80      }
81  
82      /**
83       * @param bindAddress the address to bind the socket channel to
84       * @see #getBindAddress()
85       */
86      public void setBindAddress(SocketAddress bindAddress)
87      {
88          this.bindAddress = bindAddress;
89      }
90  
91      public Future<Session> connect(InetSocketAddress address, SessionFrameListener listener) throws IOException
92      {
93          if (!factory.isStarted())
94              throw new IllegalStateException(Factory.class.getSimpleName() + " is not started");
95  
96          SocketChannel channel = SocketChannel.open();
97          if (bindAddress != null)
98              channel.bind(bindAddress);
99          channel.socket().setTcpNoDelay(true);
100         channel.configureBlocking(false);
101 
102         SessionPromise result = new SessionPromise(channel, this, listener);
103 
104         channel.connect(address);
105         factory.selector.register(channel, result);
106 
107         return result;
108     }
109 
110     public long getMaxIdleTime()
111     {
112         return maxIdleTime;
113     }
114 
115     public void setMaxIdleTime(long maxIdleTime)
116     {
117         this.maxIdleTime = maxIdleTime;
118     }
119 
120     public int getInitialWindowSize()
121     {
122         return initialWindowSize;
123     }
124 
125     public void setInitialWindowSize(int initialWindowSize)
126     {
127         this.initialWindowSize = initialWindowSize;
128     }
129 
130     protected String selectProtocol(List<String> serverProtocols)
131     {
132         if (serverProtocols == null)
133             return "spdy/2";
134 
135         for (String serverProtocol : serverProtocols)
136         {
137             for (String protocol : factories.keySet())
138             {
139                 if (serverProtocol.equals(protocol))
140                     return protocol;
141             }
142             String protocol = factory.selectProtocol(serverProtocols);
143             if (protocol != null)
144                 return protocol;
145         }
146 
147         return null;
148     }
149 
150     public AsyncConnectionFactory getAsyncConnectionFactory(String protocol)
151     {
152         for (Map.Entry<String, AsyncConnectionFactory> entry : factories.entrySet())
153         {
154             if (protocol.equals(entry.getKey()))
155                 return entry.getValue();
156         }
157         for (Map.Entry<String, AsyncConnectionFactory> entry : factory.factories.entrySet())
158         {
159             if (protocol.equals(entry.getKey()))
160                 return entry.getValue();
161         }
162         return null;
163     }
164 
165     public void putAsyncConnectionFactory(String protocol, AsyncConnectionFactory factory)
166     {
167         factories.put(protocol, factory);
168     }
169 
170     public AsyncConnectionFactory removeAsyncConnectionFactory(String protocol)
171     {
172         return factories.remove(protocol);
173     }
174 
175     protected SSLEngine newSSLEngine(SslContextFactory sslContextFactory, SocketChannel channel)
176     {
177         String peerHost = channel.socket().getInetAddress().getHostAddress();
178         int peerPort = channel.socket().getPort();
179         SSLEngine engine = sslContextFactory.newSslEngine(peerHost, peerPort);
180         engine.setUseClientMode(true);
181         return engine;
182     }
183 
184     protected FlowControlStrategy newFlowControlStrategy()
185     {
186         return FlowControlStrategyFactory.newFlowControlStrategy(version);
187     }
188 
189     public static class Factory extends AggregateLifeCycle
190     {
191         private final Map<String, AsyncConnectionFactory> factories = new ConcurrentHashMap<>();
192         private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
193         private final ByteBufferPool bufferPool = new StandardByteBufferPool();
194         private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
195         private final Executor threadPool;
196         private final SslContextFactory sslContextFactory;
197         private final SelectorManager selector;
198 
199         public Factory()
200         {
201             this(null, null);
202         }
203 
204         public Factory(SslContextFactory sslContextFactory)
205         {
206             this(null, sslContextFactory);
207         }
208 
209         public Factory(Executor threadPool)
210         {
211             this(threadPool, null);
212         }
213 
214         public Factory(Executor threadPool, SslContextFactory sslContextFactory)
215         {
216             if (threadPool == null)
217                 threadPool = new QueuedThreadPool();
218             this.threadPool = threadPool;
219             addBean(threadPool);
220 
221             this.sslContextFactory = sslContextFactory;
222             if (sslContextFactory != null)
223                 addBean(sslContextFactory);
224 
225             selector = new ClientSelectorManager();
226             addBean(selector);
227 
228             factories.put("spdy/2", new ClientSPDYAsyncConnectionFactory());
229         }
230 
231         public SPDYClient newSPDYClient(short version)
232         {
233             return new SPDYClient(version, this);
234         }
235 
236         @Override
237         protected void doStop() throws Exception
238         {
239             closeConnections();
240             super.doStop();
241         }
242 
243         protected String selectProtocol(List<String> serverProtocols)
244         {
245             for (String serverProtocol : serverProtocols)
246             {
247                 for (String protocol : factories.keySet())
248                 {
249                     if (serverProtocol.equals(protocol))
250                         return protocol;
251                 }
252             }
253             return null;
254         }
255 
256         private boolean sessionOpened(Session session)
257         {
258             // Add sessions only if the factory is not stopping
259             return isRunning() && sessions.offer(session);
260         }
261 
262         private boolean sessionClosed(Session session)
263         {
264             // Remove sessions only if the factory is not stopping
265             // to avoid concurrent removes during iterations
266             return isRunning() && sessions.remove(session);
267         }
268 
269         private void closeConnections()
270         {
271             for (Session session : sessions)
272                 session.goAway();
273             sessions.clear();
274         }
275 
276         protected Collection<Session> getSessions()
277         {
278             return Collections.unmodifiableCollection(sessions);
279         }
280 
281         private class ClientSelectorManager extends SelectorManager
282         {
283             @Override
284             public boolean dispatch(Runnable task)
285             {
286                 try
287                 {
288                     threadPool.execute(task);
289                     return true;
290                 }
291                 catch (RejectedExecutionException x)
292                 {
293                     return false;
294                 }
295             }
296 
297             @Override
298             protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
299             {
300                 SessionPromise attachment = (SessionPromise)key.attachment();
301 
302                 long maxIdleTime = attachment.client.getMaxIdleTime();
303                 if (maxIdleTime < 0)
304                     maxIdleTime = getMaxIdleTime();
305                 SelectChannelEndPoint result = new SelectChannelEndPoint(channel, selectSet, key, (int)maxIdleTime);
306 
307                 AsyncConnection connection = newConnection(channel, result, attachment);
308                 result.setConnection(connection);
309 
310                 return result;
311             }
312 
313             @Override
314             protected void endPointOpened(SelectChannelEndPoint endpoint)
315             {
316             }
317 
318             @Override
319             protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
320             {
321             }
322 
323             @Override
324             protected void endPointClosed(SelectChannelEndPoint endpoint)
325             {
326                 endpoint.getConnection().onClose();
327             }
328 
329             @Override
330             public AsyncConnection newConnection(final SocketChannel channel, AsyncEndPoint endPoint, final Object attachment)
331             {
332                 SessionPromise sessionPromise = (SessionPromise)attachment;
333                 final SPDYClient client = sessionPromise.client;
334 
335                 try
336                 {
337                     if (sslContextFactory != null)
338                     {
339                         final SSLEngine engine = client.newSSLEngine(sslContextFactory, channel);
340                         SslConnection sslConnection = new SslConnection(engine, endPoint)
341                         {
342                             @Override
343                             public void onClose()
344                             {
345                                 NextProtoNego.remove(engine);
346                                 super.onClose();
347                             }
348                         };
349                         endPoint.setConnection(sslConnection);
350                         final AsyncEndPoint sslEndPoint = sslConnection.getSslEndPoint();
351                         NextProtoNego.put(engine, new NextProtoNego.ClientProvider()
352                         {
353                             @Override
354                             public boolean supports()
355                             {
356                                 return true;
357                             }
358 
359                             @Override
360                             public void unsupported()
361                             {
362                                 // Server does not support NPN, but this is a SPDY client, so hardcode SPDY
363                                 ClientSPDYAsyncConnectionFactory connectionFactory = new ClientSPDYAsyncConnectionFactory();
364                                 AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, attachment);
365                                 sslEndPoint.setConnection(connection);
366                             }
367 
368                             @Override
369                             public String selectProtocol(List<String> protocols)
370                             {
371                                 String protocol = client.selectProtocol(protocols);
372                                 if (protocol == null)
373                                     return null;
374 
375                                 AsyncConnectionFactory connectionFactory = client.getAsyncConnectionFactory(protocol);
376                                 AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, attachment);
377                                 sslEndPoint.setConnection(connection);
378                                 return protocol;
379                             }
380                         });
381 
382                         AsyncConnection connection = new EmptyAsyncConnection(sslEndPoint);
383                         sslEndPoint.setConnection(connection);
384 
385                         startHandshake(engine);
386 
387                         return sslConnection;
388                     }
389                     else
390                     {
391                         AsyncConnectionFactory connectionFactory = new ClientSPDYAsyncConnectionFactory();
392                         AsyncConnection connection = connectionFactory.newAsyncConnection(channel, endPoint, attachment);
393                         endPoint.setConnection(connection);
394                         return connection;
395                     }
396                 }
397                 catch (RuntimeException x)
398                 {
399                     sessionPromise.failed(null,x);
400                     throw x;
401                 }
402             }
403 
404             private void startHandshake(SSLEngine engine)
405             {
406                 try
407                 {
408                     engine.beginHandshake();
409                 }
410                 catch (SSLException x)
411                 {
412                     throw new RuntimeException(x);
413                 }
414             }
415         }
416     }
417 
418     private static class SessionPromise extends Promise<Session>
419     {
420         private final SocketChannel channel;
421         private final SPDYClient client;
422         private final SessionFrameListener listener;
423 
424         private SessionPromise(SocketChannel channel, SPDYClient client, SessionFrameListener listener)
425         {
426             this.channel = channel;
427             this.client = client;
428             this.listener = listener;
429         }
430 
431         @Override
432         public boolean cancel(boolean mayInterruptIfRunning)
433         {
434             try
435             {
436                 super.cancel(mayInterruptIfRunning);
437                 channel.close();
438                 return true;
439             }
440             catch (IOException x)
441             {
442                 return true;
443             }
444         }
445     }
446 
447     private static class ClientSPDYAsyncConnectionFactory implements AsyncConnectionFactory
448     {
449         @Override
450         public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment)
451         {
452             SessionPromise sessionPromise = (SessionPromise)attachment;
453             SPDYClient client = sessionPromise.client;
454             Factory factory = client.factory;
455 
456             CompressionFactory compressionFactory = new StandardCompressionFactory();
457             Parser parser = new Parser(compressionFactory.newDecompressor());
458             Generator generator = new Generator(factory.bufferPool, compressionFactory.newCompressor());
459 
460             SPDYAsyncConnection connection = new ClientSPDYAsyncConnection(endPoint, factory.bufferPool, parser, factory);
461             endPoint.setConnection(connection);
462 
463             FlowControlStrategy flowControlStrategy = client.newFlowControlStrategy();
464 
465             StandardSession session = new StandardSession(client.version, factory.bufferPool, factory.threadPool, factory.scheduler, connection, connection, 1, sessionPromise.listener, generator, flowControlStrategy);
466             session.setWindowSize(client.getInitialWindowSize());
467             parser.addListener(session);
468             sessionPromise.completed(session);
469             connection.setSession(session);
470 
471             factory.sessionOpened(session);
472 
473             return connection;
474         }
475 
476         private class ClientSPDYAsyncConnection extends SPDYAsyncConnection
477         {
478             private final Factory factory;
479 
480             public ClientSPDYAsyncConnection(AsyncEndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Factory factory)
481             {
482                 super(endPoint, bufferPool, parser);
483                 this.factory = factory;
484             }
485 
486             @Override
487             public void onClose()
488             {
489                 super.onClose();
490                 factory.sessionClosed(getSession());
491             }
492         }
493     }
494 }