View Javadoc

1   //========================================================================
2   //Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
3   //------------------------------------------------------------------------
4   //All rights reserved. This program and the accompanying materials
5   //are made available under the terms of the Eclipse Public License v1.0
6   //and Apache License v2.0 which accompanies this distribution.
7   //The Eclipse Public License is available at
8   //http://www.eclipse.org/legal/epl-v10.html
9   //The Apache License v2.0 is available at
10  //http://www.opensource.org/licenses/apache2.0.php
11  //You may elect to redistribute this code under either of these licenses.
12  //========================================================================
13  
14  
15  package org.eclipse.jetty.spdy;
16  
17  import java.io.IOException;
18  import java.net.InetSocketAddress;
19  import java.net.SocketAddress;
20  import java.nio.channels.SelectionKey;
21  import java.nio.channels.SocketChannel;
22  import java.util.Collection;
23  import java.util.Collections;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Queue;
27  import java.util.concurrent.ConcurrentHashMap;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.Executor;
30  import java.util.concurrent.Executors;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.RejectedExecutionException;
33  import java.util.concurrent.ScheduledExecutorService;
34  import javax.net.ssl.SSLEngine;
35  import javax.net.ssl.SSLException;
36  
37  import org.eclipse.jetty.io.AsyncEndPoint;
38  import org.eclipse.jetty.io.ConnectedEndPoint;
39  import org.eclipse.jetty.io.Connection;
40  import org.eclipse.jetty.io.nio.AsyncConnection;
41  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
42  import org.eclipse.jetty.io.nio.SelectorManager;
43  import org.eclipse.jetty.io.nio.SslConnection;
44  import org.eclipse.jetty.npn.NextProtoNego;
45  import org.eclipse.jetty.spdy.api.Session;
46  import org.eclipse.jetty.spdy.api.SessionFrameListener;
47  import org.eclipse.jetty.spdy.generator.Generator;
48  import org.eclipse.jetty.spdy.parser.Parser;
49  import org.eclipse.jetty.util.component.AggregateLifeCycle;
50  import org.eclipse.jetty.util.ssl.SslContextFactory;
51  import org.eclipse.jetty.util.thread.QueuedThreadPool;
52  
53  public class SPDYClient
54  {
55      private final Map<String, AsyncConnectionFactory> factories = new ConcurrentHashMap<>();
56      private final short version;
57      private final Factory factory;
58      private SocketAddress bindAddress;
59      private long maxIdleTime = -1;
60      private volatile int initialWindowSize = 65536;
61  
62      protected SPDYClient(short version, Factory factory)
63      {
64          this.version = version;
65          this.factory = factory;
66      }
67  
68      /**
69       * @return the address to bind the socket channel to
70       * @see #setBindAddress(SocketAddress)
71       */
72      public SocketAddress getBindAddress()
73      {
74          return bindAddress;
75      }
76  
77      /**
78       * @param bindAddress the address to bind the socket channel to
79       * @see #getBindAddress()
80       */
81      public void setBindAddress(SocketAddress bindAddress)
82      {
83          this.bindAddress = bindAddress;
84      }
85  
86      public Future<Session> connect(InetSocketAddress address, SessionFrameListener listener) throws IOException
87      {
88          if (!factory.isStarted())
89              throw new IllegalStateException(Factory.class.getSimpleName() + " is not started");
90  
91          SocketChannel channel = SocketChannel.open();
92          if (bindAddress != null)
93              channel.bind(bindAddress);
94          channel.socket().setTcpNoDelay(true);
95          channel.configureBlocking(false);
96  
97          SessionPromise result = new SessionPromise(channel, this, listener);
98  
99          channel.connect(address);
100         factory.selector.register(channel, result);
101 
102         return result;
103     }
104 
105     public long getMaxIdleTime()
106     {
107         return maxIdleTime;
108     }
109 
110     public void setMaxIdleTime(long maxIdleTime)
111     {
112         this.maxIdleTime = maxIdleTime;
113     }
114 
115     public int getInitialWindowSize()
116     {
117         return initialWindowSize;
118     }
119 
120     public void setInitialWindowSize(int initialWindowSize)
121     {
122         this.initialWindowSize = initialWindowSize;
123     }
124 
125     protected String selectProtocol(List<String> serverProtocols)
126     {
127         if (serverProtocols == null)
128             return "spdy/2";
129 
130         for (String serverProtocol : serverProtocols)
131         {
132             for (String protocol : factories.keySet())
133             {
134                 if (serverProtocol.equals(protocol))
135                     return protocol;
136             }
137             String protocol = factory.selectProtocol(serverProtocols);
138             if (protocol != null)
139                 return protocol;
140         }
141 
142         return null;
143     }
144 
145     public AsyncConnectionFactory getAsyncConnectionFactory(String protocol)
146     {
147         for (Map.Entry<String, AsyncConnectionFactory> entry : factories.entrySet())
148         {
149             if (protocol.equals(entry.getKey()))
150                 return entry.getValue();
151         }
152         for (Map.Entry<String, AsyncConnectionFactory> entry : factory.factories.entrySet())
153         {
154             if (protocol.equals(entry.getKey()))
155                 return entry.getValue();
156         }
157         return null;
158     }
159 
160     public void putAsyncConnectionFactory(String protocol, AsyncConnectionFactory factory)
161     {
162         factories.put(protocol, factory);
163     }
164 
165     public AsyncConnectionFactory removeAsyncConnectionFactory(String protocol)
166     {
167         return factories.remove(protocol);
168     }
169 
170     protected SSLEngine newSSLEngine(SslContextFactory sslContextFactory, SocketChannel channel)
171     {
172         String peerHost = channel.socket().getInetAddress().getHostAddress();
173         int peerPort = channel.socket().getPort();
174         SSLEngine engine = sslContextFactory.newSslEngine(peerHost, peerPort);
175         engine.setUseClientMode(true);
176         return engine;
177     }
178 
179     protected FlowControlStrategy newFlowControlStrategy()
180     {
181         return FlowControlStrategyFactory.newFlowControlStrategy(version);
182     }
183 
184     public static class Factory extends AggregateLifeCycle
185     {
186         private final Map<String, AsyncConnectionFactory> factories = new ConcurrentHashMap<>();
187         private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
188         private final ByteBufferPool bufferPool = new StandardByteBufferPool();
189         private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
190         private final Executor threadPool;
191         private final SslContextFactory sslContextFactory;
192         private final SelectorManager selector;
193 
194         public Factory()
195         {
196             this(null, null);
197         }
198 
199         public Factory(SslContextFactory sslContextFactory)
200         {
201             this(null, sslContextFactory);
202         }
203 
204         public Factory(Executor threadPool)
205         {
206             this(threadPool, null);
207         }
208 
209         public Factory(Executor threadPool, SslContextFactory sslContextFactory)
210         {
211             if (threadPool == null)
212                 threadPool = new QueuedThreadPool();
213             this.threadPool = threadPool;
214             addBean(threadPool);
215 
216             this.sslContextFactory = sslContextFactory;
217             if (sslContextFactory != null)
218                 addBean(sslContextFactory);
219 
220             selector = new ClientSelectorManager();
221             addBean(selector);
222 
223             factories.put("spdy/2", new ClientSPDYAsyncConnectionFactory());
224         }
225 
226         public SPDYClient newSPDYClient(short version)
227         {
228             return new SPDYClient(version, this);
229         }
230 
231         @Override
232         protected void doStop() throws Exception
233         {
234             closeConnections();
235             super.doStop();
236         }
237 
238         protected String selectProtocol(List<String> serverProtocols)
239         {
240             for (String serverProtocol : serverProtocols)
241             {
242                 for (String protocol : factories.keySet())
243                 {
244                     if (serverProtocol.equals(protocol))
245                         return protocol;
246                 }
247             }
248             return null;
249         }
250 
251         private boolean sessionOpened(Session session)
252         {
253             // Add sessions only if the factory is not stopping
254             return isRunning() && sessions.offer(session);
255         }
256 
257         private boolean sessionClosed(Session session)
258         {
259             // Remove sessions only if the factory is not stopping
260             // to avoid concurrent removes during iterations
261             return isRunning() && sessions.remove(session);
262         }
263 
264         private void closeConnections()
265         {
266             for (Session session : sessions)
267                 session.goAway();
268             sessions.clear();
269         }
270 
271         protected Collection<Session> getSessions()
272         {
273             return Collections.unmodifiableCollection(sessions);
274         }
275 
276         private class ClientSelectorManager extends SelectorManager
277         {
278             @Override
279             public boolean dispatch(Runnable task)
280             {
281                 try
282                 {
283                     threadPool.execute(task);
284                     return true;
285                 }
286                 catch (RejectedExecutionException x)
287                 {
288                     return false;
289                 }
290             }
291 
292             @Override
293             protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
294             {
295                 SessionPromise attachment = (SessionPromise)key.attachment();
296 
297                 long maxIdleTime = attachment.client.getMaxIdleTime();
298                 if (maxIdleTime < 0)
299                     maxIdleTime = getMaxIdleTime();
300                 SelectChannelEndPoint result = new SelectChannelEndPoint(channel, selectSet, key, (int)maxIdleTime);
301 
302                 AsyncConnection connection = newConnection(channel, result, attachment);
303                 result.setConnection(connection);
304 
305                 return result;
306             }
307 
308             @Override
309             protected void endPointOpened(SelectChannelEndPoint endpoint)
310             {
311             }
312 
313             @Override
314             protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
315             {
316             }
317 
318             @Override
319             protected void endPointClosed(SelectChannelEndPoint endpoint)
320             {
321                 endpoint.getConnection().onClose();
322             }
323 
324             @Override
325             public AsyncConnection newConnection(final SocketChannel channel, AsyncEndPoint endPoint, final Object attachment)
326             {
327                 SessionPromise sessionPromise = (SessionPromise)attachment;
328                 final SPDYClient client = sessionPromise.client;
329 
330                 try
331                 {
332                     if (sslContextFactory != null)
333                     {
334                         final SSLEngine engine = client.newSSLEngine(sslContextFactory, channel);
335                         SslConnection sslConnection = new SslConnection(engine, endPoint)
336                         {
337                             @Override
338                             public void onClose()
339                             {
340                                 NextProtoNego.remove(engine);
341                                 super.onClose();
342                             }
343                         };
344                         endPoint.setConnection(sslConnection);
345                         final AsyncEndPoint sslEndPoint = sslConnection.getSslEndPoint();
346                         NextProtoNego.put(engine, new NextProtoNego.ClientProvider()
347                         {
348                             @Override
349                             public boolean supports()
350                             {
351                                 return true;
352                             }
353 
354                             @Override
355                             public void unsupported()
356                             {
357                                 // Server does not support NPN, but this is a SPDY client, so hardcode SPDY
358                                 ClientSPDYAsyncConnectionFactory connectionFactory = new ClientSPDYAsyncConnectionFactory();
359                                 AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, attachment);
360                                 sslEndPoint.setConnection(connection);
361                             }
362 
363                             @Override
364                             public String selectProtocol(List<String> protocols)
365                             {
366                                 String protocol = client.selectProtocol(protocols);
367                                 if (protocol == null)
368                                     return null;
369 
370                                 AsyncConnectionFactory connectionFactory = client.getAsyncConnectionFactory(protocol);
371                                 AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, attachment);
372                                 sslEndPoint.setConnection(connection);
373                                 return protocol;
374                             }
375                         });
376 
377                         AsyncConnection connection = new EmptyAsyncConnection(sslEndPoint);
378                         sslEndPoint.setConnection(connection);
379 
380                         startHandshake(engine);
381 
382                         return sslConnection;
383                     }
384                     else
385                     {
386                         AsyncConnectionFactory connectionFactory = new ClientSPDYAsyncConnectionFactory();
387                         AsyncConnection connection = connectionFactory.newAsyncConnection(channel, endPoint, attachment);
388                         endPoint.setConnection(connection);
389                         return connection;
390                     }
391                 }
392                 catch (RuntimeException x)
393                 {
394                     sessionPromise.failed(null,x);
395                     throw x;
396                 }
397             }
398 
399             private void startHandshake(SSLEngine engine)
400             {
401                 try
402                 {
403                     engine.beginHandshake();
404                 }
405                 catch (SSLException x)
406                 {
407                     throw new RuntimeException(x);
408                 }
409             }
410         }
411     }
412 
413     private static class SessionPromise extends Promise<Session>
414     {
415         private final SocketChannel channel;
416         private final SPDYClient client;
417         private final SessionFrameListener listener;
418 
419         private SessionPromise(SocketChannel channel, SPDYClient client, SessionFrameListener listener)
420         {
421             this.channel = channel;
422             this.client = client;
423             this.listener = listener;
424         }
425 
426         @Override
427         public boolean cancel(boolean mayInterruptIfRunning)
428         {
429             try
430             {
431                 super.cancel(mayInterruptIfRunning);
432                 channel.close();
433                 return true;
434             }
435             catch (IOException x)
436             {
437                 return true;
438             }
439         }
440     }
441 
442     private static class ClientSPDYAsyncConnectionFactory implements AsyncConnectionFactory
443     {
444         @Override
445         public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment)
446         {
447             SessionPromise sessionPromise = (SessionPromise)attachment;
448             SPDYClient client = sessionPromise.client;
449             Factory factory = client.factory;
450 
451             CompressionFactory compressionFactory = new StandardCompressionFactory();
452             Parser parser = new Parser(compressionFactory.newDecompressor());
453             Generator generator = new Generator(factory.bufferPool, compressionFactory.newCompressor());
454 
455             SPDYAsyncConnection connection = new ClientSPDYAsyncConnection(endPoint, factory.bufferPool, parser, factory);
456             endPoint.setConnection(connection);
457 
458             FlowControlStrategy flowControlStrategy = client.newFlowControlStrategy();
459 
460             StandardSession session = new StandardSession(client.version, factory.bufferPool, factory.threadPool, factory.scheduler, connection, connection, 1, sessionPromise.listener, generator, flowControlStrategy);
461             session.setWindowSize(client.getInitialWindowSize());
462             parser.addListener(session);
463             sessionPromise.completed(session);
464             connection.setSession(session);
465 
466             factory.sessionOpened(session);
467 
468             return connection;
469         }
470 
471         private class ClientSPDYAsyncConnection extends SPDYAsyncConnection
472         {
473             private final Factory factory;
474 
475             public ClientSPDYAsyncConnection(AsyncEndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Factory factory)
476             {
477                 super(endPoint, bufferPool, parser);
478                 this.factory = factory;
479             }
480 
481             @Override
482             public void onClose()
483             {
484                 super.onClose();
485                 factory.sessionClosed(getSession());
486             }
487         }
488     }
489 }