View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.server;
20  
21  import java.io.IOException;
22  import java.net.URI;
23  import java.net.URISyntaxException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Set;
31  import java.util.concurrent.CopyOnWriteArraySet;
32  import java.util.concurrent.Executor;
33  
34  import javax.servlet.http.HttpServletRequest;
35  import javax.servlet.http.HttpServletResponse;
36  
37  import org.eclipse.jetty.http.HttpStatus;
38  import org.eclipse.jetty.io.ByteBufferPool;
39  import org.eclipse.jetty.io.EndPoint;
40  import org.eclipse.jetty.io.MappedByteBufferPool;
41  import org.eclipse.jetty.server.HttpConnection;
42  import org.eclipse.jetty.util.component.ContainerLifeCycle;
43  import org.eclipse.jetty.util.log.Log;
44  import org.eclipse.jetty.util.log.Logger;
45  import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
46  import org.eclipse.jetty.util.thread.Scheduler;
47  import org.eclipse.jetty.websocket.api.InvalidWebSocketException;
48  import org.eclipse.jetty.websocket.api.StatusCode;
49  import org.eclipse.jetty.websocket.api.WebSocketException;
50  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
51  import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
52  import org.eclipse.jetty.websocket.api.util.QuoteUtil;
53  import org.eclipse.jetty.websocket.common.LogicalConnection;
54  import org.eclipse.jetty.websocket.common.SessionFactory;
55  import org.eclipse.jetty.websocket.common.SessionListener;
56  import org.eclipse.jetty.websocket.common.WebSocketSession;
57  import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
58  import org.eclipse.jetty.websocket.common.events.EventDriver;
59  import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
60  import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
61  import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
62  import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
63  import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
64  import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
65  import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
66  
67  /**
68   * Factory to create WebSocket connections
69   */
70  public class WebSocketServerFactory extends ContainerLifeCycle implements WebSocketCreator, WebSocketServletFactory, SessionListener
71  {
72      private static final Logger LOG = Log.getLogger(WebSocketServerFactory.class);
73  
74      private final ClassLoader contextClassloader;
75      private final Map<Integer, WebSocketHandshake> handshakes = new HashMap<>();
76      /**
77       * Have the factory maintain 1 and only 1 scheduler. All connections share this scheduler.
78       */
79      private final Scheduler scheduler = new ScheduledExecutorScheduler();
80      private final String supportedVersions;
81      private final WebSocketPolicy defaultPolicy;
82      private final EventDriverFactory eventDriverFactory;
83      private final ByteBufferPool bufferPool;
84      private final WebSocketExtensionFactory extensionFactory;
85      private List<SessionFactory> sessionFactories;
86      private Set<WebSocketSession> openSessions = new CopyOnWriteArraySet<>();
87      private WebSocketCreator creator;
88      private List<Class<?>> registeredSocketClasses;
89  
90      public WebSocketServerFactory()
91      {
92          this(WebSocketPolicy.newServerPolicy(), new MappedByteBufferPool());
93      }
94  
95      public WebSocketServerFactory(WebSocketPolicy policy)
96      {
97          this(policy, new MappedByteBufferPool());
98      }
99  
100     public WebSocketServerFactory(ByteBufferPool bufferPool)
101     {
102         this(WebSocketPolicy.newServerPolicy(), bufferPool);
103     }
104 
105     public WebSocketServerFactory(WebSocketPolicy policy, ByteBufferPool bufferPool)
106     {
107         handshakes.put(HandshakeRFC6455.VERSION, new HandshakeRFC6455());
108 
109         addBean(scheduler);
110         addBean(bufferPool);
111         
112         this.contextClassloader = Thread.currentThread().getContextClassLoader();
113 
114         this.registeredSocketClasses = new ArrayList<>();
115 
116         this.defaultPolicy = policy;
117         this.eventDriverFactory = new EventDriverFactory(defaultPolicy);
118         this.bufferPool = bufferPool;
119         this.extensionFactory = new WebSocketExtensionFactory(defaultPolicy, this.bufferPool);
120         
121         // Bug #431459 - unregistering compression extensions till they are more stable
122         this.extensionFactory.unregister("deflate-frame");
123         this.extensionFactory.unregister("permessage-deflate");
124         this.extensionFactory.unregister("x-webkit-deflate-frame");
125         
126         this.sessionFactories = new ArrayList<>();
127         this.sessionFactories.add(new WebSocketSessionFactory(this));
128         this.creator = this;
129 
130         // Create supportedVersions
131         List<Integer> versions = new ArrayList<>();
132         for (int v : handshakes.keySet())
133         {
134             versions.add(v);
135         }
136         Collections.sort(versions, Collections.reverseOrder()); // newest first
137         StringBuilder rv = new StringBuilder();
138         for (int v : versions)
139         {
140             if (rv.length() > 0)
141             {
142                 rv.append(", ");
143             }
144             rv.append(v);
145         }
146         supportedVersions = rv.toString();
147     }
148 
149     @Override
150     public boolean acceptWebSocket(HttpServletRequest request, HttpServletResponse response) throws IOException
151     {
152         return acceptWebSocket(getCreator(), request, response);
153     }
154 
155     @Override
156     public boolean acceptWebSocket(WebSocketCreator creator, HttpServletRequest request, HttpServletResponse response) throws IOException
157     {
158         ClassLoader old = Thread.currentThread().getContextClassLoader();
159         try
160         {
161             Thread.currentThread().setContextClassLoader(contextClassloader);
162             ServletUpgradeRequest sockreq = new ServletUpgradeRequest(request);
163             ServletUpgradeResponse sockresp = new ServletUpgradeResponse(response);
164 
165             Object websocketPojo = creator.createWebSocket(sockreq, sockresp);
166 
167             // Handle response forbidden (and similar paths)
168             if (sockresp.isCommitted())
169             {
170                 return false;
171             }
172 
173             if (websocketPojo == null)
174             {
175                 // no creation, sorry
176                 sockresp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Endpoint Creation Failed");
177                 return false;
178             }
179 
180             // Get the original HTTPConnection
181             HttpConnection connection = (HttpConnection)request.getAttribute("org.eclipse.jetty.server.HttpConnection");
182             
183             // Send the upgrade
184             EventDriver driver = eventDriverFactory.wrap(websocketPojo);
185             return upgrade(connection, sockreq, sockresp, driver);
186         }
187         catch (URISyntaxException e)
188         {
189             throw new IOException("Unable to accept websocket due to mangled URI", e);
190         } 
191         finally
192         {
193             Thread.currentThread().setContextClassLoader(old);
194         }
195     }
196 
197     public void addSessionFactory(SessionFactory sessionFactory)
198     {
199         if (sessionFactories.contains(sessionFactory))
200         {
201             return;
202         }
203         this.sessionFactories.add(sessionFactory);
204     }
205 
206     @Override
207     public void cleanup()
208     {
209         try
210         {
211             this.stop();
212         }
213         catch (Exception e)
214         {
215             LOG.warn(e);
216         }
217     }
218 
219     protected void shutdownAllConnections()
220     {
221         for (WebSocketSession session : openSessions)
222         {
223             if (session.getConnection() != null)
224             {
225                 try
226                 {
227                     session.getConnection().close(
228                             StatusCode.SHUTDOWN,
229                             "Shutdown");
230                 }
231                 catch (Throwable t)
232                 {
233                     LOG.debug("During Shutdown All Connections",t);
234                 }
235             }
236         }
237         openSessions.clear();
238     }
239 
240     @Override
241     public WebSocketServletFactory createFactory(WebSocketPolicy policy)
242     {
243         return new WebSocketServerFactory(policy, bufferPool);
244     }
245 
246     private WebSocketSession createSession(URI requestURI, EventDriver websocket, LogicalConnection connection)
247     {
248         if (websocket == null)
249         {
250             throw new InvalidWebSocketException("Unable to create Session from null websocket");
251         }
252 
253         for (SessionFactory impl : sessionFactories)
254         {
255             if (impl.supports(websocket))
256             {
257                 try
258                 {
259                     return impl.createSession(requestURI, websocket, connection);
260                 }
261                 catch (Throwable e)
262                 {
263                     throw new InvalidWebSocketException("Unable to create Session", e);
264                 }
265             }
266         }
267 
268         throw new InvalidWebSocketException("Unable to create Session: unrecognized internal EventDriver type: " + websocket.getClass().getName());
269     }
270 
271     /**
272      * Default Creator logic
273      */
274     @Override
275     public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp)
276     {
277         if (registeredSocketClasses.size() < 1)
278         {
279             throw new WebSocketException("No WebSockets have been registered with the factory.  Cannot use default implementation of WebSocketCreator.");
280         }
281 
282         if (registeredSocketClasses.size() > 1)
283         {
284             LOG.warn("You have registered more than 1 websocket object, and are using the default WebSocketCreator! Using first registered websocket.");
285         }
286 
287         Class<?> firstClass = registeredSocketClasses.get(0);
288         try
289         {
290             return firstClass.newInstance();
291         }
292         catch (InstantiationException | IllegalAccessException e)
293         {
294             throw new WebSocketException("Unable to create instance of " + firstClass, e);
295         }
296     }
297 
298     @Override
299     protected void doStop() throws Exception
300     {
301         shutdownAllConnections();
302         super.doStop();
303     }
304 
305     @Override
306     public WebSocketCreator getCreator()
307     {
308         return this.creator;
309     }
310 
311     public EventDriverFactory getEventDriverFactory()
312     {
313         return eventDriverFactory;
314     }
315 
316     @Override
317     public ExtensionFactory getExtensionFactory()
318     {
319         return extensionFactory;
320     }
321 
322     public Set<WebSocketSession> getOpenSessions()
323     {
324         return Collections.unmodifiableSet(this.openSessions);
325     }
326 
327     @Override
328     public WebSocketPolicy getPolicy()
329     {
330         return defaultPolicy;
331     }
332 
333     @Override
334     public void init() throws Exception
335     {
336         start(); // start lifecycle
337     }
338 
339     @Override
340     public boolean isUpgradeRequest(HttpServletRequest request, HttpServletResponse response)
341     {
342         if (!"GET".equalsIgnoreCase(request.getMethod()))
343         {
344             // not a "GET" request (not a websocket upgrade)
345             return false;
346         }
347 
348         String connection = request.getHeader("connection");
349         if (connection == null)
350         {
351             // no "Connection: upgrade" header present.
352             return false;
353         }
354 
355         // Test for "Upgrade" token
356         boolean foundUpgradeToken = false;
357         Iterator<String> iter = QuoteUtil.splitAt(connection, ",");
358         while (iter.hasNext())
359         {
360             String token = iter.next();
361             if ("upgrade".equalsIgnoreCase(token))
362             {
363                 foundUpgradeToken = true;
364                 break;
365             }
366         }
367 
368         if (!foundUpgradeToken)
369         {
370             return false;
371         }
372 
373         String upgrade = request.getHeader("Upgrade");
374         if (upgrade == null)
375         {
376             // no "Upgrade: websocket" header present.
377             return false;
378         }
379 
380         if (!"websocket".equalsIgnoreCase(upgrade))
381         {
382             LOG.debug("Not a 'Upgrade: WebSocket' (was [Upgrade: " + upgrade + "])");
383             return false;
384         }
385 
386         if (!"HTTP/1.1".equals(request.getProtocol()))
387         {
388             LOG.debug("Not a 'HTTP/1.1' request (was [" + request.getProtocol() + "])");
389             return false;
390         }
391 
392         return true;
393     }
394 
395     @Override
396     public void onSessionClosed(WebSocketSession session)
397     {
398         this.openSessions.remove(session);
399     }
400 
401     @Override
402     public void onSessionOpened(WebSocketSession session)
403     {
404         this.openSessions.add(session);
405     }
406 
407     protected String[] parseProtocols(String protocol)
408     {
409         if (protocol == null)
410         {
411             return new String[]{null};
412         }
413         protocol = protocol.trim();
414         if (protocol.length() == 0)
415         {
416             return new String[]{null};
417         }
418         String[] passed = protocol.split("\\s*,\\s*");
419         String[] protocols = new String[passed.length + 1];
420         System.arraycopy(passed, 0, protocols, 0, passed.length);
421         return protocols;
422     }
423 
424     @Override
425     public void register(Class<?> websocketPojo)
426     {
427         registeredSocketClasses.add(websocketPojo);
428     }
429 
430     @Override
431     public void setCreator(WebSocketCreator creator)
432     {
433         this.creator = creator;
434     }
435 
436     /**
437      * Upgrade the request/response to a WebSocket Connection.
438      * <p/>
439      * This method will not normally return, but will instead throw a UpgradeConnectionException, to exit HTTP handling and initiate WebSocket handling of the
440      * connection.
441      *
442      * @param http     the raw http connection
443      * @param request  The request to upgrade
444      * @param response The response to upgrade
445      * @param driver   The websocket handler implementation to use
446      * @throws IOException
447      */
448     private boolean upgrade(HttpConnection http, ServletUpgradeRequest request, ServletUpgradeResponse response, EventDriver driver) throws IOException
449     {
450         if (!"websocket".equalsIgnoreCase(request.getHeader("Upgrade")))
451         {
452             throw new IllegalStateException("Not a 'WebSocket: Upgrade' request");
453         }
454         if (!"HTTP/1.1".equals(request.getHttpVersion()))
455         {
456             throw new IllegalStateException("Not a 'HTTP/1.1' request");
457         }
458 
459         int version = request.getHeaderInt("Sec-WebSocket-Version");
460         if (version < 0)
461         {
462             // Old pre-RFC version specifications (header not present in RFC-6455)
463             version = request.getHeaderInt("Sec-WebSocket-Draft");
464         }
465 
466         WebSocketHandshake handshaker = handshakes.get(version);
467         if (handshaker == null)
468         {
469             StringBuilder warn = new StringBuilder();
470             warn.append("Client ").append(request.getRemoteAddress());
471             warn.append(" (:").append(request.getRemotePort());
472             warn.append(") User Agent: ");
473             String ua = request.getHeader("User-Agent");
474             if (ua == null)
475             {
476                 warn.append("[unset] ");
477             }
478             else
479             {
480                 warn.append('"').append(ua.replaceAll("<", "&lt;")).append("\" ");
481             }
482             warn.append("requested WebSocket version [").append(version);
483             warn.append("], Jetty supports version");
484             if (handshakes.size() > 1)
485             {
486                 warn.append('s');
487             }
488             warn.append(": [").append(supportedVersions).append("]");
489             LOG.warn(warn.toString());
490 
491             // Per RFC 6455 - 4.4 - Supporting Multiple Versions of WebSocket Protocol
492             // Using the examples as outlined
493             response.setHeader("Sec-WebSocket-Version", supportedVersions);
494             response.sendError(HttpStatus.BAD_REQUEST_400, "Unsupported websocket version specification");
495             return false;
496         }
497 
498         // Initialize / Negotiate Extensions
499         ExtensionStack extensionStack = new ExtensionStack(getExtensionFactory());
500         // The JSR allows for the extensions to be pre-negotiated, filtered, etc...
501         // Usually from a Configurator.
502         if (response.isExtensionsNegotiated())
503         {
504             // Use pre-negotiated extension list from response
505             extensionStack.negotiate(response.getExtensions());
506         }
507         else
508         {
509             // Use raw extension list from request
510             extensionStack.negotiate(request.getExtensions());
511         }
512 
513         // Get original HTTP connection
514         EndPoint endp = http.getEndPoint();
515         Executor executor = http.getConnector().getExecutor();
516         ByteBufferPool bufferPool = http.getConnector().getByteBufferPool();
517         
518         // Setup websocket connection
519         WebSocketServerConnection wsConnection = new WebSocketServerConnection(endp, executor, scheduler, driver.getPolicy(), bufferPool);
520 
521         extensionStack.setPolicy(driver.getPolicy());
522         extensionStack.configure(wsConnection.getParser());
523         extensionStack.configure(wsConnection.getGenerator());
524 
525         if (LOG.isDebugEnabled())
526         {
527             LOG.debug("HttpConnection: {}", http);
528             LOG.debug("WebSocketConnection: {}", wsConnection);
529         }
530 
531         // Setup Session
532         WebSocketSession session = createSession(request.getRequestURI(), driver, wsConnection);
533         session.setPolicy(driver.getPolicy());
534         session.setUpgradeRequest(request);
535         // set true negotiated extension list back to response 
536         response.setExtensions(extensionStack.getNegotiatedExtensions());
537         session.setUpgradeResponse(response);
538         wsConnection.setSession(session);
539 
540         // Setup Incoming Routing
541         wsConnection.setNextIncomingFrames(extensionStack);
542         extensionStack.setNextIncoming(session);
543 
544         // Setup Outgoing Routing
545         session.setOutgoingHandler(extensionStack);
546         extensionStack.setNextOutgoing(wsConnection);
547 
548         // Start Components
549         try
550         {
551             session.start();
552         }
553         catch (Exception e)
554         {
555             throw new IOException("Unable to start Session", e);
556         }
557         try
558         {
559             extensionStack.start();
560         }
561         catch (Exception e)
562         {
563             throw new IOException("Unable to start Extension Stack", e);
564         }
565 
566         // Tell jetty about the new upgraded connection
567         request.setServletAttribute(HttpConnection.UPGRADE_CONNECTION_ATTRIBUTE, wsConnection);
568 
569         if (LOG.isDebugEnabled())
570             LOG.debug("Handshake Response: {}", handshaker);
571 
572         // Process (version specific) handshake response
573         handshaker.doHandshakeResponse(request, response);
574 
575         if (LOG.isDebugEnabled())
576             LOG.debug("Websocket upgrade {} {} {} {}", request.getRequestURI(), version, response.getAcceptedSubProtocol(), wsConnection);
577 
578         return true;
579     }
580 }