View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.server;
20  
21  import java.io.IOException;
22  import java.net.URISyntaxException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Queue;
29  import java.util.concurrent.ConcurrentLinkedQueue;
30  import java.util.concurrent.Executor;
31  
32  import javax.servlet.http.HttpServletRequest;
33  import javax.servlet.http.HttpServletResponse;
34  
35  import org.eclipse.jetty.http.HttpStatus;
36  import org.eclipse.jetty.io.ByteBufferPool;
37  import org.eclipse.jetty.io.EndPoint;
38  import org.eclipse.jetty.io.MappedByteBufferPool;
39  import org.eclipse.jetty.server.HttpConnection;
40  import org.eclipse.jetty.util.component.ContainerLifeCycle;
41  import org.eclipse.jetty.util.log.Log;
42  import org.eclipse.jetty.util.log.Logger;
43  import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
44  import org.eclipse.jetty.util.thread.Scheduler;
45  import org.eclipse.jetty.websocket.api.UpgradeRequest;
46  import org.eclipse.jetty.websocket.api.UpgradeResponse;
47  import org.eclipse.jetty.websocket.api.WebSocketException;
48  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
49  import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
50  import org.eclipse.jetty.websocket.common.LogicalConnection;
51  import org.eclipse.jetty.websocket.common.WebSocketSession;
52  import org.eclipse.jetty.websocket.common.events.EventDriver;
53  import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
54  import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
55  import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
56  import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
57  import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
58  import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
59  import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
60  
61  /**
62   * Factory to create WebSocket connections
63   */
64  public class WebSocketServerFactory extends ContainerLifeCycle implements WebSocketCreator, WebSocketServletFactory
65  {
66      private static final Logger LOG = Log.getLogger(WebSocketServerFactory.class);
67  
68      private static final ThreadLocal<UpgradeContext> ACTIVE_CONTEXT = new ThreadLocal<>();
69  
70      public static UpgradeContext getActiveUpgradeContext()
71      {
72          return ACTIVE_CONTEXT.get();
73      }
74  
75      protected static void setActiveUpgradeContext(UpgradeContext connection)
76      {
77          ACTIVE_CONTEXT.set(connection);
78      }
79  
80      private final Map<Integer, WebSocketHandshake> handshakes = new HashMap<>();
81      {
82          handshakes.put(HandshakeRFC6455.VERSION,new HandshakeRFC6455());
83      }
84  
85      private final Queue<WebSocketSession> sessions = new ConcurrentLinkedQueue<>();
86      /**
87       * Have the factory maintain 1 and only 1 scheduler. All connections share this scheduler.
88       */
89      private final Scheduler scheduler = new ScheduledExecutorScheduler();
90      private final String supportedVersions;
91      private final WebSocketPolicy basePolicy;
92      private final EventDriverFactory eventDriverFactory;
93      private final WebSocketExtensionFactory extensionFactory;
94      private WebSocketCreator creator;
95      private List<Class<?>> registeredSocketClasses;
96  
97      public WebSocketServerFactory()
98      {
99          this(WebSocketPolicy.newServerPolicy(),new MappedByteBufferPool());
100     }
101 
102     public WebSocketServerFactory(WebSocketPolicy policy)
103     {
104         this(policy,new MappedByteBufferPool());
105     }
106 
107     public WebSocketServerFactory(WebSocketPolicy policy, ByteBufferPool bufferPool)
108     {
109         addBean(scheduler);
110         addBean(bufferPool);
111 
112         this.registeredSocketClasses = new ArrayList<>();
113 
114         this.basePolicy = policy;
115         this.eventDriverFactory = new EventDriverFactory(basePolicy);
116         this.extensionFactory = new WebSocketExtensionFactory(basePolicy,bufferPool);
117         this.creator = this;
118 
119         // Create supportedVersions
120         List<Integer> versions = new ArrayList<>();
121         for (int v : handshakes.keySet())
122         {
123             versions.add(v);
124         }
125         Collections.sort(versions,Collections.reverseOrder()); // newest first
126         StringBuilder rv = new StringBuilder();
127         for (int v : versions)
128         {
129             if (rv.length() > 0)
130             {
131                 rv.append(", ");
132             }
133             rv.append(v);
134         }
135         supportedVersions = rv.toString();
136     }
137 
138     @Override
139     public boolean acceptWebSocket(HttpServletRequest request, HttpServletResponse response) throws IOException
140     {
141         try
142         {
143             // TODO: use ServletUpgradeRequest in Jetty 9.1
144             @SuppressWarnings("deprecation")
145             ServletWebSocketRequest sockreq = new ServletWebSocketRequest(request);
146             // TODO: use ServletUpgradeResponse in Jetty 9.1
147             @SuppressWarnings("deprecation")
148             ServletWebSocketResponse sockresp = new ServletWebSocketResponse(response);
149 
150             WebSocketCreator creator = getCreator();
151 
152             UpgradeContext context = getActiveUpgradeContext();
153             if (context == null)
154             {
155                 context = new UpgradeContext();
156                 setActiveUpgradeContext(context);
157             }
158             context.setRequest(sockreq);
159             context.setResponse(sockresp);
160 
161             Object websocketPojo = creator.createWebSocket(sockreq,sockresp);
162 
163             // Handle response forbidden (and similar paths)
164             if (sockresp.isCommitted())
165             {
166                 return false;
167             }
168 
169             if (websocketPojo == null)
170             {
171                 // no creation, sorry
172                 response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
173                 return false;
174             }
175 
176             // Send the upgrade
177             EventDriver driver = eventDriverFactory.wrap(websocketPojo);
178             return upgrade(sockreq,sockresp,driver);
179         }
180         catch (URISyntaxException e)
181         {
182             throw new IOException("Unable to accept websocket due to mangled URI",e);
183         }
184     }
185 
186     @Override
187     public void cleanup()
188     {
189         try
190         {
191             this.stop();
192         }
193         catch (Exception e)
194         {
195             LOG.warn(e);
196         }
197     }
198 
199     protected void closeAllConnections()
200     {
201         for (WebSocketSession session : sessions)
202         {
203             try
204             {
205                 session.close();
206             }
207             catch (IOException e)
208             {
209                 LOG.warn("CloseAllConnections Close failure",e);
210             }
211         }
212         sessions.clear();
213     }
214 
215     @Override
216     public WebSocketServletFactory createFactory(WebSocketPolicy policy)
217     {
218         return new WebSocketServerFactory(policy);
219     }
220 
221     /**
222      * Default Creator logic
223      */
224     @Override
225     public Object createWebSocket(UpgradeRequest req, UpgradeResponse resp)
226     {
227         if (registeredSocketClasses.size() < 1)
228         {
229             throw new WebSocketException("No WebSockets have been registered with the factory.  Cannot use default implementation of WebSocketCreator.");
230         }
231 
232         if (registeredSocketClasses.size() > 1)
233         {
234             LOG.warn("You have registered more than 1 websocket object, and are using the default WebSocketCreator! Using first registered websocket.");
235         }
236 
237         Class<?> firstClass = registeredSocketClasses.get(0);
238         try
239         {
240             return firstClass.newInstance();
241         }
242         catch (InstantiationException | IllegalAccessException e)
243         {
244             throw new WebSocketException("Unable to create instance of " + firstClass,e);
245         }
246     }
247 
248     @Override
249     protected void doStop() throws Exception
250     {
251         closeAllConnections();
252         super.doStop();
253     }
254 
255     @Override
256     public WebSocketCreator getCreator()
257     {
258         return this.creator;
259     }
260 
261     @Override
262     public ExtensionFactory getExtensionFactory()
263     {
264         return extensionFactory;
265     }
266 
267     @Override
268     public WebSocketPolicy getPolicy()
269     {
270         return basePolicy;
271     }
272 
273     @Override
274     public void init() throws Exception
275     {
276         start();
277     }
278 
279     @Override
280     public boolean isUpgradeRequest(HttpServletRequest request, HttpServletResponse response)
281     {
282         String upgrade = request.getHeader("Upgrade");
283         if (upgrade == null)
284         {
285             // Quietly fail
286             return false;
287         }
288 
289         if (!"websocket".equalsIgnoreCase(upgrade))
290         {
291             LOG.warn("Not a 'Upgrade: WebSocket' (was [Upgrade: " + upgrade + "])");
292             return false;
293         }
294 
295         if (!"HTTP/1.1".equals(request.getProtocol()))
296         {
297             LOG.warn("Not a 'HTTP/1.1' request (was [" + request.getProtocol() + "])");
298             return false;
299         }
300 
301         return true;
302     }
303 
304     protected String[] parseProtocols(String protocol)
305     {
306         if (protocol == null)
307         {
308             return new String[]
309             { null };
310         }
311         protocol = protocol.trim();
312         if ((protocol == null) || (protocol.length() == 0))
313         {
314             return new String[]
315             { null };
316         }
317         String[] passed = protocol.split("\\s*,\\s*");
318         String[] protocols = new String[passed.length + 1];
319         System.arraycopy(passed,0,protocols,0,passed.length);
320         return protocols;
321     }
322 
323     /*
324      * (non-Javadoc)
325      * 
326      * @see org.eclipse.jetty.websocket.server.WebSocketServletFactory#register(java.lang.Class)
327      */
328     @Override
329     public void register(Class<?> websocketPojo)
330     {
331         registeredSocketClasses.add(websocketPojo);
332     }
333 
334     public boolean sessionClosed(WebSocketSession session)
335     {
336         return isRunning() && sessions.remove(session);
337     }
338 
339     public boolean sessionOpened(WebSocketSession session)
340     {
341         if (LOG.isDebugEnabled())
342         {
343             LOG.debug("Session Opened: {}",session);
344         }
345         if (!isRunning())
346         {
347             LOG.warn("Factory is not running");
348             return false;
349         }
350         boolean ret = sessions.offer(session);
351         session.open();
352         return ret;
353     }
354 
355     @Override
356     public void setCreator(WebSocketCreator creator)
357     {
358         this.creator = creator;
359     }
360 
361     /**
362      * Upgrade the request/response to a WebSocket Connection.
363      * <p>
364      * This method will not normally return, but will instead throw a UpgradeConnectionException, to exit HTTP handling and initiate WebSocket handling of the
365      * connection.
366      * 
367      * @param request
368      *            The request to upgrade
369      * @param response
370      *            The response to upgrade
371      * @param driver
372      *            The websocket handler implementation to use
373      * @throws IOException
374      */
375     public boolean upgrade(ServletUpgradeRequest request, ServletUpgradeResponse response, EventDriver driver) throws IOException
376     {
377         if (!"websocket".equalsIgnoreCase(request.getHeader("Upgrade")))
378         {
379             throw new IllegalStateException("Not a 'WebSocket: Upgrade' request");
380         }
381         if (!"HTTP/1.1".equals(request.getHttpVersion()))
382         {
383             throw new IllegalStateException("Not a 'HTTP/1.1' request");
384         }
385 
386         int version = request.getHeaderInt("Sec-WebSocket-Version");
387         if (version < 0)
388         {
389             // Old pre-RFC version specifications (header not present in RFC-6455)
390             version = request.getHeaderInt("Sec-WebSocket-Draft");
391         }
392 
393         WebSocketHandshake handshaker = handshakes.get(version);
394         if (handshaker == null)
395         {
396             LOG.warn("Unsupported Websocket version: " + version);
397             // Per RFC 6455 - 4.4 - Supporting Multiple Versions of WebSocket Protocol
398             // Using the examples as outlined
399             response.setHeader("Sec-WebSocket-Version",supportedVersions);
400             response.sendError(HttpStatus.BAD_REQUEST_400,"Unsupported websocket version specification");
401             return false;
402         }
403 
404         // Initialize / Negotiate Extensions
405         ExtensionStack extensionStack = new ExtensionStack(getExtensionFactory());
406         extensionStack.negotiate(request.getExtensions());
407 
408         // Create connection
409         UpgradeContext context = getActiveUpgradeContext();
410         LogicalConnection connection = context.getConnection();
411 
412         if (connection == null)
413         {
414             HttpConnection http = HttpConnection.getCurrentConnection();
415             EndPoint endp = http.getEndPoint();
416             Executor executor = http.getConnector().getExecutor();
417             ByteBufferPool bufferPool = http.getConnector().getByteBufferPool();
418             WebSocketServerConnection wsConnection = new WebSocketServerConnection(endp,executor,scheduler,driver.getPolicy(),bufferPool,this);
419             connection = wsConnection;
420 
421             extensionStack.configure(wsConnection.getParser());
422             extensionStack.configure(wsConnection.getGenerator());
423 
424             LOG.debug("HttpConnection: {}",http);
425             LOG.debug("AsyncWebSocketConnection: {}",connection);
426         }
427 
428         // Setup Session
429         WebSocketSession session = new WebSocketSession(request.getRequestURI(),driver,connection);
430         session.setPolicy(getPolicy().clonePolicy());
431         session.setUpgradeRequest(request);
432         response.setExtensions(extensionStack.getNegotiatedExtensions());
433         session.setUpgradeResponse(response);
434         connection.setSession(session);
435 
436         // Setup Incoming Routing
437         connection.setNextIncomingFrames(extensionStack);
438         extensionStack.setNextIncoming(session);
439 
440         // Setup Outgoing Routing
441         session.setOutgoingHandler(extensionStack);
442         extensionStack.setNextOutgoing(connection);
443 
444         // Start Components
445         try
446         {
447             session.start();
448         }
449         catch (Exception e)
450         {
451             throw new IOException("Unable to start Session",e);
452         }
453         try
454         {
455             extensionStack.start();
456         }
457         catch (Exception e)
458         {
459             throw new IOException("Unable to start Extension Stack",e);
460         }
461 
462         // Tell jetty about the new connection
463         request.setServletAttribute(HttpConnection.UPGRADE_CONNECTION_ATTRIBUTE,connection);
464 
465         // Process (version specific) handshake response
466         LOG.debug("Handshake Response: {}",handshaker);
467         handshaker.doHandshakeResponse(request,response);
468 
469         LOG.debug("Websocket upgrade {} {} {} {}",request.getRequestURI(),version,response.getAcceptedSubProtocol(),connection);
470         return true;
471     }
472 }