1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.server;
20
21 import java.io.IOException;
22 import java.net.URI;
23 import java.net.URISyntaxException;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Queue;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.Executor;
33
34 import javax.servlet.http.HttpServletRequest;
35 import javax.servlet.http.HttpServletResponse;
36
37 import org.eclipse.jetty.http.HttpStatus;
38 import org.eclipse.jetty.io.ByteBufferPool;
39 import org.eclipse.jetty.io.EndPoint;
40 import org.eclipse.jetty.io.MappedByteBufferPool;
41 import org.eclipse.jetty.server.HttpConnection;
42 import org.eclipse.jetty.util.component.ContainerLifeCycle;
43 import org.eclipse.jetty.util.log.Log;
44 import org.eclipse.jetty.util.log.Logger;
45 import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
46 import org.eclipse.jetty.util.thread.Scheduler;
47 import org.eclipse.jetty.websocket.api.InvalidWebSocketException;
48 import org.eclipse.jetty.websocket.api.WebSocketException;
49 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
50 import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
51 import org.eclipse.jetty.websocket.api.util.QuoteUtil;
52 import org.eclipse.jetty.websocket.common.LogicalConnection;
53 import org.eclipse.jetty.websocket.common.SessionFactory;
54 import org.eclipse.jetty.websocket.common.WebSocketSession;
55 import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
56 import org.eclipse.jetty.websocket.common.events.EventDriver;
57 import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
58 import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
59 import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
60 import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
61 import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
62 import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
63 import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
64
65
66
67
68 public class WebSocketServerFactory extends ContainerLifeCycle implements WebSocketCreator, WebSocketServletFactory
69 {
/** Logger used by this factory for warnings and debug output. */
70 private static final Logger LOG = Log.getLogger(WebSocketServerFactory.class);
/** Per-thread slot holding the {@link UpgradeContext} read and written by the accept/upgrade methods of this class. */
71 private static final ThreadLocal<UpgradeContext> ACTIVE_CONTEXT = new ThreadLocal<>();
72
73 public static UpgradeContext getActiveUpgradeContext()
74 {
75 return ACTIVE_CONTEXT.get();
76 }
77
78 protected static void setActiveUpgradeContext(UpgradeContext connection)
79 {
80 ACTIVE_CONTEXT.set(connection);
81 }
82
/** Handshake implementations keyed by the integer version they are registered under. */
83 private final Map<Integer, WebSocketHandshake> handshakes = new HashMap<>();
// Instance initializer: registers the RFC 6455 handshake under HandshakeRFC6455.VERSION.
// It runs before any constructor body, so the constructor can already iterate the map.
84 {
85 handshakes.put(HandshakeRFC6455.VERSION,new HandshakeRFC6455());
86 }
87
88
89
90
/** Scheduler added as a bean of this factory and handed to each WebSocketServerConnection created in upgrade(). */
91 private final Scheduler scheduler = new ScheduledExecutorScheduler();
/** Sessions registered via sessionOpened() and not yet removed by sessionClosed() or closeAllConnections(). */
92 private final Queue<WebSocketSession> sessions = new ConcurrentLinkedQueue<>();
/** Comma separated, descending list of the keys of {@code handshakes}; sent as Sec-WebSocket-Version when a version is rejected. */
93 private final String supportedVersions;
/** Policy supplied at construction; returned by getPolicy() and passed to the event driver and extension factories. */
94 private final WebSocketPolicy defaultPolicy;
/** Wraps websocket POJOs into EventDriver instances. */
95 private final EventDriverFactory eventDriverFactory;
/** Extension factory used to build the ExtensionStack of each upgraded connection. */
96 private final WebSocketExtensionFactory extensionFactory;
/** Session factories consulted in list order by createSession(); the first one that supports the driver is used. */
97 private List<SessionFactory> sessionFactories;
/** Creator used by the two-argument acceptWebSocket(); initialised to this factory itself. */
98 private WebSocketCreator creator;
/** Classes added through register(); the default creator instantiates the first entry. */
99 private List<Class<?>> registeredSocketClasses;
100
/**
 * Create a factory using a new server policy ({@link WebSocketPolicy#newServerPolicy()})
 * and a new {@link MappedByteBufferPool}.
 */
public WebSocketServerFactory()
{
    this(WebSocketPolicy.newServerPolicy(), new MappedByteBufferPool());
}
105
/**
 * Create a factory using the supplied policy and a new {@link MappedByteBufferPool}.
 *
 * @param policy the policy this factory reports from {@link #getPolicy()}
 */
public WebSocketServerFactory(WebSocketPolicy policy)
{
    this(policy, new MappedByteBufferPool());
}
110
111 public WebSocketServerFactory(WebSocketPolicy policy, ByteBufferPool bufferPool)
112 {
113 addBean(scheduler);
114 addBean(bufferPool);
115
116 this.registeredSocketClasses = new ArrayList<>();
117
118 this.defaultPolicy = policy;
119 this.eventDriverFactory = new EventDriverFactory(defaultPolicy);
120 this.extensionFactory = new WebSocketExtensionFactory(defaultPolicy,bufferPool);
121 this.sessionFactories = new ArrayList<>();
122 this.sessionFactories.add(new WebSocketSessionFactory());
123 this.creator = this;
124
125
126 List<Integer> versions = new ArrayList<>();
127 for (int v : handshakes.keySet())
128 {
129 versions.add(v);
130 }
131 Collections.sort(versions,Collections.reverseOrder());
132 StringBuilder rv = new StringBuilder();
133 for (int v : versions)
134 {
135 if (rv.length() > 0)
136 {
137 rv.append(", ");
138 }
139 rv.append(v);
140 }
141 supportedVersions = rv.toString();
142 }
143
144 @Override
145 public boolean acceptWebSocket(HttpServletRequest request, HttpServletResponse response) throws IOException
146 {
147 return acceptWebSocket(getCreator(),request,response);
148 }
149
150 @Override
151 public boolean acceptWebSocket(WebSocketCreator creator, HttpServletRequest request, HttpServletResponse response) throws IOException
152 {
153 try
154 {
155 ServletUpgradeRequest sockreq = new ServletUpgradeRequest(request);
156 ServletUpgradeResponse sockresp = new ServletUpgradeResponse(response);
157
158 UpgradeContext context = getActiveUpgradeContext();
159 if (context == null)
160 {
161 context = new UpgradeContext();
162 setActiveUpgradeContext(context);
163 }
164
165 context.setRequest(sockreq);
166 context.setResponse(sockresp);
167
168 Object websocketPojo = creator.createWebSocket(sockreq,sockresp);
169
170
171 if (sockresp.isCommitted())
172 {
173 return false;
174 }
175
176 if (websocketPojo == null)
177 {
178
179 response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
180 return false;
181 }
182
183
184 EventDriver driver = eventDriverFactory.wrap(websocketPojo);
185 return upgrade(sockreq,sockresp,driver);
186 }
187 catch (URISyntaxException e)
188 {
189 throw new IOException("Unable to accept websocket due to mangled URI",e);
190 }
191 }
192
193 public void addSessionFactory(SessionFactory sessionFactory)
194 {
195 if (sessionFactories.contains(sessionFactory))
196 {
197 return;
198 }
199 this.sessionFactories.add(sessionFactory);
200 }
201
202 @Override
203 public void cleanup()
204 {
205 try
206 {
207 this.stop();
208 }
209 catch (Exception e)
210 {
211 LOG.warn(e);
212 }
213 }
214
215 protected void closeAllConnections()
216 {
217 for (WebSocketSession session : sessions)
218 {
219 session.close();
220 }
221 sessions.clear();
222 }
223
224 @Override
225 public WebSocketServletFactory createFactory(WebSocketPolicy policy)
226 {
227 return new WebSocketServerFactory(policy);
228 }
229
230 private WebSocketSession createSession(URI requestURI, EventDriver websocket, LogicalConnection connection)
231 {
232 if (websocket == null)
233 {
234 throw new InvalidWebSocketException("Unable to create Session from null websocket");
235 }
236
237 for (SessionFactory impl : sessionFactories)
238 {
239 if (impl.supports(websocket))
240 {
241 try
242 {
243 return impl.createSession(requestURI,websocket,connection);
244 }
245 catch (Throwable e)
246 {
247 throw new InvalidWebSocketException("Unable to create Session",e);
248 }
249 }
250 }
251
252 throw new InvalidWebSocketException("Unable to create Session: unrecognized internal EventDriver type: " + websocket.getClass().getName());
253 }
254
255
256
257
258 @Override
259 public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp)
260 {
261 if (registeredSocketClasses.size() < 1)
262 {
263 throw new WebSocketException("No WebSockets have been registered with the factory. Cannot use default implementation of WebSocketCreator.");
264 }
265
266 if (registeredSocketClasses.size() > 1)
267 {
268 LOG.warn("You have registered more than 1 websocket object, and are using the default WebSocketCreator! Using first registered websocket.");
269 }
270
271 Class<?> firstClass = registeredSocketClasses.get(0);
272 try
273 {
274 return firstClass.newInstance();
275 }
276 catch (InstantiationException | IllegalAccessException e)
277 {
278 throw new WebSocketException("Unable to create instance of " + firstClass,e);
279 }
280 }
281
282 @Override
283 protected void doStop() throws Exception
284 {
285 closeAllConnections();
286 super.doStop();
287 }
288
289 @Override
290 public WebSocketCreator getCreator()
291 {
292 return this.creator;
293 }
294
295 public EventDriverFactory getEventDriverFactory()
296 {
297 return eventDriverFactory;
298 }
299
300 @Override
301 public ExtensionFactory getExtensionFactory()
302 {
303 return extensionFactory;
304 }
305
306 @Override
307 public WebSocketPolicy getPolicy()
308 {
309 return defaultPolicy;
310 }
311
312 @Override
313 public void init() throws Exception
314 {
315 start();
316 }
317
318 @Override
319 public boolean isUpgradeRequest(HttpServletRequest request, HttpServletResponse response)
320 {
321 if (!"GET".equalsIgnoreCase(request.getMethod()))
322 {
323
324 return false;
325 }
326
327 String connection = request.getHeader("connection");
328 if (connection == null)
329 {
330
331 return false;
332 }
333
334
335 boolean foundUpgradeToken = false;
336 Iterator<String> iter = QuoteUtil.splitAt(connection,",");
337 while (iter.hasNext())
338 {
339 String token = iter.next();
340 if ("upgrade".equalsIgnoreCase(token))
341 {
342 foundUpgradeToken = true;
343 break;
344 }
345 }
346
347 if (!foundUpgradeToken)
348 {
349 return false;
350 }
351
352 String upgrade = request.getHeader("Upgrade");
353 if (upgrade == null)
354 {
355
356 return false;
357 }
358
359 if (!"websocket".equalsIgnoreCase(upgrade))
360 {
361 LOG.debug("Not a 'Upgrade: WebSocket' (was [Upgrade: " + upgrade + "])");
362 return false;
363 }
364
365 if (!"HTTP/1.1".equals(request.getProtocol()))
366 {
367 LOG.debug("Not a 'HTTP/1.1' request (was [" + request.getProtocol() + "])");
368 return false;
369 }
370
371 return true;
372 }
373
374 protected String[] parseProtocols(String protocol)
375 {
376 if (protocol == null)
377 {
378 return new String[]
379 { null };
380 }
381 protocol = protocol.trim();
382 if ((protocol == null) || (protocol.length() == 0))
383 {
384 return new String[]
385 { null };
386 }
387 String[] passed = protocol.split("\\s*,\\s*");
388 String[] protocols = new String[passed.length + 1];
389 System.arraycopy(passed,0,protocols,0,passed.length);
390 return protocols;
391 }
392
393 @Override
394 public void register(Class<?> websocketPojo)
395 {
396 registeredSocketClasses.add(websocketPojo);
397 }
398
399 public boolean sessionClosed(WebSocketSession session)
400 {
401 return isRunning() && sessions.remove(session);
402 }
403
404 public boolean sessionOpened(WebSocketSession session)
405 {
406 if (LOG.isDebugEnabled())
407 {
408 LOG.debug("Session Opened: {}",session);
409 }
410 if (!isRunning())
411 {
412 LOG.warn("Factory is not running");
413 return false;
414 }
415 boolean ret = sessions.offer(session);
416 session.open();
417 return ret;
418 }
419
420 @Override
421 public void setCreator(WebSocketCreator creator)
422 {
423 this.creator = creator;
424 }
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440 public boolean upgrade(ServletUpgradeRequest request, ServletUpgradeResponse response, EventDriver driver) throws IOException
441 {
442 if (!"websocket".equalsIgnoreCase(request.getHeader("Upgrade")))
443 {
444 throw new IllegalStateException("Not a 'WebSocket: Upgrade' request");
445 }
446 if (!"HTTP/1.1".equals(request.getHttpVersion()))
447 {
448 throw new IllegalStateException("Not a 'HTTP/1.1' request");
449 }
450
451 int version = request.getHeaderInt("Sec-WebSocket-Version");
452 if (version < 0)
453 {
454
455 version = request.getHeaderInt("Sec-WebSocket-Draft");
456 }
457
458 WebSocketHandshake handshaker = handshakes.get(version);
459 if (handshaker == null)
460 {
461 LOG.warn("Unsupported Websocket version: " + version);
462
463
464 response.setHeader("Sec-WebSocket-Version",supportedVersions);
465 response.sendError(HttpStatus.BAD_REQUEST_400,"Unsupported websocket version specification");
466 return false;
467 }
468
469
470 ExtensionStack extensionStack = new ExtensionStack(getExtensionFactory());
471
472
473 if (response.isExtensionsNegotiated())
474 {
475
476 extensionStack.negotiate(response.getExtensions());
477 }
478 else
479 {
480
481 extensionStack.negotiate(request.getExtensions());
482 }
483
484
485 UpgradeContext context = getActiveUpgradeContext();
486 LogicalConnection connection = context.getConnection();
487
488 if (connection == null)
489 {
490 HttpConnection http = HttpConnection.getCurrentConnection();
491 EndPoint endp = http.getEndPoint();
492 Executor executor = http.getConnector().getExecutor();
493 ByteBufferPool bufferPool = http.getConnector().getByteBufferPool();
494 WebSocketServerConnection wsConnection = new WebSocketServerConnection(endp,executor,scheduler,driver.getPolicy(),bufferPool,this);
495 connection = wsConnection;
496
497 extensionStack.configure(wsConnection.getParser());
498 extensionStack.configure(wsConnection.getGenerator());
499
500 LOG.debug("HttpConnection: {}",http);
501 LOG.debug("AsyncWebSocketConnection: {}",connection);
502 }
503
504
505 WebSocketSession session = createSession(request.getRequestURI(),driver,connection);
506 session.setPolicy(driver.getPolicy());
507 session.setUpgradeRequest(request);
508
509 response.setExtensions(extensionStack.getNegotiatedExtensions());
510 session.setUpgradeResponse(response);
511 connection.setSession(session);
512
513
514 connection.setNextIncomingFrames(extensionStack);
515 extensionStack.setNextIncoming(session);
516
517
518 session.setOutgoingHandler(extensionStack);
519 extensionStack.setNextOutgoing(connection);
520
521
522 try
523 {
524 session.start();
525 }
526 catch (Exception e)
527 {
528 throw new IOException("Unable to start Session",e);
529 }
530 try
531 {
532 extensionStack.start();
533 }
534 catch (Exception e)
535 {
536 throw new IOException("Unable to start Extension Stack",e);
537 }
538
539
540 request.setServletAttribute(HttpConnection.UPGRADE_CONNECTION_ATTRIBUTE,connection);
541
542
543 LOG.debug("Handshake Response: {}",handshaker);
544 handshaker.doHandshakeResponse(request,response);
545
546 LOG.debug("Websocket upgrade {} {} {} {}",request.getRequestURI(),version,response.getAcceptedSubProtocol(),connection);
547 return true;
548 }
549 }