View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.io;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.util.List;
24  import java.util.concurrent.CopyOnWriteArrayList;
25  import java.util.concurrent.atomic.AtomicReference;
26  
27  import org.eclipse.jetty.util.StringUtil;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  import org.eclipse.jetty.websocket.api.StatusCode;
31  import org.eclipse.jetty.websocket.common.CloseInfo;
32  import org.eclipse.jetty.websocket.common.ConnectionState;
33  
34  /**
35   * Simple state tracker for Input / Output and {@link ConnectionState}.
36   * <p>
37   * Use the various known .on*() methods to trigger a state change.
38   * <ul>
39   * <li>{@link #onOpened()} - connection has been opened</li>
40   * </ul>
41   */
42  public class IOState
43  {
44      /**
45       * The source of a close handshake. (ie: who initiated it).
46       */
47      private static enum CloseHandshakeSource
48      {
49          /** No close handshake initiated (yet) */
50          NONE,
51          /** Local side initiated the close handshake */
52          LOCAL,
53          /** Remote side initiated the close handshake */
54          REMOTE,
55          /** An abnormal close situation (disconnect, timeout, etc...) */
56          ABNORMAL;
57      }
58  
59      public static interface ConnectionStateListener
60      {
61          public void onConnectionStateChange(ConnectionState state);
62      }
63  
64      private static final Logger LOG = Log.getLogger(IOState.class);
65      private ConnectionState state;
66      private final List<ConnectionStateListener> listeners = new CopyOnWriteArrayList<>();
67  
68      /** 
69       * Is input on websocket available (for reading frames).
70       * Used to determine close handshake completion, and track half-close states
71       */
72      private boolean inputAvailable;
73      /** 
74       * Is output on websocket available (for writing frames).
75       * Used to determine close handshake completion, and track half-closed states.
76       */
77      private boolean outputAvailable;
78      /** 
79       * Initiator of the close handshake.
80       * Used to determine who initiated a close handshake for reply reasons.
81       */
82      private CloseHandshakeSource closeHandshakeSource;
83      /**
84       * The close info for the initiator of the close handshake.
85       * It is possible in abnormal close scenarios to have a different
86       * final close info that is used to notify the WS-Endpoint's onClose()
87       * events with.
88       */
89      private CloseInfo closeInfo;
90      /**
91       * Atomic reference to the final close info.
92       * This can only be set once, and is used for the WS-Endpoint's onClose()
93       * event.
94       */
95      private AtomicReference<CloseInfo> finalClose = new AtomicReference<>();
96      /**
97       * Tracker for if the close handshake was completed successfully by
98       * both sides.  False if close was sudden or abnormal.
99       */
100     private boolean cleanClose;
101 
102     /**
103      * Create a new IOState, initialized to {@link ConnectionState#CONNECTING}
104      */
105     public IOState()
106     {
107         this.state = ConnectionState.CONNECTING;
108         this.inputAvailable = false;
109         this.outputAvailable = false;
110         this.closeHandshakeSource = CloseHandshakeSource.NONE;
111         this.closeInfo = null;
112         this.cleanClose = false;
113     }
114 
115     public void addListener(ConnectionStateListener listener)
116     {
117         listeners.add(listener);
118     }
119 
120     public void assertInputOpen() throws IOException
121     {
122         if (!isInputAvailable())
123         {
124             throw new IOException("Connection input is closed");
125         }
126     }
127 
128     public void assertOutputOpen() throws IOException
129     {
130         if (!isOutputAvailable())
131         {
132             throw new IOException("Connection output is closed");
133         }
134     }
135 
136     public CloseInfo getCloseInfo()
137     {
138         CloseInfo ci = finalClose.get();
139         if (ci != null)
140         {
141             return ci;
142         }
143         return closeInfo;
144     }
145 
146     public ConnectionState getConnectionState()
147     {
148         return state;
149     }
150 
151     public boolean isClosed()
152     {
153         synchronized (state)
154         {
155             return (state == ConnectionState.CLOSED);
156         }
157     }
158 
159     public boolean isInputAvailable()
160     {
161         return inputAvailable;
162     }
163 
164     public boolean isOpen()
165     {
166         return (getConnectionState() != ConnectionState.CLOSED);
167     }
168 
169     public boolean isOutputAvailable()
170     {
171         return outputAvailable;
172     }
173 
174     private void notifyStateListeners(ConnectionState state)
175     {
176         if (LOG.isDebugEnabled())
177             LOG.debug("Notify State Listeners: {}",state);
178         for (ConnectionStateListener listener : listeners)
179         {
180             if (LOG.isDebugEnabled())
181             {
182                 LOG.debug("{}.onConnectionStateChange({})",listener.getClass().getSimpleName(),state.name());
183             }
184             listener.onConnectionStateChange(state);
185         }
186     }
187 
188     /**
189      * A websocket connection has been disconnected for abnormal close reasons.
190      * <p>
191      * This is the low level disconnect of the socket. It could be the result of a normal close operation, from an IO error, or even from a timeout.
192      * @param close the close information
193      */
194     public void onAbnormalClose(CloseInfo close)
195     {
196         if (LOG.isDebugEnabled())
197             LOG.debug("onAbnormalClose({})",close);
198         ConnectionState event = null;
199         synchronized (this)
200         {
201             if (this.state == ConnectionState.CLOSED)
202             {
203                 // already closed
204                 return;
205             }
206 
207             if (this.state == ConnectionState.OPEN)
208             {
209                 this.cleanClose = false;
210             }
211 
212             this.state = ConnectionState.CLOSED;
213             finalClose.compareAndSet(null,close);
214             this.inputAvailable = false;
215             this.outputAvailable = false;
216             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
217             event = this.state;
218         }
219         notifyStateListeners(event);
220     }
221 
222     /**
223      * A close handshake has been issued from the local endpoint
224      * @param close the close information
225      */
226     public void onCloseLocal(CloseInfo close)
227     {
228         ConnectionState event = null;
229         ConnectionState abnormalEvent = null;
230         ConnectionState initialState = this.state;
231         if (LOG.isDebugEnabled())
232             LOG.debug("onCloseLocal({}) : {}",close,initialState);
233         if (initialState == ConnectionState.CLOSED)
234         {
235             // already closed
236             LOG.debug("already closed");
237             return;
238         }
239 
240         if (initialState == ConnectionState.CONNECTED)
241         {
242             // fast close. a local close request from end-user onConnect/onOpen method
243             LOG.debug("FastClose in CONNECTED detected");
244             // Force the state open (to allow read/write to endpoint)
245             onOpened();
246             if (LOG.isDebugEnabled())
247                 LOG.debug("FastClose continuing with Closure");
248         }
249 
250         synchronized (this)
251         {
252             closeInfo = close;
253             
254             // Turn off further output
255             outputAvailable = false;
256 
257             boolean in = inputAvailable;
258             boolean out = outputAvailable;
259             if (closeHandshakeSource == CloseHandshakeSource.NONE)
260             {
261                 closeHandshakeSource = CloseHandshakeSource.LOCAL;
262             }
263             
264             LOG.debug("onCloseLocal(), input={}, output={}",in,out);
265 
266             if (!in && !out)
267             {
268                 LOG.debug("Close Handshake satisfied, disconnecting");
269                 cleanClose = true;
270                 this.state = ConnectionState.CLOSED;
271                 finalClose.compareAndSet(null,close);
272                 event = this.state;
273             }
274             else if (this.state == ConnectionState.OPEN)
275             {
276                 // We are now entering CLOSING (or half-closed)
277                 this.state = ConnectionState.CLOSING;
278                 event = this.state;
279                 
280                 // if abnormal, we don't expect an answer.
281                 if (close.isAbnormal())
282                 {
283                     abnormalEvent = ConnectionState.CLOSED;
284                     finalClose.compareAndSet(null,close);
285                     cleanClose = false;
286                     outputAvailable = false;
287                     inputAvailable = false;
288                     closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
289                 }
290             }
291         }
292 
293         // Only notify on state change events
294         if (event != null)
295         {
296             notifyStateListeners(event);
297             
298             if(abnormalEvent != null) 
299             {
300                 notifyStateListeners(abnormalEvent);
301             }
302         }
303     }
304 
305     /**
306      * A close handshake has been received from the remote endpoint
307      * @param close the close information
308      */
309     public void onCloseRemote(CloseInfo close)
310     {
311         if (LOG.isDebugEnabled())
312             LOG.debug("onCloseRemote({})",close);
313         ConnectionState event = null;
314         synchronized (this)
315         {
316             if (this.state == ConnectionState.CLOSED)
317             {
318                 // already closed
319                 return;
320             }
321 
322             closeInfo = close;
323             
324             // turn off further input
325             inputAvailable = false;
326 
327             boolean in = inputAvailable;
328             boolean out = outputAvailable;
329             if (closeHandshakeSource == CloseHandshakeSource.NONE)
330             {
331                 closeHandshakeSource = CloseHandshakeSource.REMOTE;
332             }
333 
334             if (LOG.isDebugEnabled())
335                 LOG.debug("onCloseRemote(), input={}, output={}",in,out);
336 
337             if (!in && !out)
338             {
339                 LOG.debug("Close Handshake satisfied, disconnecting");
340                 cleanClose = true;
341                 state = ConnectionState.CLOSED;
342                 finalClose.compareAndSet(null,close);
343                 event = this.state;
344             }
345             else if (this.state == ConnectionState.OPEN)
346             {
347                 // We are now entering CLOSING (or half-closed)
348                 this.state = ConnectionState.CLOSING;
349                 event = this.state;
350             }
351         }
352 
353         // Only notify on state change events
354         if (event != null)
355         {
356             notifyStateListeners(event);
357         }
358     }
359 
360     /**
361      * WebSocket has successfully upgraded, but the end-user onOpen call hasn't run yet.
362      * <p>
363      * This is an intermediate state between the RFC's {@link ConnectionState#CONNECTING} and {@link ConnectionState#OPEN}
364      */
365     public void onConnected()
366     {
367         ConnectionState event = null;
368         synchronized (this)
369         {
370             if (this.state != ConnectionState.CONNECTING)
371             {
372                 LOG.debug("Unable to set to connected, not in CONNECTING state: {}",this.state);
373                 return;
374             }
375 
376             this.state = ConnectionState.CONNECTED;
377             inputAvailable = false; // cannot read (yet)
378             outputAvailable = true; // write allowed
379             event = this.state;
380         }
381         notifyStateListeners(event);
382     }
383 
384     /**
385      * A websocket connection has failed its upgrade handshake, and is now closed.
386      */
387     public void onFailedUpgrade()
388     {
389         assert (this.state == ConnectionState.CONNECTING);
390         ConnectionState event = null;
391         synchronized (this)
392         {
393             this.state = ConnectionState.CLOSED;
394             cleanClose = false;
395             inputAvailable = false;
396             outputAvailable = false;
397             event = this.state;
398         }
399         notifyStateListeners(event);
400     }
401 
402     /**
403      * A websocket connection has finished its upgrade handshake, and is now open.
404      */
405     public void onOpened()
406     {
407         if(LOG.isDebugEnabled())
408             LOG.debug(" onOpened()");
409         
410         ConnectionState event = null;
411         synchronized (this)
412         {
413             if (this.state == ConnectionState.OPEN)
414             {
415                 // already opened
416                 return;
417             }
418 
419             if (this.state != ConnectionState.CONNECTED)
420             {
421                 LOG.debug("Unable to open, not in CONNECTED state: {}",this.state);
422                 return;
423             }
424 
425             this.state = ConnectionState.OPEN;
426             this.inputAvailable = true;
427             this.outputAvailable = true;
428             event = this.state;
429         }
430         notifyStateListeners(event);
431     }
432 
433     /**
434      * The local endpoint has reached a read failure.
435      * <p>
436      * This could be a normal result after a proper close handshake, or even a premature close due to a connection disconnect.
437      * @param t the read failure
438      */
439     public void onReadFailure(Throwable t)
440     {
441         ConnectionState event = null;
442         synchronized (this)
443         {
444             if (this.state == ConnectionState.CLOSED)
445             {
446                 // already closed
447                 return;
448             }
449 
450          // Build out Close Reason
451             String reason = "WebSocket Read Failure";
452             if (t instanceof EOFException)
453             {
454                 reason = "WebSocket Read EOF";
455                 Throwable cause = t.getCause();
456                 if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
457                 {
458                     reason = "EOF: " + cause.getMessage();
459                 }
460             }
461             else
462             {
463                 if (StringUtil.isNotBlank(t.getMessage()))
464                 {
465                     reason = t.getMessage();
466                 }
467             }
468 
469             CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,reason);
470 
471             finalClose.compareAndSet(null,close);
472 
473             this.cleanClose = false;
474             this.state = ConnectionState.CLOSED;
475             this.closeInfo = close;
476             this.inputAvailable = false;
477             this.outputAvailable = false;
478             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
479             event = this.state;
480         }
481         notifyStateListeners(event);
482     }
483 
484     /**
485      * The local endpoint has reached a write failure.
486      * <p>
487      * A low level I/O failure, or even a jetty side EndPoint close (from idle timeout) are a few scenarios
488      * @param t the throwable that caused the write failure
489      */
490     public void onWriteFailure(Throwable t)
491     {
492         ConnectionState event = null;
493         synchronized (this)
494         {
495             if (this.state == ConnectionState.CLOSED)
496             {
497                 // already closed
498                 return;
499             }
500 
501             // Build out Close Reason
502             String reason = "WebSocket Write Failure";
503             if (t instanceof EOFException)
504             {
505                 reason = "WebSocket Write EOF";
506                 Throwable cause = t.getCause();
507                 if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
508                 {
509                     reason = "EOF: " + cause.getMessage();
510                 }
511             }
512             else
513             {
514                 if (StringUtil.isNotBlank(t.getMessage()))
515                 {
516                     reason = t.getMessage();
517                 }
518             }
519 
520             CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,reason);
521 
522             finalClose.compareAndSet(null,close);
523 
524             this.cleanClose = false;
525             this.state = ConnectionState.CLOSED;
526             this.inputAvailable = false;
527             this.outputAvailable = false;
528             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
529             event = this.state;
530         }
531         notifyStateListeners(event);
532     }
533 
534     public void onDisconnected()
535     {
536         ConnectionState event = null;
537         synchronized (this)
538         {
539             if (this.state == ConnectionState.CLOSED)
540             {
541                 // already closed
542                 return;
543             }
544 
545             CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,"Disconnected");
546 
547             this.cleanClose = false;
548             this.state = ConnectionState.CLOSED;
549             this.closeInfo = close;
550             this.inputAvailable = false;
551             this.outputAvailable = false;
552             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
553             event = this.state;
554         }
555         notifyStateListeners(event);
556     }
557 
558     @Override
559     public String toString()
560     {
561         StringBuilder str = new StringBuilder();
562         str.append(this.getClass().getSimpleName());
563         str.append("@").append(Integer.toHexString(hashCode()));
564         str.append("[").append(state);
565         str.append(',');
566         if (!inputAvailable)
567         {
568             str.append('!');
569         }
570         str.append("in,");
571         if (!outputAvailable)
572         {
573             str.append('!');
574         }
575         str.append("out");
576         if ((state == ConnectionState.CLOSED) || (state == ConnectionState.CLOSING))
577         {
578             CloseInfo ci = finalClose.get();
579             if (ci != null)
580             {
581                 str.append(",finalClose=").append(ci);
582             }
583             else
584             {
585                 str.append(",close=").append(closeInfo);
586             }
587             str.append(",clean=").append(cleanClose);
588             str.append(",closeSource=").append(closeHandshakeSource);
589         }
590         str.append(']');
591         return str.toString();
592     }
593 
594     public boolean wasAbnormalClose()
595     {
596         return closeHandshakeSource == CloseHandshakeSource.ABNORMAL;
597     }
598 
599     public boolean wasCleanClose()
600     {
601         return cleanClose;
602     }
603 
604     public boolean wasLocalCloseInitiated()
605     {
606         return closeHandshakeSource == CloseHandshakeSource.LOCAL;
607     }
608 
609     public boolean wasRemoteCloseInitiated()
610     {
611         return closeHandshakeSource == CloseHandshakeSource.REMOTE;
612     }
613 
614 }