View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.io;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.util.List;
24  import java.util.concurrent.CopyOnWriteArrayList;
25  import java.util.concurrent.atomic.AtomicReference;
26  
27  import org.eclipse.jetty.util.StringUtil;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  import org.eclipse.jetty.websocket.api.StatusCode;
31  import org.eclipse.jetty.websocket.common.CloseInfo;
32  import org.eclipse.jetty.websocket.common.ConnectionState;
33  
34  /**
35   * Simple state tracker for Input / Output and {@link ConnectionState}.
36   * <p>
37   * Use the various known .on*() methods to trigger a state change.
38   * <ul>
39   * <li>{@link #onOpened()} - connection has been opened</li>
40   * </ul>
41   */
42  public class IOState
43  {
44      /**
45       * The source of a close handshake. (ie: who initiated it).
46       */
47      private enum CloseHandshakeSource
48      {
49          /** No close handshake initiated (yet) */
50          NONE,
51          /** Local side initiated the close handshake */
52          LOCAL,
53          /** Remote side initiated the close handshake */
54          REMOTE,
55          /** An abnormal close situation (disconnect, timeout, etc...) */
56          ABNORMAL
57      }
58  
59      public static interface ConnectionStateListener
60      {
61          public void onConnectionStateChange(ConnectionState state);
62      }
63  
64      private static final Logger LOG = Log.getLogger(IOState.class);
65      private ConnectionState state;
66      private final List<ConnectionStateListener> listeners = new CopyOnWriteArrayList<>();
67  
68      /**
69       * Is input on websocket available (for reading frames).
70       * Used to determine close handshake completion, and track half-close states
71       */
72      private boolean inputAvailable;
73      /**
74       * Is output on websocket available (for writing frames).
75       * Used to determine close handshake completion, and track half-closed states.
76       */
77      private boolean outputAvailable;
78      /**
79       * Initiator of the close handshake.
80       * Used to determine who initiated a close handshake for reply reasons.
81       */
82      private CloseHandshakeSource closeHandshakeSource;
83      /**
84       * The close info for the initiator of the close handshake.
85       * It is possible in abnormal close scenarios to have a different
86       * final close info that is used to notify the WS-Endpoint's onClose()
87       * events with.
88       */
89      private CloseInfo closeInfo;
90      /**
91       * Atomic reference to the final close info.
92       * This can only be set once, and is used for the WS-Endpoint's onClose()
93       * event.
94       */
95      private AtomicReference<CloseInfo> finalClose = new AtomicReference<>();
96      /**
97       * Tracker for if the close handshake was completed successfully by
98       * both sides.  False if close was sudden or abnormal.
99       */
100     private boolean cleanClose;
101 
102     /**
103      * Create a new IOState, initialized to {@link ConnectionState#CONNECTING}
104      */
105     public IOState()
106     {
107         this.state = ConnectionState.CONNECTING;
108         this.inputAvailable = false;
109         this.outputAvailable = false;
110         this.closeHandshakeSource = CloseHandshakeSource.NONE;
111         this.closeInfo = null;
112         this.cleanClose = false;
113     }
114 
115     public void addListener(ConnectionStateListener listener)
116     {
117         listeners.add(listener);
118     }
119 
120     public void assertInputOpen() throws IOException
121     {
122         if (!isInputAvailable())
123         {
124             throw new IOException("Connection input is closed");
125         }
126     }
127 
128     public void assertOutputOpen() throws IOException
129     {
130         if (!isOutputAvailable())
131         {
132             throw new IOException("Connection output is closed");
133         }
134     }
135 
136     public CloseInfo getCloseInfo()
137     {
138         CloseInfo ci = finalClose.get();
139         if (ci != null)
140         {
141             return ci;
142         }
143         return closeInfo;
144     }
145 
146     public ConnectionState getConnectionState()
147     {
148         return state;
149     }
150 
151     public boolean isClosed()
152     {
153         synchronized (this)
154         {
155             return (state == ConnectionState.CLOSED);
156         }
157     }
158 
159     public boolean isInputAvailable()
160     {
161         return inputAvailable;
162     }
163 
164     public boolean isOpen()
165     {
166         return !isClosed();
167     }
168 
169     public boolean isOutputAvailable()
170     {
171         return outputAvailable;
172     }
173 
174     private void notifyStateListeners(ConnectionState state)
175     {
176         if (LOG.isDebugEnabled())
177             LOG.debug("Notify State Listeners: {}",state);
178         for (ConnectionStateListener listener : listeners)
179         {
180             if (LOG.isDebugEnabled())
181             {
182                 LOG.debug("{}.onConnectionStateChange({})",listener.getClass().getSimpleName(),state.name());
183             }
184             listener.onConnectionStateChange(state);
185         }
186     }
187 
188     /**
189      * A websocket connection has been disconnected for abnormal close reasons.
190      * <p>
191      * This is the low level disconnect of the socket. It could be the result of a normal close operation, from an IO error, or even from a timeout.
192      * @param close the close information
193      */
194     public void onAbnormalClose(CloseInfo close)
195     {
196         if (LOG.isDebugEnabled())
197             LOG.debug("onAbnormalClose({})",close);
198         ConnectionState event = null;
199         synchronized (this)
200         {
201             if (this.state == ConnectionState.CLOSED)
202             {
203                 // already closed
204                 return;
205             }
206 
207             if (this.state == ConnectionState.OPEN)
208             {
209                 this.cleanClose = false;
210             }
211 
212             this.state = ConnectionState.CLOSED;
213             finalClose.compareAndSet(null,close);
214             this.inputAvailable = false;
215             this.outputAvailable = false;
216             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
217             event = this.state;
218         }
219         notifyStateListeners(event);
220     }
221 
222     /**
223      * A close handshake has been issued from the local endpoint
224      * @param closeInfo the close information
225      */
226     public void onCloseLocal(CloseInfo closeInfo)
227     {
228         boolean open = false;
229         synchronized (this)
230         {
231             ConnectionState initialState = this.state;
232             if (LOG.isDebugEnabled())
233                 LOG.debug("onCloseLocal({}) : {}", closeInfo, initialState);
234             if (initialState == ConnectionState.CLOSED)
235             {
236                 // already closed
237                 if (LOG.isDebugEnabled())
238                     LOG.debug("already closed");
239                 return;
240             }
241 
242             if (initialState == ConnectionState.CONNECTED)
243             {
244                 // fast close. a local close request from end-user onConnect/onOpen method
245                 if (LOG.isDebugEnabled())
246                     LOG.debug("FastClose in CONNECTED detected");
247                 open = true;
248             }
249         }
250 
251         if (open)
252             openAndCloseLocal(closeInfo);
253         else
254             closeLocal(closeInfo);
255     }
256 
257     private void openAndCloseLocal(CloseInfo closeInfo)
258     {
259         // Force the state open (to allow read/write to endpoint)
260         onOpened();
261         if (LOG.isDebugEnabled())
262             LOG.debug("FastClose continuing with Closure");
263         closeLocal(closeInfo);
264     }
265 
266     private void closeLocal(CloseInfo closeInfo)
267     {
268         ConnectionState event = null;
269         ConnectionState abnormalEvent = null;
270         synchronized (this)
271         {
272             if (LOG.isDebugEnabled())
273                 LOG.debug("onCloseLocal(), input={}, output={}", inputAvailable, outputAvailable);
274 
275             this.closeInfo = closeInfo;
276 
277             // Turn off further output.
278             outputAvailable = false;
279 
280             if (closeHandshakeSource == CloseHandshakeSource.NONE)
281             {
282                 closeHandshakeSource = CloseHandshakeSource.LOCAL;
283             }
284 
285             if (!inputAvailable)
286             {
287                 if (LOG.isDebugEnabled())
288                     LOG.debug("Close Handshake satisfied, disconnecting");
289                 cleanClose = true;
290                 this.state = ConnectionState.CLOSED;
291                 finalClose.compareAndSet(null,closeInfo);
292                 event = this.state;
293             }
294             else if (this.state == ConnectionState.OPEN)
295             {
296                 // We are now entering CLOSING (or half-closed).
297                 this.state = ConnectionState.CLOSING;
298                 event = this.state;
299 
300                 // If abnormal, we don't expect an answer.
301                 if (closeInfo.isAbnormal())
302                 {
303                     abnormalEvent = ConnectionState.CLOSED;
304                     finalClose.compareAndSet(null,closeInfo);
305                     cleanClose = false;
306                     outputAvailable = false;
307                     inputAvailable = false;
308                     closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
309                 }
310             }
311         }
312 
313         // Only notify on state change events
314         if (event != null)
315         {
316             notifyStateListeners(event);
317             if (abnormalEvent != null)
318             {
319                 notifyStateListeners(abnormalEvent);
320             }
321         }
322     }
323 
324     /**
325      * A close handshake has been received from the remote endpoint
326      * @param closeInfo the close information
327      */
328     public void onCloseRemote(CloseInfo closeInfo)
329     {
330         if (LOG.isDebugEnabled())
331             LOG.debug("onCloseRemote({})", closeInfo);
332         ConnectionState event = null;
333         synchronized (this)
334         {
335             if (this.state == ConnectionState.CLOSED)
336             {
337                 // already closed
338                 return;
339             }
340 
341             if (LOG.isDebugEnabled())
342                 LOG.debug("onCloseRemote(), input={}, output={}", inputAvailable, outputAvailable);
343 
344             this.closeInfo = closeInfo;
345 
346             // turn off further input
347             inputAvailable = false;
348 
349             if (closeHandshakeSource == CloseHandshakeSource.NONE)
350             {
351                 closeHandshakeSource = CloseHandshakeSource.REMOTE;
352             }
353 
354             if (!outputAvailable)
355             {
356                 LOG.debug("Close Handshake satisfied, disconnecting");
357                 cleanClose = true;
358                 state = ConnectionState.CLOSED;
359                 finalClose.compareAndSet(null,closeInfo);
360                 event = this.state;
361             }
362             else if (this.state == ConnectionState.OPEN)
363             {
364                 // We are now entering CLOSING (or half-closed)
365                 this.state = ConnectionState.CLOSING;
366                 event = this.state;
367             }
368         }
369 
370         // Only notify on state change events
371         if (event != null)
372         {
373             notifyStateListeners(event);
374         }
375     }
376 
377     /**
378      * WebSocket has successfully upgraded, but the end-user onOpen call hasn't run yet.
379      * <p>
380      * This is an intermediate state between the RFC's {@link ConnectionState#CONNECTING} and {@link ConnectionState#OPEN}
381      */
382     public void onConnected()
383     {
384         ConnectionState event = null;
385         synchronized (this)
386         {
387             if (this.state != ConnectionState.CONNECTING)
388             {
389                 LOG.debug("Unable to set to connected, not in CONNECTING state: {}",this.state);
390                 return;
391             }
392 
393             this.state = ConnectionState.CONNECTED;
394             inputAvailable = false; // cannot read (yet)
395             outputAvailable = true; // write allowed
396             event = this.state;
397         }
398         notifyStateListeners(event);
399     }
400 
401     /**
402      * A websocket connection has failed its upgrade handshake, and is now closed.
403      */
404     public void onFailedUpgrade()
405     {
406         assert (this.state == ConnectionState.CONNECTING);
407         ConnectionState event = null;
408         synchronized (this)
409         {
410             this.state = ConnectionState.CLOSED;
411             cleanClose = false;
412             inputAvailable = false;
413             outputAvailable = false;
414             event = this.state;
415         }
416         notifyStateListeners(event);
417     }
418 
419     /**
420      * A websocket connection has finished its upgrade handshake, and is now open.
421      */
422     public void onOpened()
423     {
424         if(LOG.isDebugEnabled())
425             LOG.debug(" onOpened()");
426 
427         ConnectionState event = null;
428         synchronized (this)
429         {
430             if (this.state == ConnectionState.OPEN)
431             {
432                 // already opened
433                 return;
434             }
435 
436             if (this.state != ConnectionState.CONNECTED)
437             {
438                 LOG.debug("Unable to open, not in CONNECTED state: {}",this.state);
439                 return;
440             }
441 
442             this.state = ConnectionState.OPEN;
443             this.inputAvailable = true;
444             this.outputAvailable = true;
445             event = this.state;
446         }
447         notifyStateListeners(event);
448     }
449 
450     /**
451      * The local endpoint has reached a read failure.
452      * <p>
453      * This could be a normal result after a proper close handshake, or even a premature close due to a connection disconnect.
454      * @param t the read failure
455      */
456     public void onReadFailure(Throwable t)
457     {
458         ConnectionState event = null;
459         synchronized (this)
460         {
461             if (this.state == ConnectionState.CLOSED)
462             {
463                 // already closed
464                 return;
465             }
466 
467          // Build out Close Reason
468             String reason = "WebSocket Read Failure";
469             if (t instanceof EOFException)
470             {
471                 reason = "WebSocket Read EOF";
472                 Throwable cause = t.getCause();
473                 if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
474                 {
475                     reason = "EOF: " + cause.getMessage();
476                 }
477             }
478             else
479             {
480                 if (StringUtil.isNotBlank(t.getMessage()))
481                 {
482                     reason = t.getMessage();
483                 }
484             }
485 
486             CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,reason);
487 
488             finalClose.compareAndSet(null,close);
489 
490             this.cleanClose = false;
491             this.state = ConnectionState.CLOSED;
492             this.closeInfo = close;
493             this.inputAvailable = false;
494             this.outputAvailable = false;
495             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
496             event = this.state;
497         }
498         notifyStateListeners(event);
499     }
500 
501     /**
502      * The local endpoint has reached a write failure.
503      * <p>
504      * A low level I/O failure, or even a jetty side EndPoint close (from idle timeout) are a few scenarios
505      * @param t the throwable that caused the write failure
506      */
507     public void onWriteFailure(Throwable t)
508     {
509         ConnectionState event = null;
510         synchronized (this)
511         {
512             if (this.state == ConnectionState.CLOSED)
513             {
514                 // already closed
515                 return;
516             }
517 
518             // Build out Close Reason
519             String reason = "WebSocket Write Failure";
520             if (t instanceof EOFException)
521             {
522                 reason = "WebSocket Write EOF";
523                 Throwable cause = t.getCause();
524                 if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
525                 {
526                     reason = "EOF: " + cause.getMessage();
527                 }
528             }
529             else
530             {
531                 if (StringUtil.isNotBlank(t.getMessage()))
532                 {
533                     reason = t.getMessage();
534                 }
535             }
536 
537             CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,reason);
538 
539             finalClose.compareAndSet(null,close);
540 
541             this.cleanClose = false;
542             this.state = ConnectionState.CLOSED;
543             this.inputAvailable = false;
544             this.outputAvailable = false;
545             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
546             event = this.state;
547         }
548         notifyStateListeners(event);
549     }
550 
551     public void onDisconnected()
552     {
553         ConnectionState event = null;
554         synchronized (this)
555         {
556             if (this.state == ConnectionState.CLOSED)
557             {
558                 // already closed
559                 return;
560             }
561 
562             CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,"Disconnected");
563 
564             this.cleanClose = false;
565             this.state = ConnectionState.CLOSED;
566             this.closeInfo = close;
567             this.inputAvailable = false;
568             this.outputAvailable = false;
569             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
570             event = this.state;
571         }
572         notifyStateListeners(event);
573     }
574 
575     @Override
576     public String toString()
577     {
578         StringBuilder str = new StringBuilder();
579         str.append(this.getClass().getSimpleName());
580         str.append("@").append(Integer.toHexString(hashCode()));
581         str.append("[").append(state);
582         str.append(',');
583         if (!inputAvailable)
584         {
585             str.append('!');
586         }
587         str.append("in,");
588         if (!outputAvailable)
589         {
590             str.append('!');
591         }
592         str.append("out");
593         if ((state == ConnectionState.CLOSED) || (state == ConnectionState.CLOSING))
594         {
595             CloseInfo ci = finalClose.get();
596             if (ci != null)
597             {
598                 str.append(",finalClose=").append(ci);
599             }
600             else
601             {
602                 str.append(",close=").append(closeInfo);
603             }
604             str.append(",clean=").append(cleanClose);
605             str.append(",closeSource=").append(closeHandshakeSource);
606         }
607         str.append(']');
608         return str.toString();
609     }
610 
611     public boolean wasAbnormalClose()
612     {
613         return closeHandshakeSource == CloseHandshakeSource.ABNORMAL;
614     }
615 
616     public boolean wasCleanClose()
617     {
618         return cleanClose;
619     }
620 
621     public boolean wasLocalCloseInitiated()
622     {
623         return closeHandshakeSource == CloseHandshakeSource.LOCAL;
624     }
625 
626     public boolean wasRemoteCloseInitiated()
627     {
628         return closeHandshakeSource == CloseHandshakeSource.REMOTE;
629     }
630 
631 }