View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.server;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Queue;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.Executor;
30  
31  import javax.servlet.http.HttpServletRequest;
32  import javax.servlet.http.HttpServletResponse;
33  
34  import org.eclipse.jetty.http.HttpStatus;
35  import org.eclipse.jetty.io.ByteBufferPool;
36  import org.eclipse.jetty.io.EndPoint;
37  import org.eclipse.jetty.io.MappedByteBufferPool;
38  import org.eclipse.jetty.server.HttpConnection;
39  import org.eclipse.jetty.util.component.ContainerLifeCycle;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.log.Logger;
42  import org.eclipse.jetty.util.thread.Scheduler;
43  import org.eclipse.jetty.util.thread.TimerScheduler;
44  import org.eclipse.jetty.websocket.api.UpgradeRequest;
45  import org.eclipse.jetty.websocket.api.UpgradeResponse;
46  import org.eclipse.jetty.websocket.api.WebSocketException;
47  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
48  import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
49  import org.eclipse.jetty.websocket.common.LogicalConnection;
50  import org.eclipse.jetty.websocket.common.WebSocketSession;
51  import org.eclipse.jetty.websocket.common.events.EventDriver;
52  import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
53  import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
54  import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
55  import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
56  import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
57  
58  /**
59   * Factory to create WebSocket connections
60   */
61  public class WebSocketServerFactory extends ContainerLifeCycle implements WebSocketCreator, WebSocketServletFactory
62  {
63      private static final Logger LOG = Log.getLogger(WebSocketServerFactory.class);
64  
65      private static final ThreadLocal<UpgradeContext> ACTIVE_CONTEXT = new ThreadLocal<>();
66  
67      public static UpgradeContext getActiveUpgradeContext()
68      {
69          return ACTIVE_CONTEXT.get();
70      }
71  
72      protected static void setActiveUpgradeContext(UpgradeContext connection)
73      {
74          ACTIVE_CONTEXT.set(connection);
75      }
76  
77      private final Map<Integer, WebSocketHandshake> handshakes = new HashMap<>();
78      {
79          handshakes.put(HandshakeRFC6455.VERSION,new HandshakeRFC6455());
80      }
81  
82      private final Queue<WebSocketSession> sessions = new ConcurrentLinkedQueue<>();
83      /**
84       * Have the factory maintain 1 and only 1 scheduler. All connections share this scheduler.
85       */
86      private final Scheduler scheduler = new TimerScheduler();
87      private final String supportedVersions;
88      private final WebSocketPolicy basePolicy;
89      private final EventDriverFactory eventDriverFactory;
90      private final WebSocketExtensionFactory extensionFactory;
91      private WebSocketCreator creator;
92      private List<Class<?>> registeredSocketClasses;
93  
94      public WebSocketServerFactory()
95      {
96          this(WebSocketPolicy.newServerPolicy(),new MappedByteBufferPool());
97      }
98  
99      public WebSocketServerFactory(WebSocketPolicy policy)
100     {
101         this(policy,new MappedByteBufferPool());
102     }
103 
104     public WebSocketServerFactory(WebSocketPolicy policy, ByteBufferPool bufferPool)
105     {
106         addBean(scheduler);
107         addBean(bufferPool);
108 
109         this.registeredSocketClasses = new ArrayList<>();
110 
111         this.basePolicy = policy;
112         this.eventDriverFactory = new EventDriverFactory(basePolicy);
113         this.extensionFactory = new WebSocketExtensionFactory(basePolicy,bufferPool);
114         this.creator = this;
115 
116         // Create supportedVersions
117         List<Integer> versions = new ArrayList<>();
118         for (int v : handshakes.keySet())
119         {
120             versions.add(v);
121         }
122         Collections.sort(versions,Collections.reverseOrder()); // newest first
123         StringBuilder rv = new StringBuilder();
124         for (int v : versions)
125         {
126             if (rv.length() > 0)
127             {
128                 rv.append(", ");
129             }
130             rv.append(v);
131         }
132         supportedVersions = rv.toString();
133     }
134 
135     @Override
136     public boolean acceptWebSocket(HttpServletRequest request, HttpServletResponse response) throws IOException
137     {
138         ServletWebSocketRequest sockreq = new ServletWebSocketRequest(request);
139         ServletWebSocketResponse sockresp = new ServletWebSocketResponse(response);
140 
141         WebSocketCreator creator = getCreator();
142 
143         UpgradeContext context = getActiveUpgradeContext();
144         if (context == null)
145         {
146             context = new UpgradeContext();
147             setActiveUpgradeContext(context);
148         }
149         context.setRequest(sockreq);
150         context.setResponse(sockresp);
151 
152         Object websocketPojo = creator.createWebSocket(sockreq,sockresp);
153 
154         // Handle response forbidden (and similar paths)
155         if (sockresp.isCommitted())
156         {
157             return false;
158         }
159 
160         if (websocketPojo == null)
161         {
162             // no creation, sorry
163             response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
164             return false;
165         }
166 
167         // Send the upgrade
168         EventDriver driver = eventDriverFactory.wrap(websocketPojo);
169         return upgrade(sockreq,sockresp,driver);
170     }
171 
172     @Override
173     public void cleanup()
174     {
175         try
176         {
177             this.stop();
178         }
179         catch (Exception e)
180         {
181             LOG.warn(e);
182         }
183     }
184 
185     protected void closeAllConnections()
186     {
187         for (WebSocketSession session : sessions)
188         {
189             try
190             {
191                 session.close();
192             }
193             catch (IOException e)
194             {
195                 LOG.warn("CloseAllConnections Close failure",e);
196             }
197         }
198         sessions.clear();
199     }
200 
201     @Override
202     public WebSocketServletFactory createFactory(WebSocketPolicy policy)
203     {
204         return new WebSocketServerFactory(policy);
205     }
206 
207     /**
208      * Default Creator logic
209      */
210     @Override
211     public Object createWebSocket(UpgradeRequest req, UpgradeResponse resp)
212     {
213         if (registeredSocketClasses.size() < 1)
214         {
215             throw new WebSocketException("No WebSockets have been registered with the factory.  Cannot use default implementation of WebSocketCreator.");
216         }
217 
218         if (registeredSocketClasses.size() > 1)
219         {
220             LOG.warn("You have registered more than 1 websocket object, and are using the default WebSocketCreator! Using first registered websocket.");
221         }
222 
223         Class<?> firstClass = registeredSocketClasses.get(0);
224         try
225         {
226             return firstClass.newInstance();
227         }
228         catch (InstantiationException | IllegalAccessException e)
229         {
230             throw new WebSocketException("Unable to create instance of " + firstClass,e);
231         }
232     }
233 
234     @Override
235     protected void doStop() throws Exception
236     {
237         closeAllConnections();
238         super.doStop();
239     }
240 
241     @Override
242     public WebSocketCreator getCreator()
243     {
244         return this.creator;
245     }
246 
247     @Override
248     public ExtensionFactory getExtensionFactory()
249     {
250         return extensionFactory;
251     }
252 
253     @Override
254     public WebSocketPolicy getPolicy()
255     {
256         return basePolicy;
257     }
258 
259     @Override
260     public void init() throws Exception
261     {
262         start();
263     }
264 
265     @Override
266     public boolean isUpgradeRequest(HttpServletRequest request, HttpServletResponse response)
267     {
268         String upgrade = request.getHeader("Upgrade");
269         if (upgrade == null)
270         {
271             // Quietly fail
272             return false;
273         }
274 
275         if (!"websocket".equalsIgnoreCase(upgrade))
276         {
277             LOG.warn("Not a 'Upgrade: WebSocket' (was [Upgrade: " + upgrade + "])");
278             return false;
279         }
280 
281         if (!"HTTP/1.1".equals(request.getProtocol()))
282         {
283             LOG.warn("Not a 'HTTP/1.1' request (was [" + request.getProtocol() + "])");
284             return false;
285         }
286 
287         return true;
288     }
289 
290     protected String[] parseProtocols(String protocol)
291     {
292         if (protocol == null)
293         {
294             return new String[]
295             { null };
296         }
297         protocol = protocol.trim();
298         if ((protocol == null) || (protocol.length() == 0))
299         {
300             return new String[]
301             { null };
302         }
303         String[] passed = protocol.split("\\s*,\\s*");
304         String[] protocols = new String[passed.length + 1];
305         System.arraycopy(passed,0,protocols,0,passed.length);
306         return protocols;
307     }
308 
309     /*
310      * (non-Javadoc)
311      * 
312      * @see org.eclipse.jetty.websocket.server.WebSocketServletFactory#register(java.lang.Class)
313      */
314     @Override
315     public void register(Class<?> websocketPojo)
316     {
317         registeredSocketClasses.add(websocketPojo);
318     }
319 
320     public boolean sessionClosed(WebSocketSession session)
321     {
322         return isRunning() && sessions.remove(session);
323     }
324 
325     public boolean sessionOpened(WebSocketSession session)
326     {
327         if (LOG.isDebugEnabled())
328         {
329             LOG.debug("Session Opened: {}",session);
330         }
331         if (!isRunning())
332         {
333             LOG.warn("Factory is not running");
334             return false;
335         }
336         boolean ret = sessions.offer(session);
337         session.open();
338         return ret;
339     }
340 
341     @Override
342     public void setCreator(WebSocketCreator creator)
343     {
344         this.creator = creator;
345     }
346 
347     /**
348      * Upgrade the request/response to a WebSocket Connection.
349      * <p>
350      * This method will not normally return, but will instead throw a UpgradeConnectionException, to exit HTTP handling and initiate WebSocket handling of the
351      * connection.
352      * 
353      * @param request
354      *            The request to upgrade
355      * @param response
356      *            The response to upgrade
357      * @param driver
358      *            The websocket handler implementation to use
359      * @throws IOException
360      */
361     public boolean upgrade(ServletWebSocketRequest request, ServletWebSocketResponse response, EventDriver driver) throws IOException
362     {
363         if (!"websocket".equalsIgnoreCase(request.getHeader("Upgrade")))
364         {
365             throw new IllegalStateException("Not a 'WebSocket: Upgrade' request");
366         }
367         if (!"HTTP/1.1".equals(request.getHttpVersion()))
368         {
369             throw new IllegalStateException("Not a 'HTTP/1.1' request");
370         }
371 
372         int version = request.getHeaderInt("Sec-WebSocket-Version");
373         if (version < 0)
374         {
375             // Old pre-RFC version specifications (header not present in RFC-6455)
376             version = request.getHeaderInt("Sec-WebSocket-Draft");
377         }
378 
379         WebSocketHandshake handshaker = handshakes.get(version);
380         if (handshaker == null)
381         {
382             LOG.warn("Unsupported Websocket version: " + version);
383             // Per RFC 6455 - 4.4 - Supporting Multiple Versions of WebSocket Protocol
384             // Using the examples as outlined
385             response.setHeader("Sec-WebSocket-Version",supportedVersions);
386             response.sendError(HttpStatus.BAD_REQUEST_400,"Unsupported websocket version specification");
387             return false;
388         }
389 
390         // Initialize / Negotiate Extensions
391         ExtensionStack extensionStack = new ExtensionStack(getExtensionFactory());
392         extensionStack.negotiate(request.getExtensions());
393 
394         // Create connection
395         UpgradeContext context = getActiveUpgradeContext();
396         LogicalConnection connection = context.getConnection();
397 
398         if (connection == null)
399         {
400             HttpConnection http = HttpConnection.getCurrentConnection();
401             EndPoint endp = http.getEndPoint();
402             Executor executor = http.getConnector().getExecutor();
403             ByteBufferPool bufferPool = http.getConnector().getByteBufferPool();
404             WebSocketServerConnection wsConnection = new WebSocketServerConnection(endp,executor,scheduler,driver.getPolicy(),bufferPool,this);
405             connection = wsConnection;
406 
407             extensionStack.configure(wsConnection.getParser());
408             extensionStack.configure(wsConnection.getGenerator());
409 
410             LOG.debug("HttpConnection: {}",http);
411             LOG.debug("AsyncWebSocketConnection: {}",connection);
412         }
413 
414         // Setup Session
415         WebSocketSession session = new WebSocketSession(request.getRequestURI(),driver,connection);
416         session.setPolicy(getPolicy().clonePolicy());
417         session.setUpgradeRequest(request);
418         response.setExtensions(extensionStack.getNegotiatedExtensions());
419         session.setUpgradeResponse(response);
420         connection.setSession(session);
421 
422         // Setup Incoming Routing
423         connection.setNextIncomingFrames(extensionStack);
424         extensionStack.setNextIncoming(session);
425 
426         // Setup Outgoing Routing
427         session.setOutgoingHandler(extensionStack);
428         extensionStack.setNextOutgoing(connection);
429 
430         // Start Components
431         try
432         {
433             session.start();
434         }
435         catch (Exception e)
436         {
437             throw new IOException("Unable to start Session",e);
438         }
439         try
440         {
441             extensionStack.start();
442         }
443         catch (Exception e)
444         {
445             throw new IOException("Unable to start Extension Stack",e);
446         }
447 
448         // Tell jetty about the new connection
449         request.setServletAttribute(HttpConnection.UPGRADE_CONNECTION_ATTRIBUTE,connection);
450 
451         // Process (version specific) handshake response
452         LOG.debug("Handshake Response: {}",handshaker);
453         handshaker.doHandshakeResponse(request,response);
454 
455         LOG.debug("Websocket upgrade {} {} {} {}",request.getRequestURI(),version,response.getAcceptedSubProtocol(),connection);
456         return true;
457     }
458 }