1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.URI;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Objects;
28 import java.util.concurrent.Executor;
29
30 import org.eclipse.jetty.io.ByteBufferPool;
31 import org.eclipse.jetty.io.Connection;
32 import org.eclipse.jetty.util.annotation.ManagedAttribute;
33 import org.eclipse.jetty.util.annotation.ManagedObject;
34 import org.eclipse.jetty.util.component.ContainerLifeCycle;
35 import org.eclipse.jetty.util.component.Dumpable;
36 import org.eclipse.jetty.util.log.Log;
37 import org.eclipse.jetty.util.log.Logger;
38 import org.eclipse.jetty.util.thread.ThreadClassLoaderScope;
39 import org.eclipse.jetty.websocket.api.BatchMode;
40 import org.eclipse.jetty.websocket.api.CloseException;
41 import org.eclipse.jetty.websocket.api.CloseStatus;
42 import org.eclipse.jetty.websocket.api.RemoteEndpoint;
43 import org.eclipse.jetty.websocket.api.Session;
44 import org.eclipse.jetty.websocket.api.StatusCode;
45 import org.eclipse.jetty.websocket.api.SuspendToken;
46 import org.eclipse.jetty.websocket.api.UpgradeRequest;
47 import org.eclipse.jetty.websocket.api.UpgradeResponse;
48 import org.eclipse.jetty.websocket.api.WebSocketBehavior;
49 import org.eclipse.jetty.websocket.api.WebSocketException;
50 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
51 import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
52 import org.eclipse.jetty.websocket.api.extensions.Frame;
53 import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
54 import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
55 import org.eclipse.jetty.websocket.common.events.EventDriver;
56 import org.eclipse.jetty.websocket.common.io.IOState;
57 import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
58 import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
59 import org.eclipse.jetty.websocket.common.scopes.WebSocketSessionScope;
60
61 @ManagedObject("A Jetty WebSocket Session")
62 public class WebSocketSession extends ContainerLifeCycle implements Session, WebSocketSessionScope, IncomingFrames, Connection.Listener, ConnectionStateListener
63 {
64 private static final Logger LOG = Log.getLogger(WebSocketSession.class);
65 private static final Logger LOG_OPEN = Log.getLogger(WebSocketSession.class.getName() + "_OPEN");
66 private final WebSocketContainerScope containerScope;
67 private final URI requestURI;
68 private final LogicalConnection connection;
69 private final EventDriver websocket;
70 private final Executor executor;
71 private ClassLoader classLoader;
72 private ExtensionFactory extensionFactory;
73 private String protocolVersion;
74 private Map<String, String[]> parameterMap = new HashMap<>();
75 private WebSocketRemoteEndpoint remote;
76 private IncomingFrames incomingHandler;
77 private OutgoingFrames outgoingHandler;
78 private WebSocketPolicy policy;
79 private UpgradeRequest upgradeRequest;
80 private UpgradeResponse upgradeResponse;
81
/**
 * Create a session for the given container scope, request URI, endpoint driver and connection.
 * <p>
 * The context class loader of the calling thread is captured, the connection becomes the
 * outgoing frame handler, the websocket driver becomes the incoming frame handler, and this
 * session registers itself as a listener on the connection's IOState. The connection and the
 * websocket driver are added as beans of this lifecycle.
 *
 * @param containerScope the owning container scope (must not be null); also supplies the initial policy
 * @param requestURI the request URI (must not be null)
 * @param websocket the driver that receives incoming frames, errors and close notifications
 * @param connection the logical connection (must not be null)
 * @throws NullPointerException if containerScope, requestURI or connection is null
 */
public WebSocketSession(WebSocketContainerScope containerScope, URI requestURI, EventDriver websocket, LogicalConnection connection)
{
    Objects.requireNonNull(containerScope,"Container Scope cannot be null");
    Objects.requireNonNull(requestURI,"Request URI cannot be null");
    // The connection is dereferenced below; fail with a descriptive message instead of a bare NPE.
    Objects.requireNonNull(connection,"Connection cannot be null");

    this.classLoader = Thread.currentThread().getContextClassLoader();
    this.containerScope = containerScope;
    this.requestURI = requestURI;
    this.websocket = websocket;
    this.connection = connection;
    this.executor = connection.getExecutor();
    this.outgoingHandler = connection;
    this.incomingHandler = websocket;
    // Receive connection state changes through onConnectionStateChange().
    this.connection.getIOState().addListener(this);
    this.policy = containerScope.getPolicy();

    addBean(this.connection);
    addBean(this.websocket);
}
101
102 @Override
103 public void close()
104 {
105
106 connection.close(StatusCode.NORMAL, null);
107 }
108
109 @Override
110 public void close(CloseStatus closeStatus)
111 {
112 this.close(closeStatus.getCode(),closeStatus.getPhrase());
113 }
114
115 @Override
116 public void close(int statusCode, String reason)
117 {
118 connection.close(statusCode,reason);
119 }
120
121
122
123
124 @Override
125 public void disconnect()
126 {
127 connection.disconnect();
128
129
130 notifyClose(StatusCode.NO_CLOSE,"Harsh disconnect");
131 }
132
133 public void dispatch(Runnable runnable)
134 {
135 executor.execute(runnable);
136 }
137
138 @Override
139 protected void doStart() throws Exception
140 {
141 if(LOG.isDebugEnabled())
142 LOG.debug("starting - {}",this);
143
144 super.doStart();
145 }
146
147 @Override
148 protected void doStop() throws Exception
149 {
150 if(LOG.isDebugEnabled())
151 LOG.debug("stopping - {}",this);
152
153 if (getConnection() != null)
154 {
155 try
156 {
157 getConnection().close(StatusCode.SHUTDOWN,"Shutdown");
158 }
159 catch (Throwable t)
160 {
161 LOG.debug("During Connection Shutdown",t);
162 }
163 }
164 super.doStop();
165 }
166
167 @Override
168 public void dump(Appendable out, String indent) throws IOException
169 {
170 dumpThis(out);
171 out.append(indent).append(" +- incomingHandler : ");
172 if (incomingHandler instanceof Dumpable)
173 {
174 ((Dumpable)incomingHandler).dump(out,indent + " ");
175 }
176 else
177 {
178 out.append(incomingHandler.toString()).append(System.lineSeparator());
179 }
180
181 out.append(indent).append(" +- outgoingHandler : ");
182 if (outgoingHandler instanceof Dumpable)
183 {
184 ((Dumpable)outgoingHandler).dump(out,indent + " ");
185 }
186 else
187 {
188 out.append(outgoingHandler.toString()).append(System.lineSeparator());
189 }
190 }
191
192 @Override
193 public boolean equals(Object obj)
194 {
195 if (this == obj)
196 {
197 return true;
198 }
199 if (obj == null)
200 {
201 return false;
202 }
203 if (getClass() != obj.getClass())
204 {
205 return false;
206 }
207 WebSocketSession other = (WebSocketSession)obj;
208 if (connection == null)
209 {
210 if (other.connection != null)
211 {
212 return false;
213 }
214 }
215 else if (!connection.equals(other.connection))
216 {
217 return false;
218 }
219 return true;
220 }
221
222 public ByteBufferPool getBufferPool()
223 {
224 return this.connection.getBufferPool();
225 }
226
227 public ClassLoader getClassLoader()
228 {
229 return this.getClass().getClassLoader();
230 }
231
232 public LogicalConnection getConnection()
233 {
234 return connection;
235 }
236
237 @Override
238 public WebSocketContainerScope getContainerScope()
239 {
240 return this.containerScope;
241 }
242
243 public ExtensionFactory getExtensionFactory()
244 {
245 return extensionFactory;
246 }
247
248
249
250
251 @Override
252 public long getIdleTimeout()
253 {
254 return connection.getMaxIdleTimeout();
255 }
256
257 @ManagedAttribute(readonly = true)
258 public IncomingFrames getIncomingHandler()
259 {
260 return incomingHandler;
261 }
262
263 @Override
264 public InetSocketAddress getLocalAddress()
265 {
266 return connection.getLocalAddress();
267 }
268
269 @ManagedAttribute(readonly = true)
270 public OutgoingFrames getOutgoingHandler()
271 {
272 return outgoingHandler;
273 }
274
275 @Override
276 public WebSocketPolicy getPolicy()
277 {
278 return policy;
279 }
280
281 @Override
282 public String getProtocolVersion()
283 {
284 return protocolVersion;
285 }
286
287 @Override
288 public RemoteEndpoint getRemote()
289 {
290 if(LOG_OPEN.isDebugEnabled())
291 LOG_OPEN.debug("[{}] {}.getRemote()",policy.getBehavior(),this.getClass().getSimpleName());
292 ConnectionState state = connection.getIOState().getConnectionState();
293
294 if ((state == ConnectionState.OPEN) || (state == ConnectionState.CONNECTED))
295 {
296 return remote;
297 }
298
299 throw new WebSocketException("RemoteEndpoint unavailable, current state [" + state + "], expecting [OPEN or CONNECTED]");
300 }
301
302 @Override
303 public InetSocketAddress getRemoteAddress()
304 {
305 return remote.getInetSocketAddress();
306 }
307
308 public URI getRequestURI()
309 {
310 return requestURI;
311 }
312
313 @Override
314 public UpgradeRequest getUpgradeRequest()
315 {
316 return this.upgradeRequest;
317 }
318
319 @Override
320 public UpgradeResponse getUpgradeResponse()
321 {
322 return this.upgradeResponse;
323 }
324
325
326 @Override
327 public WebSocketSession getWebSocketSession()
328 {
329 return this;
330 }
331
332 @Override
333 public int hashCode()
334 {
335 final int prime = 31;
336 int result = 1;
337 result = (prime * result) + ((connection == null)?0:connection.hashCode());
338 return result;
339 }
340
341
342
343
344 @Override
345 public void incomingError(Throwable t)
346 {
347 if (connection.getIOState().isInputAvailable())
348 {
349
350 websocket.incomingError(t);
351 }
352 }
353
354
355
356
357 @Override
358 public void incomingFrame(Frame frame)
359 {
360 ClassLoader old = Thread.currentThread().getContextClassLoader();
361 try
362 {
363 Thread.currentThread().setContextClassLoader(classLoader);
364 if (connection.getIOState().isInputAvailable())
365 {
366
367 incomingHandler.incomingFrame(frame);
368 }
369 }
370 finally
371 {
372 Thread.currentThread().setContextClassLoader(old);
373 }
374 }
375
376 @Override
377 public boolean isOpen()
378 {
379 if (this.connection == null)
380 {
381 return false;
382 }
383 return this.connection.isOpen();
384 }
385
386 @Override
387 public boolean isSecure()
388 {
389 if (upgradeRequest == null)
390 {
391 throw new IllegalStateException("No valid UpgradeRequest yet");
392 }
393
394 URI requestURI = upgradeRequest.getRequestURI();
395
396 return "wss".equalsIgnoreCase(requestURI.getScheme());
397 }
398
399 public void notifyClose(int statusCode, String reason)
400 {
401 if (LOG.isDebugEnabled())
402 {
403 LOG.debug("notifyClose({},{})",statusCode,reason);
404 }
405 websocket.onClose(new CloseInfo(statusCode,reason));
406 }
407
408 public void notifyError(Throwable cause)
409 {
410 incomingError(cause);
411 }
412
413 @Override
414 public void onClosed(Connection connection)
415 {
416 }
417
418 @Override
419 public void onOpened(Connection connection)
420 {
421 if(LOG_OPEN.isDebugEnabled())
422 LOG_OPEN.debug("[{}] {}.onOpened()",policy.getBehavior(),this.getClass().getSimpleName());
423 open();
424 }
425
426 @SuppressWarnings("incomplete-switch")
427 @Override
428 public void onConnectionStateChange(ConnectionState state)
429 {
430 switch (state)
431 {
432 case CLOSED:
433 IOState ioState = this.connection.getIOState();
434 CloseInfo close = ioState.getCloseInfo();
435
436 notifyClose(close.getStatusCode(),close.getReason());
437 try
438 {
439 if (LOG.isDebugEnabled())
440 LOG.debug("{}.onSessionClosed()",containerScope.getClass().getSimpleName());
441 containerScope.onSessionClosed(this);
442 }
443 catch (Throwable t)
444 {
445 LOG.ignore(t);
446 }
447 break;
448 case CONNECTED:
449
450 try
451 {
452 if (LOG.isDebugEnabled())
453 LOG.debug("{}.onSessionOpened()",containerScope.getClass().getSimpleName());
454 containerScope.onSessionOpened(this);
455 }
456 catch (Throwable t)
457 {
458 LOG.ignore(t);
459 }
460 break;
461 }
462 }
463
464
465
466
467 public void open()
468 {
469 if(LOG_OPEN.isDebugEnabled())
470 LOG_OPEN.debug("[{}] {}.open()",policy.getBehavior(),this.getClass().getSimpleName());
471
472 if (remote != null)
473 {
474
475 return;
476 }
477
478 try(ThreadClassLoaderScope scope = new ThreadClassLoaderScope(classLoader))
479 {
480
481 connection.getIOState().onConnected();
482
483
484 remote = new WebSocketRemoteEndpoint(connection,outgoingHandler,getBatchMode());
485 if(LOG_OPEN.isDebugEnabled())
486 LOG_OPEN.debug("[{}] {}.open() remote={}",policy.getBehavior(),this.getClass().getSimpleName(),remote);
487
488
489 websocket.openSession(this);
490
491
492 connection.getIOState().onOpened();
493
494 if (LOG.isDebugEnabled())
495 {
496 LOG.debug("open -> {}",dump());
497 }
498 }
499 catch (CloseException ce)
500 {
501 LOG.warn(ce);
502 close(ce.getStatusCode(),ce.getMessage());
503 }
504 catch (Throwable t)
505 {
506 LOG.warn(t);
507
508
509 int statusCode = StatusCode.SERVER_ERROR;
510 if(policy.getBehavior() == WebSocketBehavior.CLIENT)
511 {
512 statusCode = StatusCode.POLICY_VIOLATION;
513 }
514 close(statusCode,t.getMessage());
515 }
516 }
517
518 public void setExtensionFactory(ExtensionFactory extensionFactory)
519 {
520 this.extensionFactory = extensionFactory;
521 }
522
523
524
525
526 @Override
527 public void setIdleTimeout(long ms)
528 {
529 connection.setMaxIdleTimeout(ms);
530 }
531
532 public void setOutgoingHandler(OutgoingFrames outgoing)
533 {
534 this.outgoingHandler = outgoing;
535 }
536
537 public void setPolicy(WebSocketPolicy policy)
538 {
539 this.policy = policy;
540 }
541
542 public void setUpgradeRequest(UpgradeRequest request)
543 {
544 this.upgradeRequest = request;
545 this.protocolVersion = request.getProtocolVersion();
546 this.parameterMap.clear();
547 if (request.getParameterMap() != null)
548 {
549 for (Map.Entry<String, List<String>> entry : request.getParameterMap().entrySet())
550 {
551 List<String> values = entry.getValue();
552 if (values != null)
553 {
554 this.parameterMap.put(entry.getKey(),values.toArray(new String[values.size()]));
555 }
556 else
557 {
558 this.parameterMap.put(entry.getKey(),new String[0]);
559 }
560 }
561 }
562 }
563
564 public void setUpgradeResponse(UpgradeResponse response)
565 {
566 this.upgradeResponse = response;
567 }
568
569 @Override
570 public SuspendToken suspend()
571 {
572 return connection.suspend();
573 }
574
575
576
577
578 public BatchMode getBatchMode()
579 {
580 return BatchMode.AUTO;
581 }
582
583 @Override
584 public String toString()
585 {
586 StringBuilder builder = new StringBuilder();
587 builder.append("WebSocketSession[");
588 builder.append("websocket=").append(websocket);
589 builder.append(",behavior=").append(policy.getBehavior());
590 builder.append(",connection=").append(connection);
591 builder.append(",remote=").append(remote);
592 builder.append(",incoming=").append(incomingHandler);
593 builder.append(",outgoing=").append(outgoingHandler);
594 builder.append("]");
595 return builder.toString();
596 }
597
598 }