View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.net.URI;
24  import java.util.HashMap;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Objects;
28  import java.util.concurrent.Executor;
29  
30  import org.eclipse.jetty.io.ByteBufferPool;
31  import org.eclipse.jetty.io.Connection;
32  import org.eclipse.jetty.util.annotation.ManagedAttribute;
33  import org.eclipse.jetty.util.annotation.ManagedObject;
34  import org.eclipse.jetty.util.component.ContainerLifeCycle;
35  import org.eclipse.jetty.util.component.Dumpable;
36  import org.eclipse.jetty.util.log.Log;
37  import org.eclipse.jetty.util.log.Logger;
38  import org.eclipse.jetty.util.thread.ThreadClassLoaderScope;
39  import org.eclipse.jetty.websocket.api.BatchMode;
40  import org.eclipse.jetty.websocket.api.CloseException;
41  import org.eclipse.jetty.websocket.api.CloseStatus;
42  import org.eclipse.jetty.websocket.api.RemoteEndpoint;
43  import org.eclipse.jetty.websocket.api.Session;
44  import org.eclipse.jetty.websocket.api.StatusCode;
45  import org.eclipse.jetty.websocket.api.SuspendToken;
46  import org.eclipse.jetty.websocket.api.UpgradeRequest;
47  import org.eclipse.jetty.websocket.api.UpgradeResponse;
48  import org.eclipse.jetty.websocket.api.WebSocketBehavior;
49  import org.eclipse.jetty.websocket.api.WebSocketException;
50  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
51  import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
52  import org.eclipse.jetty.websocket.api.extensions.Frame;
53  import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
54  import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
55  import org.eclipse.jetty.websocket.common.events.EventDriver;
56  import org.eclipse.jetty.websocket.common.io.IOState;
57  import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
58  import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
59  import org.eclipse.jetty.websocket.common.scopes.WebSocketSessionScope;
60  
61  @ManagedObject("A Jetty WebSocket Session")
62  public class WebSocketSession extends ContainerLifeCycle implements Session, WebSocketSessionScope, IncomingFrames, Connection.Listener, ConnectionStateListener
63  {
64      private static final Logger LOG = Log.getLogger(WebSocketSession.class);
65      private static final Logger LOG_OPEN = Log.getLogger(WebSocketSession.class.getName() + "_OPEN");
66      private final WebSocketContainerScope containerScope;
67      private final URI requestURI;
68      private final LogicalConnection connection;
69      private final EventDriver websocket;
70      private final Executor executor;
71      private ClassLoader classLoader;
72      private ExtensionFactory extensionFactory;
73      private String protocolVersion;
74      private Map<String, String[]> parameterMap = new HashMap<>();
75      private WebSocketRemoteEndpoint remote;
76      private IncomingFrames incomingHandler;
77      private OutgoingFrames outgoingHandler;
78      private WebSocketPolicy policy;
79      private UpgradeRequest upgradeRequest;
80      private UpgradeResponse upgradeResponse;
81  
82      public WebSocketSession(WebSocketContainerScope containerScope, URI requestURI, EventDriver websocket, LogicalConnection connection)
83      {
84          Objects.requireNonNull(containerScope,"Container Scope cannot be null");
85          Objects.requireNonNull(requestURI,"Request URI cannot be null");
86  
87          this.classLoader = Thread.currentThread().getContextClassLoader();
88          this.containerScope = containerScope;
89          this.requestURI = requestURI;
90          this.websocket = websocket;
91          this.connection = connection;
92          this.executor = connection.getExecutor();
93          this.outgoingHandler = connection;
94          this.incomingHandler = websocket;
95          this.connection.getIOState().addListener(this);
96          this.policy = containerScope.getPolicy();
97          
98          addBean(this.connection);
99          addBean(this.websocket);
100     }
101 
102     @Override
103     public void close()
104     {
105         /* This is assumed to always be a NORMAL closure, no reason phrase */
106         connection.close(StatusCode.NORMAL, null);
107     }
108 
109     @Override
110     public void close(CloseStatus closeStatus)
111     {
112         this.close(closeStatus.getCode(),closeStatus.getPhrase());
113     }
114 
115     @Override
116     public void close(int statusCode, String reason)
117     {
118         connection.close(statusCode,reason);
119     }
120 
121     /**
122      * Harsh disconnect
123      */
124     @Override
125     public void disconnect()
126     {
127         connection.disconnect();
128 
129         // notify of harsh disconnect
130         notifyClose(StatusCode.NO_CLOSE,"Harsh disconnect");
131     }
132 
133     public void dispatch(Runnable runnable)
134     {
135         executor.execute(runnable);
136     }
137     
138     @Override
139     protected void doStart() throws Exception
140     {
141         if(LOG.isDebugEnabled())
142             LOG.debug("starting - {}",this);
143 
144         super.doStart();
145     }
146     
147     @Override
148     protected void doStop() throws Exception
149     {
150         if(LOG.isDebugEnabled())
151             LOG.debug("stopping - {}",this);
152         
153         if (getConnection() != null)
154         {
155             try
156             {
157                 getConnection().close(StatusCode.SHUTDOWN,"Shutdown");
158             }
159             catch (Throwable t)
160             {
161                 LOG.debug("During Connection Shutdown",t);
162             }
163         }
164         super.doStop();
165     }
166     
167     @Override
168     public void dump(Appendable out, String indent) throws IOException
169     {
170         dumpThis(out);
171         out.append(indent).append(" +- incomingHandler : ");
172         if (incomingHandler instanceof Dumpable)
173         {
174             ((Dumpable)incomingHandler).dump(out,indent + "    ");
175         }
176         else
177         {
178             out.append(incomingHandler.toString()).append(System.lineSeparator());
179         }
180 
181         out.append(indent).append(" +- outgoingHandler : ");
182         if (outgoingHandler instanceof Dumpable)
183         {
184             ((Dumpable)outgoingHandler).dump(out,indent + "    ");
185         }
186         else
187         {
188             out.append(outgoingHandler.toString()).append(System.lineSeparator());
189         }
190     }
191 
192     @Override
193     public boolean equals(Object obj)
194     {
195         if (this == obj)
196         {
197             return true;
198         }
199         if (obj == null)
200         {
201             return false;
202         }
203         if (getClass() != obj.getClass())
204         {
205             return false;
206         }
207         WebSocketSession other = (WebSocketSession)obj;
208         if (connection == null)
209         {
210             if (other.connection != null)
211             {
212                 return false;
213             }
214         }
215         else if (!connection.equals(other.connection))
216         {
217             return false;
218         }
219         return true;
220     }
221 
222     public ByteBufferPool getBufferPool()
223     {
224         return this.connection.getBufferPool();
225     }
226     
227     public ClassLoader getClassLoader()
228     {
229         return this.getClass().getClassLoader();
230     }
231 
232     public LogicalConnection getConnection()
233     {
234         return connection;
235     }
236 
237     @Override
238     public WebSocketContainerScope getContainerScope()
239     {
240         return this.containerScope;
241     }
242 
243     public ExtensionFactory getExtensionFactory()
244     {
245         return extensionFactory;
246     }
247 
248     /**
249      * The idle timeout in milliseconds
250      */
251     @Override
252     public long getIdleTimeout()
253     {
254         return connection.getMaxIdleTimeout();
255     }
256 
257     @ManagedAttribute(readonly = true)
258     public IncomingFrames getIncomingHandler()
259     {
260         return incomingHandler;
261     }
262 
263     @Override
264     public InetSocketAddress getLocalAddress()
265     {
266         return connection.getLocalAddress();
267     }
268 
269     @ManagedAttribute(readonly = true)
270     public OutgoingFrames getOutgoingHandler()
271     {
272         return outgoingHandler;
273     }
274 
275     @Override
276     public WebSocketPolicy getPolicy()
277     {
278         return policy;
279     }
280 
281     @Override
282     public String getProtocolVersion()
283     {
284         return protocolVersion;
285     }
286 
287     @Override
288     public RemoteEndpoint getRemote()
289     {
290         if(LOG_OPEN.isDebugEnabled())
291             LOG_OPEN.debug("[{}] {}.getRemote()",policy.getBehavior(),this.getClass().getSimpleName());
292         ConnectionState state = connection.getIOState().getConnectionState();
293 
294         if ((state == ConnectionState.OPEN) || (state == ConnectionState.CONNECTED))
295         {
296             return remote;
297         }
298 
299         throw new WebSocketException("RemoteEndpoint unavailable, current state [" + state + "], expecting [OPEN or CONNECTED]");
300     }
301 
302     @Override
303     public InetSocketAddress getRemoteAddress()
304     {
305         return remote.getInetSocketAddress();
306     }
307 
308     public URI getRequestURI()
309     {
310         return requestURI;
311     }
312 
313     @Override
314     public UpgradeRequest getUpgradeRequest()
315     {
316         return this.upgradeRequest;
317     }
318 
319     @Override
320     public UpgradeResponse getUpgradeResponse()
321     {
322         return this.upgradeResponse;
323     }
324     
325 
326     @Override
327     public WebSocketSession getWebSocketSession()
328     {
329         return this;
330     }
331 
332     @Override
333     public int hashCode()
334     {
335         final int prime = 31;
336         int result = 1;
337         result = (prime * result) + ((connection == null)?0:connection.hashCode());
338         return result;
339     }
340 
341     /**
342      * Incoming Errors from Parser
343      */
344     @Override
345     public void incomingError(Throwable t)
346     {
347         if (connection.getIOState().isInputAvailable())
348         {
349             // Forward Errors to User WebSocket Object
350             websocket.incomingError(t);
351         }
352     }
353 
354     /**
355      * Incoming Raw Frames from Parser
356      */
357     @Override
358     public void incomingFrame(Frame frame)
359     {
360         ClassLoader old = Thread.currentThread().getContextClassLoader();
361         try
362         {
363             Thread.currentThread().setContextClassLoader(classLoader);
364             if (connection.getIOState().isInputAvailable())
365             {
366                 // Forward Frames Through Extension List
367                 incomingHandler.incomingFrame(frame);
368             }
369         }
370         finally
371         {
372             Thread.currentThread().setContextClassLoader(old);
373         }
374     }
375 
376     @Override
377     public boolean isOpen()
378     {
379         if (this.connection == null)
380         {
381             return false;
382         }
383         return this.connection.isOpen();
384     }
385 
386     @Override
387     public boolean isSecure()
388     {
389         if (upgradeRequest == null)
390         {
391             throw new IllegalStateException("No valid UpgradeRequest yet");
392         }
393 
394         URI requestURI = upgradeRequest.getRequestURI();
395 
396         return "wss".equalsIgnoreCase(requestURI.getScheme());
397     }
398 
399     public void notifyClose(int statusCode, String reason)
400     {
401         if (LOG.isDebugEnabled())
402         {
403             LOG.debug("notifyClose({},{})",statusCode,reason);
404         }
405         websocket.onClose(new CloseInfo(statusCode,reason));
406     }
407 
408     public void notifyError(Throwable cause)
409     {
410         incomingError(cause);
411     }
412     
413     @Override
414     public void onClosed(Connection connection)
415     {
416     }
417     
418     @Override
419     public void onOpened(Connection connection)
420     {
421         if(LOG_OPEN.isDebugEnabled())
422             LOG_OPEN.debug("[{}] {}.onOpened()",policy.getBehavior(),this.getClass().getSimpleName());
423         open();
424     }
425 
426     @SuppressWarnings("incomplete-switch")
427     @Override
428     public void onConnectionStateChange(ConnectionState state)
429     {
430         switch (state)
431         {
432             case CLOSED:
433                 IOState ioState = this.connection.getIOState();
434                 CloseInfo close = ioState.getCloseInfo();
435                 // confirmed close of local endpoint
436                 notifyClose(close.getStatusCode(),close.getReason());
437                 try
438                 {
439                     if (LOG.isDebugEnabled())
440                         LOG.debug("{}.onSessionClosed()",containerScope.getClass().getSimpleName());
441                     containerScope.onSessionClosed(this);
442                 }
443                 catch (Throwable t)
444                 {
445                     LOG.ignore(t);
446                 }
447                 break;
448             case CONNECTED:
449                 // notify session listeners
450                 try
451                 {
452                     if (LOG.isDebugEnabled())
453                         LOG.debug("{}.onSessionOpened()",containerScope.getClass().getSimpleName());
454                     containerScope.onSessionOpened(this);
455                 }
456                 catch (Throwable t)
457                 {
458                     LOG.ignore(t);
459                 }
460                 break;
461         }
462     }
463     
464     /**
465      * Open/Activate the session
466      */
467     public void open()
468     {
469         if(LOG_OPEN.isDebugEnabled())
470             LOG_OPEN.debug("[{}] {}.open()",policy.getBehavior(),this.getClass().getSimpleName());
471 
472         if (remote != null)
473         {
474             // already opened
475             return;
476         }
477         
478         try(ThreadClassLoaderScope scope = new ThreadClassLoaderScope(classLoader)) 
479         {
480             // Upgrade success
481             connection.getIOState().onConnected();
482     
483             // Connect remote
484             remote = new WebSocketRemoteEndpoint(connection,outgoingHandler,getBatchMode());
485             if(LOG_OPEN.isDebugEnabled())
486                 LOG_OPEN.debug("[{}] {}.open() remote={}",policy.getBehavior(),this.getClass().getSimpleName(),remote);
487             
488             // Open WebSocket
489             websocket.openSession(this);
490 
491             // Open connection
492             connection.getIOState().onOpened();
493 
494             if (LOG.isDebugEnabled())
495             {
496                 LOG.debug("open -> {}",dump());
497             }
498         }
499         catch (CloseException ce)
500         {
501             LOG.warn(ce);
502             close(ce.getStatusCode(),ce.getMessage());
503         }
504         catch (Throwable t)
505         {
506             LOG.warn(t);
507             // Exception on end-user WS-Endpoint.
508             // Fast-fail & close connection with reason.
509             int statusCode = StatusCode.SERVER_ERROR;
510             if(policy.getBehavior() == WebSocketBehavior.CLIENT)
511             {
512                 statusCode = StatusCode.POLICY_VIOLATION;
513             }
514             close(statusCode,t.getMessage());
515         }
516     }
517     
518     public void setExtensionFactory(ExtensionFactory extensionFactory)
519     {
520         this.extensionFactory = extensionFactory;
521     }
522 
523     /**
524      * Set the timeout in milliseconds
525      */
526     @Override
527     public void setIdleTimeout(long ms)
528     {
529         connection.setMaxIdleTimeout(ms);
530     }
531 
532     public void setOutgoingHandler(OutgoingFrames outgoing)
533     {
534         this.outgoingHandler = outgoing;
535     }
536 
537     public void setPolicy(WebSocketPolicy policy)
538     {
539         this.policy = policy;
540     }
541 
542     public void setUpgradeRequest(UpgradeRequest request)
543     {
544         this.upgradeRequest = request;
545         this.protocolVersion = request.getProtocolVersion();
546         this.parameterMap.clear();
547         if (request.getParameterMap() != null)
548         {
549             for (Map.Entry<String, List<String>> entry : request.getParameterMap().entrySet())
550             {
551                 List<String> values = entry.getValue();
552                 if (values != null)
553                 {
554                     this.parameterMap.put(entry.getKey(),values.toArray(new String[values.size()]));
555                 }
556                 else
557                 {
558                     this.parameterMap.put(entry.getKey(),new String[0]);
559                 }
560             }
561         }
562     }
563 
564     public void setUpgradeResponse(UpgradeResponse response)
565     {
566         this.upgradeResponse = response;
567     }
568 
569     @Override
570     public SuspendToken suspend()
571     {
572         return connection.suspend();
573     }
574 
575     /**
576      * @return the default (initial) value for the batching mode.
577      */
578     public BatchMode getBatchMode()
579     {
580         return BatchMode.AUTO;
581     }
582 
583     @Override
584     public String toString()
585     {
586         StringBuilder builder = new StringBuilder();
587         builder.append("WebSocketSession[");
588         builder.append("websocket=").append(websocket);
589         builder.append(",behavior=").append(policy.getBehavior());
590         builder.append(",connection=").append(connection);
591         builder.append(",remote=").append(remote);
592         builder.append(",incoming=").append(incomingHandler);
593         builder.append(",outgoing=").append(outgoingHandler);
594         builder.append("]");
595         return builder.toString();
596     }
597 
598 }