View Javadoc

1   //========================================================================
2   //Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
3   //------------------------------------------------------------------------
4   //All rights reserved. This program and the accompanying materials
5   //are made available under the terms of the Eclipse Public License v1.0
6   //and Apache License v2.0 which accompanies this distribution.
7   //The Eclipse Public License is available at
8   //http://www.eclipse.org/legal/epl-v10.html
9   //The Apache License v2.0 is available at
10  //http://www.opensource.org/licenses/apache2.0.php
11  //You may elect to redistribute this code under either of these licenses.
12  //========================================================================
13  
14  
15  package org.eclipse.jetty.spdy;
16  
17  import java.io.IOException;
18  import java.nio.channels.SocketChannel;
19  import java.util.ArrayList;
20  import java.util.Collection;
21  import java.util.Collections;
22  import java.util.LinkedHashMap;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Queue;
26  import java.util.concurrent.ConcurrentLinkedQueue;
27  import java.util.concurrent.Executor;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.RejectedExecutionException;
30  import java.util.concurrent.ScheduledExecutorService;
31  import java.util.concurrent.TimeUnit;
32  import javax.net.ssl.SSLEngine;
33  import javax.net.ssl.SSLException;
34  
35  import org.eclipse.jetty.io.AsyncEndPoint;
36  import org.eclipse.jetty.io.nio.AsyncConnection;
37  import org.eclipse.jetty.io.nio.SslConnection;
38  import org.eclipse.jetty.npn.NextProtoNego;
39  import org.eclipse.jetty.server.nio.SelectChannelConnector;
40  import org.eclipse.jetty.spdy.api.SPDY;
41  import org.eclipse.jetty.spdy.api.Session;
42  import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
43  import org.eclipse.jetty.util.component.AggregateLifeCycle;
44  import org.eclipse.jetty.util.log.Log;
45  import org.eclipse.jetty.util.log.Logger;
46  import org.eclipse.jetty.util.ssl.SslContextFactory;
47  import org.eclipse.jetty.util.thread.ThreadPool;
48  
49  public class SPDYServerConnector extends SelectChannelConnector
50  {
51      private static final Logger logger = Log.getLogger(SPDYServerConnector.class);
52  
53      // Order is important on server side, so we use a LinkedHashMap
54      private final Map<String, AsyncConnectionFactory> factories = new LinkedHashMap<>();
55      private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
56      private final ByteBufferPool bufferPool = new StandardByteBufferPool();
57      private final Executor executor = new LazyExecutor();
58      private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
59      private final ServerSessionFrameListener listener;
60      private final SslContextFactory sslContextFactory;
61      private volatile AsyncConnectionFactory defaultConnectionFactory;
62      private volatile int initialWindowSize = 65536;
63  
64      public SPDYServerConnector(ServerSessionFrameListener listener)
65      {
66          this(listener, null);
67      }
68  
69      public SPDYServerConnector(ServerSessionFrameListener listener, SslContextFactory sslContextFactory)
70      {
71          this.listener = listener;
72          this.sslContextFactory = sslContextFactory;
73          if (sslContextFactory != null)
74              addBean(sslContextFactory);
75          putAsyncConnectionFactory("spdy/3", new ServerSPDYAsyncConnectionFactory(SPDY.V3, bufferPool, executor, scheduler, listener));
76          putAsyncConnectionFactory("spdy/2", new ServerSPDYAsyncConnectionFactory(SPDY.V2, bufferPool, executor, scheduler, listener));
77          setDefaultAsyncConnectionFactory(getAsyncConnectionFactory("spdy/2"));
78      }
79  
80      public ByteBufferPool getByteBufferPool()
81      {
82          return bufferPool;
83      }
84  
85      public Executor getExecutor()
86      {
87          return executor;
88      }
89  
90      public ScheduledExecutorService getScheduler()
91      {
92          return scheduler;
93      }
94  
95      public ServerSessionFrameListener getServerSessionFrameListener()
96      {
97          return listener;
98      }
99  
100     public SslContextFactory getSslContextFactory()
101     {
102         return sslContextFactory;
103     }
104 
105     @Override
106     protected void doStart() throws Exception
107     {
108         super.doStart();
109         logger.info("SPDY support is experimental. Please report feedback at jetty-dev@eclipse.org");
110     }
111 
112     @Override
113     protected void doStop() throws Exception
114     {
115         closeSessions();
116         scheduler.shutdown();
117         super.doStop();
118     }
119 
120     @Override
121     public void join() throws InterruptedException
122     {
123         scheduler.awaitTermination(0, TimeUnit.MILLISECONDS);
124         super.join();
125     }
126 
127     public AsyncConnectionFactory getAsyncConnectionFactory(String protocol)
128     {
129         synchronized (factories)
130         {
131             return factories.get(protocol);
132         }
133     }
134 
135     public AsyncConnectionFactory putAsyncConnectionFactory(String protocol, AsyncConnectionFactory factory)
136     {
137         synchronized (factories)
138         {
139             return factories.put(protocol, factory);
140         }
141     }
142 
143     public AsyncConnectionFactory removeAsyncConnectionFactory(String protocol)
144     {
145         synchronized (factories)
146         {
147             return factories.remove(protocol);
148         }
149     }
150 
151     public Map<String, AsyncConnectionFactory> getAsyncConnectionFactories()
152     {
153         synchronized (factories)
154         {
155             return new LinkedHashMap<>(factories);
156         }
157     }
158 
159     public void clearAsyncConnectionFactories()
160     {
161         synchronized (factories)
162         {
163             factories.clear();
164         }
165     }
166 
167     protected List<String> provideProtocols()
168     {
169         synchronized (factories)
170         {
171             return new ArrayList<>(factories.keySet());
172         }
173     }
174 
175     public AsyncConnectionFactory getDefaultAsyncConnectionFactory()
176     {
177         return defaultConnectionFactory;
178     }
179 
180     public void setDefaultAsyncConnectionFactory(AsyncConnectionFactory defaultConnectionFactory)
181     {
182         this.defaultConnectionFactory = defaultConnectionFactory;
183     }
184 
185     @Override
186     protected AsyncConnection newConnection(final SocketChannel channel, AsyncEndPoint endPoint)
187     {
188         if (sslContextFactory != null)
189         {
190             final SSLEngine engine = newSSLEngine(sslContextFactory, channel);
191             SslConnection sslConnection = new SslConnection(engine, endPoint)
192             {
193                 @Override
194                 public void onClose()
195                 {
196                     NextProtoNego.remove(engine);
197                     super.onClose();
198                 }
199             };
200             endPoint.setConnection(sslConnection);
201             final AsyncEndPoint sslEndPoint = sslConnection.getSslEndPoint();
202             NextProtoNego.put(engine, new NextProtoNego.ServerProvider()
203             {
204                 @Override
205                 public void unsupported()
206                 {
207                     AsyncConnectionFactory connectionFactory = getDefaultAsyncConnectionFactory();
208                     AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, SPDYServerConnector.this);
209                     sslEndPoint.setConnection(connection);
210                 }
211 
212                 @Override
213                 public List<String> protocols()
214                 {
215                     return provideProtocols();
216                 }
217 
218                 @Override
219                 public void protocolSelected(String protocol)
220                 {
221                     AsyncConnectionFactory connectionFactory = getAsyncConnectionFactory(protocol);
222                     AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, SPDYServerConnector.this);
223                     sslEndPoint.setConnection(connection);
224                 }
225             });
226 
227             AsyncConnection connection = new EmptyAsyncConnection(sslEndPoint);
228             sslEndPoint.setConnection(connection);
229 
230             startHandshake(engine);
231 
232             return sslConnection;
233         }
234         else
235         {
236             AsyncConnectionFactory connectionFactory = getDefaultAsyncConnectionFactory();
237             AsyncConnection connection = connectionFactory.newAsyncConnection(channel, endPoint, this);
238             endPoint.setConnection(connection);
239             return connection;
240         }
241     }
242 
243     protected FlowControlStrategy newFlowControlStrategy(short version)
244     {
245         return FlowControlStrategyFactory.newFlowControlStrategy(version);
246     }
247 
248     protected SSLEngine newSSLEngine(SslContextFactory sslContextFactory, SocketChannel channel)
249     {
250         String peerHost = channel.socket().getInetAddress().getHostAddress();
251         int peerPort = channel.socket().getPort();
252         SSLEngine engine = sslContextFactory.newSslEngine(peerHost, peerPort);
253         engine.setUseClientMode(false);
254         return engine;
255     }
256 
257     private void startHandshake(SSLEngine engine)
258     {
259         try
260         {
261             engine.beginHandshake();
262         }
263         catch (SSLException x)
264         {
265             throw new RuntimeException(x);
266         }
267     }
268 
269     protected boolean sessionOpened(Session session)
270     {
271         // Add sessions only if the connector is not stopping
272         return isRunning() && sessions.offer(session);
273     }
274 
275     protected boolean sessionClosed(Session session)
276     {
277         // Remove sessions only if the connector is not stopping
278         // to avoid concurrent removes during iterations
279         return isRunning() && sessions.remove(session);
280     }
281 
282     private void closeSessions()
283     {
284         for (Session session : sessions)
285             session.goAway();
286         sessions.clear();
287     }
288 
289     protected Collection<Session> getSessions()
290     {
291         return Collections.unmodifiableCollection(sessions);
292     }
293 
294     public int getInitialWindowSize()
295     {
296         return initialWindowSize;
297     }
298 
299     public void setInitialWindowSize(int initialWindowSize)
300     {
301         this.initialWindowSize = initialWindowSize;
302     }
303 
304     private class LazyExecutor implements Executor
305     {
306         @Override
307         public void execute(Runnable command)
308         {
309             ThreadPool threadPool = getThreadPool();
310             if (threadPool == null)
311                 throw new RejectedExecutionException();
312             threadPool.dispatch(command);
313         }
314     }
315 
316 
317     @Override
318     public void dump(Appendable out, String indent) throws IOException
319     {
320         super.dump(out,indent);
321         AggregateLifeCycle.dump(out, indent, new ArrayList<Session>(sessions));
322     }
323     
324     
325 }