1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.server;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Queue;
28 import java.util.concurrent.ConcurrentLinkedQueue;
29 import java.util.concurrent.Executor;
30
31 import javax.servlet.http.HttpServletRequest;
32 import javax.servlet.http.HttpServletResponse;
33
34 import org.eclipse.jetty.http.HttpStatus;
35 import org.eclipse.jetty.io.ByteBufferPool;
36 import org.eclipse.jetty.io.EndPoint;
37 import org.eclipse.jetty.io.MappedByteBufferPool;
38 import org.eclipse.jetty.server.HttpConnection;
39 import org.eclipse.jetty.util.component.ContainerLifeCycle;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42 import org.eclipse.jetty.util.thread.Scheduler;
43 import org.eclipse.jetty.util.thread.TimerScheduler;
44 import org.eclipse.jetty.websocket.api.UpgradeRequest;
45 import org.eclipse.jetty.websocket.api.UpgradeResponse;
46 import org.eclipse.jetty.websocket.api.WebSocketException;
47 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
48 import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
49 import org.eclipse.jetty.websocket.common.LogicalConnection;
50 import org.eclipse.jetty.websocket.common.WebSocketSession;
51 import org.eclipse.jetty.websocket.common.events.EventDriver;
52 import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
53 import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
54 import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
55 import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
56 import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
57
58
59
60
61 public class WebSocketServerFactory extends ContainerLifeCycle implements WebSocketCreator, WebSocketServletFactory
62 {
63 private static final Logger LOG = Log.getLogger(WebSocketServerFactory.class);
64
65 private static final ThreadLocal<UpgradeContext> ACTIVE_CONTEXT = new ThreadLocal<>();
66
67 public static UpgradeContext getActiveUpgradeContext()
68 {
69 return ACTIVE_CONTEXT.get();
70 }
71
72 protected static void setActiveUpgradeContext(UpgradeContext connection)
73 {
74 ACTIVE_CONTEXT.set(connection);
75 }
76
/** Handshake implementations, keyed by the websocket version number they serve. */
private final Map<Integer, WebSocketHandshake> handshakes = new HashMap<>();
{
    // Instance initializer: the RFC 6455 handshake is registered for every factory instance.
    handshakes.put(HandshakeRFC6455.VERSION,new HandshakeRFC6455());
}

/** Sessions added by {@link #sessionOpened(WebSocketSession)} and not yet removed again. */
private final Queue<WebSocketSession> sessions = new ConcurrentLinkedQueue<>();

/** Scheduler passed to each connection created during an upgrade; registered as a bean by the constructor. */
private final Scheduler scheduler = new TimerScheduler();
/** Supported version numbers, highest first, joined with {@code ", "}. */
private final String supportedVersions;
/** Policy given at construction time; every session receives a clone of it. */
private final WebSocketPolicy basePolicy;
/** Wraps websocket objects into {@link EventDriver} instances. */
private final EventDriverFactory eventDriverFactory;
/** Source of extensions for the {@link ExtensionStack} built on each upgrade. */
private final WebSocketExtensionFactory extensionFactory;
/** Creator consulted on every upgrade; initially this factory itself. */
private WebSocketCreator creator;
/** Websocket classes added through {@link #register(Class)}. */
private List<Class<?>> registeredSocketClasses;
93
/**
 * Creates a factory using a fresh server policy and a new {@link MappedByteBufferPool}.
 */
public WebSocketServerFactory()
{
    this(WebSocketPolicy.newServerPolicy(),new MappedByteBufferPool());
}
98
/**
 * Creates a factory using the supplied policy and a new {@link MappedByteBufferPool}.
 *
 * @param policy the base policy for this factory
 */
public WebSocketServerFactory(WebSocketPolicy policy)
{
    this(policy,new MappedByteBufferPool());
}
103
104 public WebSocketServerFactory(WebSocketPolicy policy, ByteBufferPool bufferPool)
105 {
106 addBean(scheduler);
107 addBean(bufferPool);
108
109 this.registeredSocketClasses = new ArrayList<>();
110
111 this.basePolicy = policy;
112 this.eventDriverFactory = new EventDriverFactory(basePolicy);
113 this.extensionFactory = new WebSocketExtensionFactory(basePolicy,bufferPool);
114 this.creator = this;
115
116
117 List<Integer> versions = new ArrayList<>();
118 for (int v : handshakes.keySet())
119 {
120 versions.add(v);
121 }
122 Collections.sort(versions,Collections.reverseOrder());
123 StringBuilder rv = new StringBuilder();
124 for (int v : versions)
125 {
126 if (rv.length() > 0)
127 {
128 rv.append(", ");
129 }
130 rv.append(v);
131 }
132 supportedVersions = rv.toString();
133 }
134
135 @Override
136 public boolean acceptWebSocket(HttpServletRequest request, HttpServletResponse response) throws IOException
137 {
138 ServletWebSocketRequest sockreq = new ServletWebSocketRequest(request);
139 ServletWebSocketResponse sockresp = new ServletWebSocketResponse(response);
140
141 WebSocketCreator creator = getCreator();
142
143 UpgradeContext context = getActiveUpgradeContext();
144 if (context == null)
145 {
146 context = new UpgradeContext();
147 setActiveUpgradeContext(context);
148 }
149 context.setRequest(sockreq);
150 context.setResponse(sockresp);
151
152 Object websocketPojo = creator.createWebSocket(sockreq,sockresp);
153
154
155 if (sockresp.isCommitted())
156 {
157 return false;
158 }
159
160 if (websocketPojo == null)
161 {
162
163 response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
164 return false;
165 }
166
167
168 EventDriver driver = eventDriverFactory.wrap(websocketPojo);
169 return upgrade(sockreq,sockresp,driver);
170 }
171
172 @Override
173 public void cleanup()
174 {
175 try
176 {
177 this.stop();
178 }
179 catch (Exception e)
180 {
181 LOG.warn(e);
182 }
183 }
184
185 protected void closeAllConnections()
186 {
187 for (WebSocketSession session : sessions)
188 {
189 try
190 {
191 session.close();
192 }
193 catch (IOException e)
194 {
195 LOG.warn("CloseAllConnections Close failure",e);
196 }
197 }
198 sessions.clear();
199 }
200
201 @Override
202 public WebSocketServletFactory createFactory(WebSocketPolicy policy)
203 {
204 return new WebSocketServerFactory(policy);
205 }
206
207
208
209
210 @Override
211 public Object createWebSocket(UpgradeRequest req, UpgradeResponse resp)
212 {
213 if (registeredSocketClasses.size() < 1)
214 {
215 throw new WebSocketException("No WebSockets have been registered with the factory. Cannot use default implementation of WebSocketCreator.");
216 }
217
218 if (registeredSocketClasses.size() > 1)
219 {
220 LOG.warn("You have registered more than 1 websocket object, and are using the default WebSocketCreator! Using first registered websocket.");
221 }
222
223 Class<?> firstClass = registeredSocketClasses.get(0);
224 try
225 {
226 return firstClass.newInstance();
227 }
228 catch (InstantiationException | IllegalAccessException e)
229 {
230 throw new WebSocketException("Unable to create instance of " + firstClass,e);
231 }
232 }
233
234 @Override
235 protected void doStop() throws Exception
236 {
237 closeAllConnections();
238 super.doStop();
239 }
240
241 @Override
242 public WebSocketCreator getCreator()
243 {
244 return this.creator;
245 }
246
247 @Override
248 public ExtensionFactory getExtensionFactory()
249 {
250 return extensionFactory;
251 }
252
253 @Override
254 public WebSocketPolicy getPolicy()
255 {
256 return basePolicy;
257 }
258
259 @Override
260 public void init() throws Exception
261 {
262 start();
263 }
264
265 @Override
266 public boolean isUpgradeRequest(HttpServletRequest request, HttpServletResponse response)
267 {
268 String upgrade = request.getHeader("Upgrade");
269 if (upgrade == null)
270 {
271
272 return false;
273 }
274
275 if (!"websocket".equalsIgnoreCase(upgrade))
276 {
277 LOG.warn("Not a 'Upgrade: WebSocket' (was [Upgrade: " + upgrade + "])");
278 return false;
279 }
280
281 if (!"HTTP/1.1".equals(request.getProtocol()))
282 {
283 LOG.warn("Not a 'HTTP/1.1' request (was [" + request.getProtocol() + "])");
284 return false;
285 }
286
287 return true;
288 }
289
290 protected String[] parseProtocols(String protocol)
291 {
292 if (protocol == null)
293 {
294 return new String[]
295 { null };
296 }
297 protocol = protocol.trim();
298 if ((protocol == null) || (protocol.length() == 0))
299 {
300 return new String[]
301 { null };
302 }
303 String[] passed = protocol.split("\\s*,\\s*");
304 String[] protocols = new String[passed.length + 1];
305 System.arraycopy(passed,0,protocols,0,passed.length);
306 return protocols;
307 }
308
309
310
311
312
313
314 @Override
315 public void register(Class<?> websocketPojo)
316 {
317 registeredSocketClasses.add(websocketPojo);
318 }
319
320 public boolean sessionClosed(WebSocketSession session)
321 {
322 return isRunning() && sessions.remove(session);
323 }
324
325 public boolean sessionOpened(WebSocketSession session)
326 {
327 if (LOG.isDebugEnabled())
328 {
329 LOG.debug("Session Opened: {}",session);
330 }
331 if (!isRunning())
332 {
333 LOG.warn("Factory is not running");
334 return false;
335 }
336 boolean ret = sessions.offer(session);
337 session.open();
338 return ret;
339 }
340
341 @Override
342 public void setCreator(WebSocketCreator creator)
343 {
344 this.creator = creator;
345 }
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361 public boolean upgrade(ServletWebSocketRequest request, ServletWebSocketResponse response, EventDriver driver) throws IOException
362 {
363 if (!"websocket".equalsIgnoreCase(request.getHeader("Upgrade")))
364 {
365 throw new IllegalStateException("Not a 'WebSocket: Upgrade' request");
366 }
367 if (!"HTTP/1.1".equals(request.getHttpVersion()))
368 {
369 throw new IllegalStateException("Not a 'HTTP/1.1' request");
370 }
371
372 int version = request.getHeaderInt("Sec-WebSocket-Version");
373 if (version < 0)
374 {
375
376 version = request.getHeaderInt("Sec-WebSocket-Draft");
377 }
378
379 WebSocketHandshake handshaker = handshakes.get(version);
380 if (handshaker == null)
381 {
382 LOG.warn("Unsupported Websocket version: " + version);
383
384
385 response.setHeader("Sec-WebSocket-Version",supportedVersions);
386 response.sendError(HttpStatus.BAD_REQUEST_400,"Unsupported websocket version specification");
387 return false;
388 }
389
390
391 ExtensionStack extensionStack = new ExtensionStack(getExtensionFactory());
392 extensionStack.negotiate(request.getExtensions());
393
394
395 UpgradeContext context = getActiveUpgradeContext();
396 LogicalConnection connection = context.getConnection();
397
398 if (connection == null)
399 {
400 HttpConnection http = HttpConnection.getCurrentConnection();
401 EndPoint endp = http.getEndPoint();
402 Executor executor = http.getConnector().getExecutor();
403 ByteBufferPool bufferPool = http.getConnector().getByteBufferPool();
404 WebSocketServerConnection wsConnection = new WebSocketServerConnection(endp,executor,scheduler,driver.getPolicy(),bufferPool,this);
405 connection = wsConnection;
406
407 extensionStack.configure(wsConnection.getParser());
408 extensionStack.configure(wsConnection.getGenerator());
409
410 LOG.debug("HttpConnection: {}",http);
411 LOG.debug("AsyncWebSocketConnection: {}",connection);
412 }
413
414
415 WebSocketSession session = new WebSocketSession(request.getRequestURI(),driver,connection);
416 session.setPolicy(getPolicy().clonePolicy());
417 session.setUpgradeRequest(request);
418 response.setExtensions(extensionStack.getNegotiatedExtensions());
419 session.setUpgradeResponse(response);
420 connection.setSession(session);
421
422
423 connection.setNextIncomingFrames(extensionStack);
424 extensionStack.setNextIncoming(session);
425
426
427 session.setOutgoingHandler(extensionStack);
428 extensionStack.setNextOutgoing(connection);
429
430
431 try
432 {
433 session.start();
434 }
435 catch (Exception e)
436 {
437 throw new IOException("Unable to start Session",e);
438 }
439 try
440 {
441 extensionStack.start();
442 }
443 catch (Exception e)
444 {
445 throw new IOException("Unable to start Extension Stack",e);
446 }
447
448
449 request.setServletAttribute(HttpConnection.UPGRADE_CONNECTION_ATTRIBUTE,connection);
450
451
452 LOG.debug("Handshake Response: {}",handshaker);
453 handshaker.doHandshakeResponse(request,response);
454
455 LOG.debug("Websocket upgrade {} {} {} {}",request.getRequestURI(),version,response.getAcceptedSubProtocol(),connection);
456 return true;
457 }
458 }