View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.server;
20  
21  import java.io.IOException;
22  import java.net.URI;
23  import java.net.URISyntaxException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Queue;
31  import java.util.concurrent.ConcurrentLinkedQueue;
32  import java.util.concurrent.Executor;
33  
34  import javax.servlet.http.HttpServletRequest;
35  import javax.servlet.http.HttpServletResponse;
36  
37  import org.eclipse.jetty.http.HttpStatus;
38  import org.eclipse.jetty.io.ByteBufferPool;
39  import org.eclipse.jetty.io.EndPoint;
40  import org.eclipse.jetty.io.MappedByteBufferPool;
41  import org.eclipse.jetty.server.HttpConnection;
42  import org.eclipse.jetty.util.component.ContainerLifeCycle;
43  import org.eclipse.jetty.util.log.Log;
44  import org.eclipse.jetty.util.log.Logger;
45  import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
46  import org.eclipse.jetty.util.thread.Scheduler;
47  import org.eclipse.jetty.websocket.api.InvalidWebSocketException;
48  import org.eclipse.jetty.websocket.api.WebSocketException;
49  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
50  import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
51  import org.eclipse.jetty.websocket.api.util.QuoteUtil;
52  import org.eclipse.jetty.websocket.common.LogicalConnection;
53  import org.eclipse.jetty.websocket.common.SessionFactory;
54  import org.eclipse.jetty.websocket.common.WebSocketSession;
55  import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
56  import org.eclipse.jetty.websocket.common.events.EventDriver;
57  import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
58  import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
59  import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
60  import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
61  import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
62  import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
63  import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
64  
65  /**
66   * Factory to create WebSocket connections
67   */
68  public class WebSocketServerFactory extends ContainerLifeCycle implements WebSocketCreator, WebSocketServletFactory
69  {
70      private static final Logger LOG = Log.getLogger(WebSocketServerFactory.class); // logger shared by all factory instances
71      private static final ThreadLocal<UpgradeContext> ACTIVE_CONTEXT = new ThreadLocal<>(); // per-thread upgrade context, accessed through getActiveUpgradeContext() / setActiveUpgradeContext()
72  
73      public static UpgradeContext getActiveUpgradeContext()
74      {
75          return ACTIVE_CONTEXT.get();
76      }
77  
78      protected static void setActiveUpgradeContext(UpgradeContext connection)
79      {
80          ACTIVE_CONTEXT.set(connection);
81      }
82  
83      private final Map<Integer, WebSocketHandshake> handshakes = new HashMap<>(); // handshake implementations keyed by protocol version; looked up in upgrade()
84      { // instance initializer
85          handshakes.put(HandshakeRFC6455.VERSION,new HandshakeRFC6455()); // the only handshake registered here
86      }
87  
88      /**
89       * Single scheduler owned by the factory (registered as a bean in the constructor); it is handed to every WebSocketServerConnection created in upgrade().
90       */
91      private final Scheduler scheduler = new ScheduledExecutorScheduler();
92      private final Queue<WebSocketSession> sessions = new ConcurrentLinkedQueue<>(); // sessions added by sessionOpened(), removed by sessionClosed(), closed in bulk by closeAllConnections()
93      private final String supportedVersions; // registered handshake versions joined with ", ", highest first; sent back when a client requests an unsupported version
94      private final WebSocketPolicy defaultPolicy; // policy supplied at construction; returned by getPolicy()
95      private final EventDriverFactory eventDriverFactory; // wraps websocket objects into EventDriver instances
96      private final WebSocketExtensionFactory extensionFactory; // used to build the ExtensionStack for each upgrade
97      private List<SessionFactory> sessionFactories; // consulted in order by createSession(); the first one supporting the driver is used
98      private WebSocketCreator creator; // initialised to this factory; replaceable through setCreator()
99      private List<Class<?>> registeredSocketClasses; // classes added through register(); createWebSocket() instantiates the first one
100 
101     public WebSocketServerFactory()
102     {
103         this(WebSocketPolicy.newServerPolicy(),new MappedByteBufferPool());
104     }
105 
106     public WebSocketServerFactory(WebSocketPolicy policy)
107     {
108         this(policy,new MappedByteBufferPool());
109     }
110 
111     public WebSocketServerFactory(WebSocketPolicy policy, ByteBufferPool bufferPool)
112     {
113         addBean(scheduler);
114         addBean(bufferPool);
115 
116         this.registeredSocketClasses = new ArrayList<>();
117 
118         this.defaultPolicy = policy;
119         this.eventDriverFactory = new EventDriverFactory(defaultPolicy);
120         this.extensionFactory = new WebSocketExtensionFactory(defaultPolicy,bufferPool);
121         this.sessionFactories = new ArrayList<>();
122         this.sessionFactories.add(new WebSocketSessionFactory());
123         this.creator = this;
124 
125         // Create supportedVersions
126         List<Integer> versions = new ArrayList<>();
127         for (int v : handshakes.keySet())
128         {
129             versions.add(v);
130         }
131         Collections.sort(versions,Collections.reverseOrder()); // newest first
132         StringBuilder rv = new StringBuilder();
133         for (int v : versions)
134         {
135             if (rv.length() > 0)
136             {
137                 rv.append(", ");
138             }
139             rv.append(v);
140         }
141         supportedVersions = rv.toString();
142     }
143 
144     @Override
145     public boolean acceptWebSocket(HttpServletRequest request, HttpServletResponse response) throws IOException
146     {
147         return acceptWebSocket(getCreator(),request,response);
148     }
149 
150     @Override
151     public boolean acceptWebSocket(WebSocketCreator creator, HttpServletRequest request, HttpServletResponse response) throws IOException
152     {
153         try
154         {
155             ServletUpgradeRequest sockreq = new ServletUpgradeRequest(request);
156             ServletUpgradeResponse sockresp = new ServletUpgradeResponse(response);
157 
158             UpgradeContext context = getActiveUpgradeContext();
159             if (context == null)
160             {
161                 context = new UpgradeContext();
162                 setActiveUpgradeContext(context);
163             }
164 
165             context.setRequest(sockreq);
166             context.setResponse(sockresp);
167 
168             Object websocketPojo = creator.createWebSocket(sockreq,sockresp);
169 
170             // Handle response forbidden (and similar paths)
171             if (sockresp.isCommitted())
172             {
173                 return false;
174             }
175 
176             if (websocketPojo == null)
177             {
178                 // no creation, sorry
179                 response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
180                 return false;
181             }
182 
183             // Send the upgrade
184             EventDriver driver = eventDriverFactory.wrap(websocketPojo);
185             return upgrade(sockreq,sockresp,driver);
186         }
187         catch (URISyntaxException e)
188         {
189             throw new IOException("Unable to accept websocket due to mangled URI",e);
190         }
191     }
192 
193     public void addSessionFactory(SessionFactory sessionFactory)
194     {
195         if (sessionFactories.contains(sessionFactory))
196         {
197             return;
198         }
199         this.sessionFactories.add(sessionFactory);
200     }
201 
202     @Override
203     public void cleanup()
204     {
205         try
206         {
207             this.stop();
208         }
209         catch (Exception e)
210         {
211             LOG.warn(e);
212         }
213     }
214 
215     protected void closeAllConnections()
216     {
217         for (WebSocketSession session : sessions)
218         {
219             session.close();
220         }
221         sessions.clear();
222     }
223 
224     @Override
225     public WebSocketServletFactory createFactory(WebSocketPolicy policy)
226     {
227         return new WebSocketServerFactory(policy);
228     }
229 
230     private WebSocketSession createSession(URI requestURI, EventDriver websocket, LogicalConnection connection)
231     {
232         if (websocket == null)
233         {
234             throw new InvalidWebSocketException("Unable to create Session from null websocket");
235         }
236 
237         for (SessionFactory impl : sessionFactories)
238         {
239             if (impl.supports(websocket))
240             {
241                 try
242                 {
243                     return impl.createSession(requestURI,websocket,connection);
244                 }
245                 catch (Throwable e)
246                 {
247                     throw new InvalidWebSocketException("Unable to create Session",e);
248                 }
249             }
250         }
251 
252         throw new InvalidWebSocketException("Unable to create Session: unrecognized internal EventDriver type: " + websocket.getClass().getName());
253     }
254 
255     /**
256      * Default Creator logic
257      */
258     @Override
259     public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp)
260     {
261         if (registeredSocketClasses.size() < 1)
262         {
263             throw new WebSocketException("No WebSockets have been registered with the factory.  Cannot use default implementation of WebSocketCreator.");
264         }
265 
266         if (registeredSocketClasses.size() > 1)
267         {
268             LOG.warn("You have registered more than 1 websocket object, and are using the default WebSocketCreator! Using first registered websocket.");
269         }
270 
271         Class<?> firstClass = registeredSocketClasses.get(0);
272         try
273         {
274             return firstClass.newInstance();
275         }
276         catch (InstantiationException | IllegalAccessException e)
277         {
278             throw new WebSocketException("Unable to create instance of " + firstClass,e);
279         }
280     }
281 
282     @Override
283     protected void doStop() throws Exception
284     {
285         closeAllConnections();
286         super.doStop();
287     }
288 
289     @Override
290     public WebSocketCreator getCreator()
291     {
292         return this.creator;
293     }
294 
295     public EventDriverFactory getEventDriverFactory()
296     {
297         return eventDriverFactory;
298     }
299 
300     @Override
301     public ExtensionFactory getExtensionFactory()
302     {
303         return extensionFactory;
304     }
305 
306     @Override
307     public WebSocketPolicy getPolicy()
308     {
309         return defaultPolicy;
310     }
311 
312     @Override
313     public void init() throws Exception
314     {
315         start(); // start lifecycle
316     }
317 
318     @Override
319     public boolean isUpgradeRequest(HttpServletRequest request, HttpServletResponse response)
320     {
321         if (!"GET".equalsIgnoreCase(request.getMethod()))
322         {
323             // not a "GET" request (not a websocket upgrade)
324             return false;
325         }
326 
327         String connection = request.getHeader("connection");
328         if (connection == null)
329         {
330             // no "Connection: upgrade" header present.
331             return false;
332         }
333 
334         // Test for "Upgrade" token
335         boolean foundUpgradeToken = false;
336         Iterator<String> iter = QuoteUtil.splitAt(connection,",");
337         while (iter.hasNext())
338         {
339             String token = iter.next();
340             if ("upgrade".equalsIgnoreCase(token))
341             {
342                 foundUpgradeToken = true;
343                 break;
344             }
345         }
346 
347         if (!foundUpgradeToken)
348         {
349             return false;
350         }
351 
352         String upgrade = request.getHeader("Upgrade");
353         if (upgrade == null)
354         {
355             // no "Upgrade: websocket" header present.
356             return false;
357         }
358 
359         if (!"websocket".equalsIgnoreCase(upgrade))
360         {
361             LOG.debug("Not a 'Upgrade: WebSocket' (was [Upgrade: " + upgrade + "])");
362             return false;
363         }
364 
365         if (!"HTTP/1.1".equals(request.getProtocol()))
366         {
367             LOG.debug("Not a 'HTTP/1.1' request (was [" + request.getProtocol() + "])");
368             return false;
369         }
370 
371         return true;
372     }
373 
374     protected String[] parseProtocols(String protocol)
375     {
376         if (protocol == null)
377         {
378             return new String[]
379             { null };
380         }
381         protocol = protocol.trim();
382         if ((protocol == null) || (protocol.length() == 0))
383         {
384             return new String[]
385             { null };
386         }
387         String[] passed = protocol.split("\\s*,\\s*");
388         String[] protocols = new String[passed.length + 1];
389         System.arraycopy(passed,0,protocols,0,passed.length);
390         return protocols;
391     }
392 
393     @Override
394     public void register(Class<?> websocketPojo)
395     {
396         registeredSocketClasses.add(websocketPojo);
397     }
398 
399     public boolean sessionClosed(WebSocketSession session)
400     {
401         return isRunning() && sessions.remove(session);
402     }
403 
404     public boolean sessionOpened(WebSocketSession session)
405     {
406         if (LOG.isDebugEnabled())
407         {
408             LOG.debug("Session Opened: {}",session);
409         }
410         if (!isRunning())
411         {
412             LOG.warn("Factory is not running");
413             return false;
414         }
415         boolean ret = sessions.offer(session);
416         session.open();
417         return ret;
418     }
419 
420     @Override
421     public void setCreator(WebSocketCreator creator)
422     {
423         this.creator = creator;
424     }
425 
426     /**
427      * Upgrade the request/response to a WebSocket Connection.
428      * <p>
429      * This method will not normally return, but will instead throw a UpgradeConnectionException, to exit HTTP handling and initiate WebSocket handling of the
430      * connection.
431      * 
432      * @param request
433      *            The request to upgrade
434      * @param response
435      *            The response to upgrade
436      * @param driver
437      *            The websocket handler implementation to use
438      * @throws IOException
439      */
440     public boolean upgrade(ServletUpgradeRequest request, ServletUpgradeResponse response, EventDriver driver) throws IOException
441     {
442         if (!"websocket".equalsIgnoreCase(request.getHeader("Upgrade")))
443         {
444             throw new IllegalStateException("Not a 'WebSocket: Upgrade' request");
445         }
446         if (!"HTTP/1.1".equals(request.getHttpVersion()))
447         {
448             throw new IllegalStateException("Not a 'HTTP/1.1' request");
449         }
450 
451         int version = request.getHeaderInt("Sec-WebSocket-Version");
452         if (version < 0)
453         {
454             // Old pre-RFC version specifications (header not present in RFC-6455)
455             version = request.getHeaderInt("Sec-WebSocket-Draft");
456         }
457 
458         WebSocketHandshake handshaker = handshakes.get(version);
459         if (handshaker == null)
460         {
461             LOG.warn("Unsupported Websocket version: " + version);
462             // Per RFC 6455 - 4.4 - Supporting Multiple Versions of WebSocket Protocol
463             // Using the examples as outlined
464             response.setHeader("Sec-WebSocket-Version",supportedVersions);
465             response.sendError(HttpStatus.BAD_REQUEST_400,"Unsupported websocket version specification");
466             return false;
467         }
468 
469         // Initialize / Negotiate Extensions
470         ExtensionStack extensionStack = new ExtensionStack(getExtensionFactory());
471         // The JSR allows for the extensions to be pre-negotiated, filtered, etc...
472         // Usually from a Configurator.
473         if (response.isExtensionsNegotiated())
474         {
475             // Use pre-negotiated extension list from response
476             extensionStack.negotiate(response.getExtensions());
477         }
478         else
479         {
480             // Use raw extension list from request
481             extensionStack.negotiate(request.getExtensions());
482         }
483 
484         // Create connection
485         UpgradeContext context = getActiveUpgradeContext();
486         LogicalConnection connection = context.getConnection();
487 
488         if (connection == null)
489         {
490             HttpConnection http = HttpConnection.getCurrentConnection();
491             EndPoint endp = http.getEndPoint();
492             Executor executor = http.getConnector().getExecutor();
493             ByteBufferPool bufferPool = http.getConnector().getByteBufferPool();
494             WebSocketServerConnection wsConnection = new WebSocketServerConnection(endp,executor,scheduler,driver.getPolicy(),bufferPool,this);
495             connection = wsConnection;
496 
497             extensionStack.configure(wsConnection.getParser());
498             extensionStack.configure(wsConnection.getGenerator());
499 
500             LOG.debug("HttpConnection: {}",http);
501             LOG.debug("AsyncWebSocketConnection: {}",connection);
502         }
503 
504         // Setup Session
505         WebSocketSession session = createSession(request.getRequestURI(),driver,connection);
506         session.setPolicy(driver.getPolicy());
507         session.setUpgradeRequest(request);
508         // set true negotiated extension list back to response 
509         response.setExtensions(extensionStack.getNegotiatedExtensions());
510         session.setUpgradeResponse(response);
511         connection.setSession(session);
512 
513         // Setup Incoming Routing
514         connection.setNextIncomingFrames(extensionStack);
515         extensionStack.setNextIncoming(session);
516 
517         // Setup Outgoing Routing
518         session.setOutgoingHandler(extensionStack);
519         extensionStack.setNextOutgoing(connection);
520 
521         // Start Components
522         try
523         {
524             session.start();
525         }
526         catch (Exception e)
527         {
528             throw new IOException("Unable to start Session",e);
529         }
530         try
531         {
532             extensionStack.start();
533         }
534         catch (Exception e)
535         {
536             throw new IOException("Unable to start Extension Stack",e);
537         }
538 
539         // Tell jetty about the new connection
540         request.setServletAttribute(HttpConnection.UPGRADE_CONNECTION_ATTRIBUTE,connection);
541 
542         // Process (version specific) handshake response
543         LOG.debug("Handshake Response: {}",handshaker);
544         handshaker.doHandshakeResponse(request,response);
545 
546         LOG.debug("Websocket upgrade {} {} {} {}",request.getRequestURI(),version,response.getAcceptedSubProtocol(),connection);
547         return true;
548     }
549 }