View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.net.URI;
24  import java.util.HashMap;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Objects;
28  import java.util.concurrent.Executor;
29  
30  import org.eclipse.jetty.io.ByteBufferPool;
31  import org.eclipse.jetty.io.Connection;
32  import org.eclipse.jetty.util.annotation.ManagedAttribute;
33  import org.eclipse.jetty.util.annotation.ManagedObject;
34  import org.eclipse.jetty.util.component.ContainerLifeCycle;
35  import org.eclipse.jetty.util.component.Dumpable;
36  import org.eclipse.jetty.util.log.Log;
37  import org.eclipse.jetty.util.log.Logger;
38  import org.eclipse.jetty.util.thread.ThreadClassLoaderScope;
39  import org.eclipse.jetty.websocket.api.BatchMode;
40  import org.eclipse.jetty.websocket.api.CloseException;
41  import org.eclipse.jetty.websocket.api.CloseStatus;
42  import org.eclipse.jetty.websocket.api.RemoteEndpoint;
43  import org.eclipse.jetty.websocket.api.Session;
44  import org.eclipse.jetty.websocket.api.StatusCode;
45  import org.eclipse.jetty.websocket.api.SuspendToken;
46  import org.eclipse.jetty.websocket.api.UpgradeRequest;
47  import org.eclipse.jetty.websocket.api.UpgradeResponse;
48  import org.eclipse.jetty.websocket.api.WebSocketBehavior;
49  import org.eclipse.jetty.websocket.api.WebSocketException;
50  import org.eclipse.jetty.websocket.api.WebSocketPolicy;
51  import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
52  import org.eclipse.jetty.websocket.api.extensions.Frame;
53  import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
54  import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
55  import org.eclipse.jetty.websocket.common.events.EventDriver;
56  import org.eclipse.jetty.websocket.common.io.IOState;
57  import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
58  import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
59  import org.eclipse.jetty.websocket.common.scopes.WebSocketSessionScope;
60  
61  @ManagedObject("A Jetty WebSocket Session")
62  public class WebSocketSession extends ContainerLifeCycle implements Session, WebSocketSessionScope, IncomingFrames, Connection.Listener, ConnectionStateListener
63  {
64      private static final Logger LOG = Log.getLogger(WebSocketSession.class);
65      private static final Logger LOG_OPEN = Log.getLogger(WebSocketSession.class.getName() + "_OPEN");
66      private final WebSocketContainerScope containerScope;
67      private final URI requestURI;
68      private final LogicalConnection connection;
69      private final EventDriver websocket;
70      private final SessionListener[] sessionListeners;
71      private final Executor executor;
72      private ClassLoader classLoader;
73      private ExtensionFactory extensionFactory;
74      private String protocolVersion;
75      private Map<String, String[]> parameterMap = new HashMap<>();
76      private WebSocketRemoteEndpoint remote;
77      private IncomingFrames incomingHandler;
78      private OutgoingFrames outgoingHandler;
79      private WebSocketPolicy policy;
80      private UpgradeRequest upgradeRequest;
81      private UpgradeResponse upgradeResponse;
82  
83      public WebSocketSession(WebSocketContainerScope containerScope, URI requestURI, EventDriver websocket, LogicalConnection connection, SessionListener... sessionListeners)
84      {
85          Objects.requireNonNull(containerScope,"Container Scope cannot be null");
86          Objects.requireNonNull(requestURI,"Request URI cannot be null");
87  
88          this.classLoader = Thread.currentThread().getContextClassLoader();
89          this.containerScope = containerScope;
90          this.requestURI = requestURI;
91          this.websocket = websocket;
92          this.connection = connection;
93          this.sessionListeners = sessionListeners;
94          this.executor = connection.getExecutor();
95          this.outgoingHandler = connection;
96          this.incomingHandler = websocket;
97          this.connection.getIOState().addListener(this);
98          
99          addBean(this.connection);
100         addBean(this.websocket);
101     }
102 
103     @Override
104     public void close()
105     {
106         connection.close();
107     }
108 
109     @Override
110     public void close(CloseStatus closeStatus)
111     {
112         this.close(closeStatus.getCode(),closeStatus.getPhrase());
113     }
114 
115     @Override
116     public void close(int statusCode, String reason)
117     {
118         connection.close(statusCode,reason);
119     }
120 
121     /**
122      * Harsh disconnect
123      */
124     @Override
125     public void disconnect()
126     {
127         connection.disconnect();
128 
129         // notify of harsh disconnect
130         notifyClose(StatusCode.NO_CLOSE,"Harsh disconnect");
131     }
132 
133     public void dispatch(Runnable runnable)
134     {
135         executor.execute(runnable);
136     }
137     
138     @Override
139     protected void doStart() throws Exception
140     {
141         if(LOG.isDebugEnabled())
142             LOG.debug("starting - {}",this);
143 
144         super.doStart();
145     }
146     
147     @Override
148     protected void doStop() throws Exception
149     {
150         if(LOG.isDebugEnabled())
151             LOG.debug("stopping - {}",this);
152         
153         if (getConnection() != null)
154         {
155             try
156             {
157                 getConnection().close(StatusCode.SHUTDOWN,"Shutdown");
158             }
159             catch (Throwable t)
160             {
161                 LOG.debug("During Connection Shutdown",t);
162             }
163         }
164         super.doStop();
165     }
166     
167     @Override
168     public void dump(Appendable out, String indent) throws IOException
169     {
170         dumpThis(out);
171         out.append(indent).append(" +- incomingHandler : ");
172         if (incomingHandler instanceof Dumpable)
173         {
174             ((Dumpable)incomingHandler).dump(out,indent + "    ");
175         }
176         else
177         {
178             out.append(incomingHandler.toString()).append(System.lineSeparator());
179         }
180 
181         out.append(indent).append(" +- outgoingHandler : ");
182         if (outgoingHandler instanceof Dumpable)
183         {
184             ((Dumpable)outgoingHandler).dump(out,indent + "    ");
185         }
186         else
187         {
188             out.append(outgoingHandler.toString()).append(System.lineSeparator());
189         }
190     }
191 
192     @Override
193     public boolean equals(Object obj)
194     {
195         if (this == obj)
196         {
197             return true;
198         }
199         if (obj == null)
200         {
201             return false;
202         }
203         if (getClass() != obj.getClass())
204         {
205             return false;
206         }
207         WebSocketSession other = (WebSocketSession)obj;
208         if (connection == null)
209         {
210             if (other.connection != null)
211             {
212                 return false;
213             }
214         }
215         else if (!connection.equals(other.connection))
216         {
217             return false;
218         }
219         return true;
220     }
221 
222     public ByteBufferPool getBufferPool()
223     {
224         return this.connection.getBufferPool();
225     }
226     
227     public ClassLoader getClassLoader()
228     {
229         return this.getClass().getClassLoader();
230     }
231 
232     public LogicalConnection getConnection()
233     {
234         return connection;
235     }
236 
237     @Override
238     public WebSocketContainerScope getContainerScope()
239     {
240         return this.containerScope;
241     }
242 
243     public ExtensionFactory getExtensionFactory()
244     {
245         return extensionFactory;
246     }
247 
248     /**
249      * The idle timeout in milliseconds
250      */
251     @Override
252     public long getIdleTimeout()
253     {
254         return connection.getMaxIdleTimeout();
255     }
256 
257     @ManagedAttribute(readonly = true)
258     public IncomingFrames getIncomingHandler()
259     {
260         return incomingHandler;
261     }
262 
263     @Override
264     public InetSocketAddress getLocalAddress()
265     {
266         return connection.getLocalAddress();
267     }
268 
269     @ManagedAttribute(readonly = true)
270     public OutgoingFrames getOutgoingHandler()
271     {
272         return outgoingHandler;
273     }
274 
275     @Override
276     public WebSocketPolicy getPolicy()
277     {
278         return policy;
279     }
280 
281     @Override
282     public String getProtocolVersion()
283     {
284         return protocolVersion;
285     }
286 
287     @Override
288     public RemoteEndpoint getRemote()
289     {
290         if(LOG_OPEN.isDebugEnabled())
291             LOG_OPEN.debug("[{}] {}.getRemote()",policy.getBehavior(),this.getClass().getSimpleName());
292         ConnectionState state = connection.getIOState().getConnectionState();
293 
294         if ((state == ConnectionState.OPEN) || (state == ConnectionState.CONNECTED))
295         {
296             return remote;
297         }
298 
299         throw new WebSocketException("RemoteEndpoint unavailable, current state [" + state + "], expecting [OPEN or CONNECTED]");
300     }
301 
302     @Override
303     public InetSocketAddress getRemoteAddress()
304     {
305         return remote.getInetSocketAddress();
306     }
307 
308     public URI getRequestURI()
309     {
310         return requestURI;
311     }
312 
313     @Override
314     public UpgradeRequest getUpgradeRequest()
315     {
316         return this.upgradeRequest;
317     }
318 
319     @Override
320     public UpgradeResponse getUpgradeResponse()
321     {
322         return this.upgradeResponse;
323     }
324     
325 
326     @Override
327     public WebSocketSession getWebSocketSession()
328     {
329         return this;
330     }
331 
332     @Override
333     public int hashCode()
334     {
335         final int prime = 31;
336         int result = 1;
337         result = (prime * result) + ((connection == null)?0:connection.hashCode());
338         return result;
339     }
340 
341     /**
342      * Incoming Errors from Parser
343      */
344     @Override
345     public void incomingError(Throwable t)
346     {
347         if (connection.getIOState().isInputAvailable())
348         {
349             // Forward Errors to User WebSocket Object
350             websocket.incomingError(t);
351         }
352     }
353 
354     /**
355      * Incoming Raw Frames from Parser
356      */
357     @Override
358     public void incomingFrame(Frame frame)
359     {
360         ClassLoader old = Thread.currentThread().getContextClassLoader();
361         try
362         {
363             Thread.currentThread().setContextClassLoader(classLoader);
364             if (connection.getIOState().isInputAvailable())
365             {
366                 // Forward Frames Through Extension List
367                 incomingHandler.incomingFrame(frame);
368             }
369         }
370         finally
371         {
372             Thread.currentThread().setContextClassLoader(old);
373         }
374     }
375 
376     @Override
377     public boolean isOpen()
378     {
379         if (this.connection == null)
380         {
381             return false;
382         }
383         return this.connection.isOpen();
384     }
385 
386     @Override
387     public boolean isSecure()
388     {
389         if (upgradeRequest == null)
390         {
391             throw new IllegalStateException("No valid UpgradeRequest yet");
392         }
393 
394         URI requestURI = upgradeRequest.getRequestURI();
395 
396         return "wss".equalsIgnoreCase(requestURI.getScheme());
397     }
398 
399     public void notifyClose(int statusCode, String reason)
400     {
401         if (LOG.isDebugEnabled())
402         {
403             LOG.debug("notifyClose({},{})",statusCode,reason);
404         }
405         websocket.onClose(new CloseInfo(statusCode,reason));
406     }
407 
408     public void notifyError(Throwable cause)
409     {
410         incomingError(cause);
411     }
412     
413     @Override
414     public void onClosed(Connection connection)
415     {
416     }
417     
418     @Override
419     public void onOpened(Connection connection)
420     {
421         if(LOG_OPEN.isDebugEnabled())
422             LOG_OPEN.debug("[{}] {}.onOpened()",policy.getBehavior(),this.getClass().getSimpleName());
423         open();
424     }
425 
426     @SuppressWarnings("incomplete-switch")
427     @Override
428     public void onConnectionStateChange(ConnectionState state)
429     {
430         switch (state)
431         {
432             case CLOSED:
433                 IOState ioState = this.connection.getIOState();
434                 CloseInfo close = ioState.getCloseInfo();
435                 // confirmed close of local endpoint
436                 notifyClose(close.getStatusCode(),close.getReason());
437                 
438                 // notify session listeners
439                 for (SessionListener listener : sessionListeners)
440                 {
441                     try
442                     {
443                         if (LOG.isDebugEnabled())
444                             LOG.debug("{}.onSessionClosed()",listener.getClass().getSimpleName());
445                         listener.onSessionClosed(this);
446                     }
447                     catch (Throwable t)
448                     {
449                         LOG.ignore(t);
450                     }
451                 }
452                 break;
453             case CONNECTED:
454                 // notify session listeners
455                 for (SessionListener listener : sessionListeners)
456                 {
457                     try
458                     {
459                         if (LOG.isDebugEnabled())
460                             LOG.debug("{}.onSessionOpen()", listener.getClass().getSimpleName());
461                         listener.onSessionOpened(this);
462                     }
463                     catch (Throwable t)
464                     {
465                         LOG.ignore(t);
466                     }
467                 }
468                 break;
469         }
470     }
471     
472     /**
473      * Open/Activate the session
474      */
475     public void open()
476     {
477         if(LOG_OPEN.isDebugEnabled())
478             LOG_OPEN.debug("[{}] {}.open()",policy.getBehavior(),this.getClass().getSimpleName());
479 
480         if (remote != null)
481         {
482             // already opened
483             return;
484         }
485         
486         try(ThreadClassLoaderScope scope = new ThreadClassLoaderScope(classLoader)) 
487         {
488             // Upgrade success
489             connection.getIOState().onConnected();
490     
491             // Connect remote
492             remote = new WebSocketRemoteEndpoint(connection,outgoingHandler,getBatchMode());
493             if(LOG_OPEN.isDebugEnabled())
494                 LOG_OPEN.debug("[{}] {}.open() remote={}",policy.getBehavior(),this.getClass().getSimpleName(),remote);
495             
496             // Open WebSocket
497             websocket.openSession(this);
498 
499             // Open connection
500             connection.getIOState().onOpened();
501 
502             if (LOG.isDebugEnabled())
503             {
504                 LOG.debug("open -> {}",dump());
505             }
506         }
507         catch (CloseException ce)
508         {
509             LOG.warn(ce);
510             close(ce.getStatusCode(),ce.getMessage());
511         }
512         catch (Throwable t)
513         {
514             LOG.warn(t);
515             // Exception on end-user WS-Endpoint.
516             // Fast-fail & close connection with reason.
517             int statusCode = StatusCode.SERVER_ERROR;
518             if(policy.getBehavior() == WebSocketBehavior.CLIENT)
519             {
520                 statusCode = StatusCode.POLICY_VIOLATION;
521             }
522             close(statusCode,t.getMessage());
523         }
524     }
525     
526     public void setExtensionFactory(ExtensionFactory extensionFactory)
527     {
528         this.extensionFactory = extensionFactory;
529     }
530 
531     /**
532      * Set the timeout in milliseconds
533      */
534     @Override
535     public void setIdleTimeout(long ms)
536     {
537         connection.setMaxIdleTimeout(ms);
538     }
539 
540     public void setOutgoingHandler(OutgoingFrames outgoing)
541     {
542         this.outgoingHandler = outgoing;
543     }
544 
545     public void setPolicy(WebSocketPolicy policy)
546     {
547         this.policy = policy;
548     }
549 
550     public void setUpgradeRequest(UpgradeRequest request)
551     {
552         this.upgradeRequest = request;
553         this.protocolVersion = request.getProtocolVersion();
554         this.parameterMap.clear();
555         if (request.getParameterMap() != null)
556         {
557             for (Map.Entry<String, List<String>> entry : request.getParameterMap().entrySet())
558             {
559                 List<String> values = entry.getValue();
560                 if (values != null)
561                 {
562                     this.parameterMap.put(entry.getKey(),values.toArray(new String[values.size()]));
563                 }
564                 else
565                 {
566                     this.parameterMap.put(entry.getKey(),new String[0]);
567                 }
568             }
569         }
570     }
571 
572     public void setUpgradeResponse(UpgradeResponse response)
573     {
574         this.upgradeResponse = response;
575     }
576 
577     @Override
578     public SuspendToken suspend()
579     {
580         return connection.suspend();
581     }
582 
583     /**
584      * @return the default (initial) value for the batching mode.
585      */
586     public BatchMode getBatchMode()
587     {
588         return BatchMode.AUTO;
589     }
590 
591     @Override
592     public String toString()
593     {
594         StringBuilder builder = new StringBuilder();
595         builder.append("WebSocketSession[");
596         builder.append("websocket=").append(websocket);
597         builder.append(",behavior=").append(policy.getBehavior());
598         builder.append(",connection=").append(connection);
599         builder.append(",remote=").append(remote);
600         builder.append(",incoming=").append(incomingHandler);
601         builder.append(",outgoing=").append(outgoingHandler);
602         builder.append("]");
603         return builder.toString();
604     }
605 
606 }