1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.server;
20
21 import java.io.IOException;
22 import java.net.URISyntaxException;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30 import java.util.concurrent.Executor;
31
32 import javax.servlet.http.HttpServletRequest;
33 import javax.servlet.http.HttpServletResponse;
34
35 import org.eclipse.jetty.http.HttpStatus;
36 import org.eclipse.jetty.io.ByteBufferPool;
37 import org.eclipse.jetty.io.EndPoint;
38 import org.eclipse.jetty.io.MappedByteBufferPool;
39 import org.eclipse.jetty.server.HttpConnection;
40 import org.eclipse.jetty.util.component.ContainerLifeCycle;
41 import org.eclipse.jetty.util.log.Log;
42 import org.eclipse.jetty.util.log.Logger;
43 import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
44 import org.eclipse.jetty.util.thread.Scheduler;
45 import org.eclipse.jetty.websocket.api.UpgradeRequest;
46 import org.eclipse.jetty.websocket.api.UpgradeResponse;
47 import org.eclipse.jetty.websocket.api.WebSocketException;
48 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
49 import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
50 import org.eclipse.jetty.websocket.common.LogicalConnection;
51 import org.eclipse.jetty.websocket.common.WebSocketSession;
52 import org.eclipse.jetty.websocket.common.events.EventDriver;
53 import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
54 import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
55 import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
56 import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
57 import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
58 import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
59 import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
60
61
62
63
64 public class WebSocketServerFactory extends ContainerLifeCycle implements WebSocketCreator, WebSocketServletFactory
65 {
66 private static final Logger LOG = Log.getLogger(WebSocketServerFactory.class);
67
68 private static final ThreadLocal<UpgradeContext> ACTIVE_CONTEXT = new ThreadLocal<>();
69
70 public static UpgradeContext getActiveUpgradeContext()
71 {
72 return ACTIVE_CONTEXT.get();
73 }
74
75 protected static void setActiveUpgradeContext(UpgradeContext connection)
76 {
77 ACTIVE_CONTEXT.set(connection);
78 }
79
80 private final Map<Integer, WebSocketHandshake> handshakes = new HashMap<>();
81 {
82 handshakes.put(HandshakeRFC6455.VERSION,new HandshakeRFC6455());
83 }
84
85 private final Queue<WebSocketSession> sessions = new ConcurrentLinkedQueue<>();
86
87
88
89 private final Scheduler scheduler = new ScheduledExecutorScheduler();
90 private final String supportedVersions;
91 private final WebSocketPolicy basePolicy;
92 private final EventDriverFactory eventDriverFactory;
93 private final WebSocketExtensionFactory extensionFactory;
94 private WebSocketCreator creator;
95 private List<Class<?>> registeredSocketClasses;
96
97 public WebSocketServerFactory()
98 {
99 this(WebSocketPolicy.newServerPolicy(),new MappedByteBufferPool());
100 }
101
102 public WebSocketServerFactory(WebSocketPolicy policy)
103 {
104 this(policy,new MappedByteBufferPool());
105 }
106
107 public WebSocketServerFactory(WebSocketPolicy policy, ByteBufferPool bufferPool)
108 {
109 addBean(scheduler);
110 addBean(bufferPool);
111
112 this.registeredSocketClasses = new ArrayList<>();
113
114 this.basePolicy = policy;
115 this.eventDriverFactory = new EventDriverFactory(basePolicy);
116 this.extensionFactory = new WebSocketExtensionFactory(basePolicy,bufferPool);
117 this.creator = this;
118
119
120 List<Integer> versions = new ArrayList<>();
121 for (int v : handshakes.keySet())
122 {
123 versions.add(v);
124 }
125 Collections.sort(versions,Collections.reverseOrder());
126 StringBuilder rv = new StringBuilder();
127 for (int v : versions)
128 {
129 if (rv.length() > 0)
130 {
131 rv.append(", ");
132 }
133 rv.append(v);
134 }
135 supportedVersions = rv.toString();
136 }
137
138 @Override
139 public boolean acceptWebSocket(HttpServletRequest request, HttpServletResponse response) throws IOException
140 {
141 try
142 {
143
144 @SuppressWarnings("deprecation")
145 ServletWebSocketRequest sockreq = new ServletWebSocketRequest(request);
146
147 @SuppressWarnings("deprecation")
148 ServletWebSocketResponse sockresp = new ServletWebSocketResponse(response);
149
150 WebSocketCreator creator = getCreator();
151
152 UpgradeContext context = getActiveUpgradeContext();
153 if (context == null)
154 {
155 context = new UpgradeContext();
156 setActiveUpgradeContext(context);
157 }
158 context.setRequest(sockreq);
159 context.setResponse(sockresp);
160
161 Object websocketPojo = creator.createWebSocket(sockreq,sockresp);
162
163
164 if (sockresp.isCommitted())
165 {
166 return false;
167 }
168
169 if (websocketPojo == null)
170 {
171
172 response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
173 return false;
174 }
175
176
177 EventDriver driver = eventDriverFactory.wrap(websocketPojo);
178 return upgrade(sockreq,sockresp,driver);
179 }
180 catch (URISyntaxException e)
181 {
182 throw new IOException("Unable to accept websocket due to mangled URI",e);
183 }
184 }
185
186 @Override
187 public void cleanup()
188 {
189 try
190 {
191 this.stop();
192 }
193 catch (Exception e)
194 {
195 LOG.warn(e);
196 }
197 }
198
199 protected void closeAllConnections()
200 {
201 for (WebSocketSession session : sessions)
202 {
203 try
204 {
205 session.close();
206 }
207 catch (IOException e)
208 {
209 LOG.warn("CloseAllConnections Close failure",e);
210 }
211 }
212 sessions.clear();
213 }
214
215 @Override
216 public WebSocketServletFactory createFactory(WebSocketPolicy policy)
217 {
218 return new WebSocketServerFactory(policy);
219 }
220
221
222
223
224 @Override
225 public Object createWebSocket(UpgradeRequest req, UpgradeResponse resp)
226 {
227 if (registeredSocketClasses.size() < 1)
228 {
229 throw new WebSocketException("No WebSockets have been registered with the factory. Cannot use default implementation of WebSocketCreator.");
230 }
231
232 if (registeredSocketClasses.size() > 1)
233 {
234 LOG.warn("You have registered more than 1 websocket object, and are using the default WebSocketCreator! Using first registered websocket.");
235 }
236
237 Class<?> firstClass = registeredSocketClasses.get(0);
238 try
239 {
240 return firstClass.newInstance();
241 }
242 catch (InstantiationException | IllegalAccessException e)
243 {
244 throw new WebSocketException("Unable to create instance of " + firstClass,e);
245 }
246 }
247
248 @Override
249 protected void doStop() throws Exception
250 {
251 closeAllConnections();
252 super.doStop();
253 }
254
255 @Override
256 public WebSocketCreator getCreator()
257 {
258 return this.creator;
259 }
260
261 @Override
262 public ExtensionFactory getExtensionFactory()
263 {
264 return extensionFactory;
265 }
266
267 @Override
268 public WebSocketPolicy getPolicy()
269 {
270 return basePolicy;
271 }
272
273 @Override
274 public void init() throws Exception
275 {
276 start();
277 }
278
279 @Override
280 public boolean isUpgradeRequest(HttpServletRequest request, HttpServletResponse response)
281 {
282 String upgrade = request.getHeader("Upgrade");
283 if (upgrade == null)
284 {
285
286 return false;
287 }
288
289 if (!"websocket".equalsIgnoreCase(upgrade))
290 {
291 LOG.warn("Not a 'Upgrade: WebSocket' (was [Upgrade: " + upgrade + "])");
292 return false;
293 }
294
295 if (!"HTTP/1.1".equals(request.getProtocol()))
296 {
297 LOG.warn("Not a 'HTTP/1.1' request (was [" + request.getProtocol() + "])");
298 return false;
299 }
300
301 return true;
302 }
303
304 protected String[] parseProtocols(String protocol)
305 {
306 if (protocol == null)
307 {
308 return new String[]
309 { null };
310 }
311 protocol = protocol.trim();
312 if ((protocol == null) || (protocol.length() == 0))
313 {
314 return new String[]
315 { null };
316 }
317 String[] passed = protocol.split("\\s*,\\s*");
318 String[] protocols = new String[passed.length + 1];
319 System.arraycopy(passed,0,protocols,0,passed.length);
320 return protocols;
321 }
322
323
324
325
326
327
328 @Override
329 public void register(Class<?> websocketPojo)
330 {
331 registeredSocketClasses.add(websocketPojo);
332 }
333
334 public boolean sessionClosed(WebSocketSession session)
335 {
336 return isRunning() && sessions.remove(session);
337 }
338
339 public boolean sessionOpened(WebSocketSession session)
340 {
341 if (LOG.isDebugEnabled())
342 {
343 LOG.debug("Session Opened: {}",session);
344 }
345 if (!isRunning())
346 {
347 LOG.warn("Factory is not running");
348 return false;
349 }
350 boolean ret = sessions.offer(session);
351 session.open();
352 return ret;
353 }
354
355 @Override
356 public void setCreator(WebSocketCreator creator)
357 {
358 this.creator = creator;
359 }
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375 public boolean upgrade(ServletUpgradeRequest request, ServletUpgradeResponse response, EventDriver driver) throws IOException
376 {
377 if (!"websocket".equalsIgnoreCase(request.getHeader("Upgrade")))
378 {
379 throw new IllegalStateException("Not a 'WebSocket: Upgrade' request");
380 }
381 if (!"HTTP/1.1".equals(request.getHttpVersion()))
382 {
383 throw new IllegalStateException("Not a 'HTTP/1.1' request");
384 }
385
386 int version = request.getHeaderInt("Sec-WebSocket-Version");
387 if (version < 0)
388 {
389
390 version = request.getHeaderInt("Sec-WebSocket-Draft");
391 }
392
393 WebSocketHandshake handshaker = handshakes.get(version);
394 if (handshaker == null)
395 {
396 LOG.warn("Unsupported Websocket version: " + version);
397
398
399 response.setHeader("Sec-WebSocket-Version",supportedVersions);
400 response.sendError(HttpStatus.BAD_REQUEST_400,"Unsupported websocket version specification");
401 return false;
402 }
403
404
405 ExtensionStack extensionStack = new ExtensionStack(getExtensionFactory());
406 extensionStack.negotiate(request.getExtensions());
407
408
409 UpgradeContext context = getActiveUpgradeContext();
410 LogicalConnection connection = context.getConnection();
411
412 if (connection == null)
413 {
414 HttpConnection http = HttpConnection.getCurrentConnection();
415 EndPoint endp = http.getEndPoint();
416 Executor executor = http.getConnector().getExecutor();
417 ByteBufferPool bufferPool = http.getConnector().getByteBufferPool();
418 WebSocketServerConnection wsConnection = new WebSocketServerConnection(endp,executor,scheduler,driver.getPolicy(),bufferPool,this);
419 connection = wsConnection;
420
421 extensionStack.configure(wsConnection.getParser());
422 extensionStack.configure(wsConnection.getGenerator());
423
424 LOG.debug("HttpConnection: {}",http);
425 LOG.debug("AsyncWebSocketConnection: {}",connection);
426 }
427
428
429 WebSocketSession session = new WebSocketSession(request.getRequestURI(),driver,connection);
430 session.setPolicy(getPolicy().clonePolicy());
431 session.setUpgradeRequest(request);
432 response.setExtensions(extensionStack.getNegotiatedExtensions());
433 session.setUpgradeResponse(response);
434 connection.setSession(session);
435
436
437 connection.setNextIncomingFrames(extensionStack);
438 extensionStack.setNextIncoming(session);
439
440
441 session.setOutgoingHandler(extensionStack);
442 extensionStack.setNextOutgoing(connection);
443
444
445 try
446 {
447 session.start();
448 }
449 catch (Exception e)
450 {
451 throw new IOException("Unable to start Session",e);
452 }
453 try
454 {
455 extensionStack.start();
456 }
457 catch (Exception e)
458 {
459 throw new IOException("Unable to start Extension Stack",e);
460 }
461
462
463 request.setServletAttribute(HttpConnection.UPGRADE_CONNECTION_ATTRIBUTE,connection);
464
465
466 LOG.debug("Handshake Response: {}",handshaker);
467 handshaker.doHandshakeResponse(request,response);
468
469 LOG.debug("Websocket upgrade {} {} {} {}",request.getRequestURI(),version,response.getAcceptedSubProtocol(),connection);
470 return true;
471 }
472 }