View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.io;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.util.List;
24  import java.util.concurrent.CopyOnWriteArrayList;
25  import java.util.concurrent.atomic.AtomicReference;
26  
27  import org.eclipse.jetty.util.StringUtil;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  import org.eclipse.jetty.websocket.api.CloseStatus;
31  import org.eclipse.jetty.websocket.api.StatusCode;
32  import org.eclipse.jetty.websocket.common.CloseInfo;
33  import org.eclipse.jetty.websocket.common.ConnectionState;
34  
35  /**
36   * Simple state tracker for Input / Output and {@link ConnectionState}.
37   * <p>
38   * Use the various known .on*() methods to trigger a state change.
39   * <ul>
40   * <li>{@link #onOpened()} - connection has been opened</li>
41   * </ul>
42   */
43  public class IOState
44  {
45      /**
46       * The source of a close handshake. (ie: who initiated it).
47       */
48      private static enum CloseHandshakeSource
49      {
50          /** No close handshake initiated (yet) */
51          NONE,
52          /** Local side initiated the close handshake */
53          LOCAL,
54          /** Remote side initiated the close handshake */
55          REMOTE,
56          /** An abnormal close situation (disconnect, timeout, etc...) */
57          ABNORMAL;
58      }
59  
60      public static interface ConnectionStateListener
61      {
62          public void onConnectionStateChange(ConnectionState state);
63      }
64  
65      private static final Logger LOG = Log.getLogger(IOState.class);
66      private ConnectionState state;
67      private final List<ConnectionStateListener> listeners = new CopyOnWriteArrayList<>();
68  
69      /** 
70       * Is input on websocket available (for reading frames).
71       * Used to determine close handshake completion, and track half-close states
72       */
73      private boolean inputAvailable;
74      /** 
75       * Is output on websocket available (for writing frames).
76       * Used to determine close handshake completion, and track half-closed states.
77       */
78      private boolean outputAvailable;
79      /** 
80       * Initiator of the close handshake.
81       * Used to determine who initiated a close handshake for reply reasons.
82       */
83      private CloseHandshakeSource closeHandshakeSource;
84      /**
85       * The close info for the initiator of the close handshake.
86       * It is possible in abnormal close scenarios to have a different
87       * final close info that is used to notify the WS-Endpoint's onClose()
88       * events with.
89       */
90      private CloseInfo closeInfo;
91      /**
92       * Atomic reference to the final close info.
93       * This can only be set once, and is used for the WS-Endpoint's onClose()
94       * event.
95       */
96      private AtomicReference<CloseInfo> finalClose = new AtomicReference<>();
97      /**
98       * Tracker for if the close handshake was completed successfully by
99       * both sides.  False if close was sudden or abnormal.
100      */
101     private boolean cleanClose;
102 
103     /**
104      * Create a new IOState, initialized to {@link ConnectionState#CONNECTING}
105      */
106     public IOState()
107     {
108         this.state = ConnectionState.CONNECTING;
109         this.inputAvailable = false;
110         this.outputAvailable = false;
111         this.closeHandshakeSource = CloseHandshakeSource.NONE;
112         this.closeInfo = null;
113         this.cleanClose = false;
114     }
115 
116     public void addListener(ConnectionStateListener listener)
117     {
118         listeners.add(listener);
119     }
120 
121     public void assertInputOpen() throws IOException
122     {
123         if (!isInputAvailable())
124         {
125             throw new IOException("Connection input is closed");
126         }
127     }
128 
129     public void assertOutputOpen() throws IOException
130     {
131         if (!isOutputAvailable())
132         {
133             throw new IOException("Connection output is closed");
134         }
135     }
136 
137     public CloseInfo getCloseInfo()
138     {
139         CloseInfo ci = finalClose.get();
140         if (ci != null)
141         {
142             return ci;
143         }
144         return closeInfo;
145     }
146 
147     public ConnectionState getConnectionState()
148     {
149         return state;
150     }
151 
152     public boolean isClosed()
153     {
154         synchronized (state)
155         {
156             return (state == ConnectionState.CLOSED);
157         }
158     }
159 
160     public boolean isInputAvailable()
161     {
162         return inputAvailable;
163     }
164 
165     public boolean isOpen()
166     {
167         return (getConnectionState() != ConnectionState.CLOSED);
168     }
169 
170     public boolean isOutputAvailable()
171     {
172         return outputAvailable;
173     }
174 
175     private void notifyStateListeners(ConnectionState state)
176     {
177         if (LOG.isDebugEnabled())
178             LOG.debug("Notify State Listeners: {}",state);
179         for (ConnectionStateListener listener : listeners)
180         {
181             if (LOG.isDebugEnabled())
182             {
183                 LOG.debug("{}.onConnectionStateChange({})",listener.getClass().getSimpleName(),state.name());
184             }
185             listener.onConnectionStateChange(state);
186         }
187     }
188 
189     /**
190      * A websocket connection has been disconnected for abnormal close reasons.
191      * <p>
192      * This is the low level disconnect of the socket. It could be the result of a normal close operation, from an IO error, or even from a timeout.
193      */
194     public void onAbnormalClose(CloseInfo close)
195     {
196         if (LOG.isDebugEnabled())
197             LOG.debug("onAbnormalClose({})",close);
198         ConnectionState event = null;
199         synchronized (this)
200         {
201             if (this.state == ConnectionState.CLOSED)
202             {
203                 // already closed
204                 return;
205             }
206 
207             if (this.state == ConnectionState.OPEN)
208             {
209                 this.cleanClose = false;
210             }
211 
212             this.state = ConnectionState.CLOSED;
213             finalClose.compareAndSet(null,close);
214             this.inputAvailable = false;
215             this.outputAvailable = false;
216             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
217             event = this.state;
218         }
219         notifyStateListeners(event);
220     }
221 
222     /**
223      * A close handshake has been issued from the local endpoint
224      */
225     public void onCloseLocal(CloseInfo close)
226     {
227         ConnectionState event = null;
228         ConnectionState abnormalEvent = null;
229         ConnectionState initialState = this.state;
230         if (LOG.isDebugEnabled())
231             LOG.debug("onCloseLocal({}) : {}",close,initialState);
232         if (initialState == ConnectionState.CLOSED)
233         {
234             // already closed
235             LOG.debug("already closed");
236             return;
237         }
238 
239         if (initialState == ConnectionState.CONNECTED)
240         {
241             // fast close. a local close request from end-user onConnect/onOpen method
242             LOG.debug("FastClose in CONNECTED detected");
243             // Force the state open (to allow read/write to endpoint)
244             onOpened();
245             if (LOG.isDebugEnabled())
246                 LOG.debug("FastClose continuing with Closure");
247         }
248 
249         synchronized (this)
250         {
251             closeInfo = close;
252 
253             boolean in = inputAvailable;
254             boolean out = outputAvailable;
255             if (closeHandshakeSource == CloseHandshakeSource.NONE)
256             {
257                 closeHandshakeSource = CloseHandshakeSource.LOCAL;
258             }
259             out = false;
260             outputAvailable = false;
261 
262             LOG.debug("onCloseLocal(), input={}, output={}",in,out);
263 
264             if (!in && !out)
265             {
266                 LOG.debug("Close Handshake satisfied, disconnecting");
267                 cleanClose = true;
268                 this.state = ConnectionState.CLOSED;
269                 finalClose.compareAndSet(null,close);
270                 event = this.state;
271             }
272             else if (this.state == ConnectionState.OPEN)
273             {
274                 // We are now entering CLOSING (or half-closed)
275                 this.state = ConnectionState.CLOSING;
276                 event = this.state;
277                 
278                 // if abnormal, we don't expect an answer.
279                 if (close.isAbnormal())
280                 {
281                     abnormalEvent = ConnectionState.CLOSED;
282                     finalClose.compareAndSet(null,close);
283                     cleanClose = false;
284                     outputAvailable = false;
285                     inputAvailable = false;
286                     closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
287                 }
288             }
289         }
290 
291         // Only notify on state change events
292         if (event != null)
293         {
294             notifyStateListeners(event);
295             
296             if(abnormalEvent != null) {
297                 notifyStateListeners(abnormalEvent);
298             }
299         }
300     }
301 
302     /**
303      * A close handshake has been received from the remote endpoint
304      */
305     public void onCloseRemote(CloseInfo close)
306     {
307         if (LOG.isDebugEnabled())
308             LOG.debug("onCloseRemote({})",close);
309         ConnectionState event = null;
310         synchronized (this)
311         {
312             if (this.state == ConnectionState.CLOSED)
313             {
314                 // already closed
315                 return;
316             }
317 
318             closeInfo = close;
319 
320             boolean in = inputAvailable;
321             boolean out = outputAvailable;
322             if (closeHandshakeSource == CloseHandshakeSource.NONE)
323             {
324                 closeHandshakeSource = CloseHandshakeSource.REMOTE;
325             }
326             in = false;
327             inputAvailable = false;
328 
329             if (LOG.isDebugEnabled())
330                 LOG.debug("onCloseRemote(), input={}, output={}",in,out);
331 
332             if (!in && !out)
333             {
334                 LOG.debug("Close Handshake satisfied, disconnecting");
335                 cleanClose = true;
336                 state = ConnectionState.CLOSED;
337                 finalClose.compareAndSet(null,close);
338                 event = this.state;
339             }
340             else if (this.state == ConnectionState.OPEN)
341             {
342                 // We are now entering CLOSING (or half-closed)
343                 this.state = ConnectionState.CLOSING;
344                 event = this.state;
345             }
346         }
347 
348         // Only notify on state change events
349         if (event != null)
350         {
351             notifyStateListeners(event);
352         }
353     }
354 
355     /**
356      * WebSocket has successfully upgraded, but the end-user onOpen call hasn't run yet.
357      * <p>
358      * This is an intermediate state between the RFC's {@link ConnectionState#CONNECTING} and {@link ConnectionState#OPEN}
359      */
360     public void onConnected()
361     {
362         ConnectionState event = null;
363         synchronized (this)
364         {
365             if (this.state != ConnectionState.CONNECTING)
366             {
367                 LOG.debug("Unable to set to connected, not in CONNECTING state: {}",this.state);
368                 return;
369             }
370 
371             this.state = ConnectionState.CONNECTED;
372             inputAvailable = false; // cannot read (yet)
373             outputAvailable = true; // write allowed
374             event = this.state;
375         }
376         notifyStateListeners(event);
377     }
378 
379     /**
380      * A websocket connection has failed its upgrade handshake, and is now closed.
381      */
382     public void onFailedUpgrade()
383     {
384         assert (this.state == ConnectionState.CONNECTING);
385         ConnectionState event = null;
386         synchronized (this)
387         {
388             this.state = ConnectionState.CLOSED;
389             cleanClose = false;
390             inputAvailable = false;
391             outputAvailable = false;
392             event = this.state;
393         }
394         notifyStateListeners(event);
395     }
396 
397     /**
398      * A websocket connection has finished its upgrade handshake, and is now open.
399      */
400     public void onOpened()
401     {
402         ConnectionState event = null;
403         synchronized (this)
404         {
405             if (this.state == ConnectionState.OPEN)
406             {
407                 // already opened
408                 return;
409             }
410 
411             if (this.state != ConnectionState.CONNECTED)
412             {
413                 LOG.debug("Unable to open, not in CONNECTED state: {}",this.state);
414                 return;
415             }
416 
417             this.state = ConnectionState.OPEN;
418             this.inputAvailable = true;
419             this.outputAvailable = true;
420             event = this.state;
421         }
422         notifyStateListeners(event);
423     }
424 
425     /**
426      * The local endpoint has reached a read failure.
427      * <p>
428      * This could be a normal result after a proper close handshake, or even a premature close due to a connection disconnect.
429      */
430     public void onReadFailure(Throwable t)
431     {
432         ConnectionState event = null;
433         synchronized (this)
434         {
435             if (this.state == ConnectionState.CLOSED)
436             {
437                 // already closed
438                 return;
439             }
440 
441          // Build out Close Reason
442             String reason = "WebSocket Read Failure";
443             if (t instanceof EOFException)
444             {
445                 reason = "WebSocket Read EOF";
446                 Throwable cause = t.getCause();
447                 if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
448                 {
449                     reason = "EOF: " + cause.getMessage();
450                 }
451             }
452             else
453             {
454                 if (StringUtil.isNotBlank(t.getMessage()))
455                 {
456                     reason = t.getMessage();
457                 }
458             }
459 
460             reason = CloseStatus.trimMaxReasonLength(reason);
461             CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,reason);
462 
463             finalClose.compareAndSet(null,close);
464 
465             this.cleanClose = false;
466             this.state = ConnectionState.CLOSED;
467             this.closeInfo = close;
468             this.inputAvailable = false;
469             this.outputAvailable = false;
470             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
471             event = this.state;
472         }
473         notifyStateListeners(event);
474     }
475 
476     /**
477      * The local endpoint has reached a write failure.
478      * <p>
479      * A low level I/O failure, or even a jetty side EndPoint close (from idle timeout) are a few scenarios
480      */
481     public void onWriteFailure(Throwable t)
482     {
483         ConnectionState event = null;
484         synchronized (this)
485         {
486             if (this.state == ConnectionState.CLOSED)
487             {
488                 // already closed
489                 return;
490             }
491 
492             // Build out Close Reason
493             String reason = "WebSocket Write Failure";
494             if (t instanceof EOFException)
495             {
496                 reason = "WebSocket Write EOF";
497                 Throwable cause = t.getCause();
498                 if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
499                 {
500                     reason = "EOF: " + cause.getMessage();
501                 }
502             }
503             else
504             {
505                 if (StringUtil.isNotBlank(t.getMessage()))
506                 {
507                     reason = t.getMessage();
508                 }
509             }
510 
511             reason = CloseStatus.trimMaxReasonLength(reason);
512             CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,reason);
513 
514             finalClose.compareAndSet(null,close);
515 
516             this.cleanClose = false;
517             this.state = ConnectionState.CLOSED;
518             this.inputAvailable = false;
519             this.outputAvailable = false;
520             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
521             event = this.state;
522         }
523         notifyStateListeners(event);
524     }
525 
526     public void onDisconnected()
527     {
528         ConnectionState event = null;
529         synchronized (this)
530         {
531             if (this.state == ConnectionState.CLOSED)
532             {
533                 // already closed
534                 return;
535             }
536 
537             CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,"Disconnected");
538 
539             this.cleanClose = false;
540             this.state = ConnectionState.CLOSED;
541             this.closeInfo = close;
542             this.inputAvailable = false;
543             this.outputAvailable = false;
544             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
545             event = this.state;
546         }
547         notifyStateListeners(event);
548     }
549 
550     @Override
551     public String toString()
552     {
553         StringBuilder str = new StringBuilder();
554         str.append(this.getClass().getSimpleName());
555         str.append("@").append(Integer.toHexString(hashCode()));
556         str.append("[").append(state);
557         str.append(',');
558         if (!inputAvailable)
559         {
560             str.append('!');
561         }
562         str.append("in,");
563         if (!outputAvailable)
564         {
565             str.append('!');
566         }
567         str.append("out");
568         if ((state == ConnectionState.CLOSED) || (state == ConnectionState.CLOSING))
569         {
570             CloseInfo ci = finalClose.get();
571             if (ci != null)
572             {
573                 str.append(",finalClose=").append(ci);
574             }
575             else
576             {
577                 str.append(",close=").append(closeInfo);
578             }
579             str.append(",clean=").append(cleanClose);
580             str.append(",closeSource=").append(closeHandshakeSource);
581         }
582         str.append(']');
583         return str.toString();
584     }
585 
586     public boolean wasAbnormalClose()
587     {
588         return closeHandshakeSource == CloseHandshakeSource.ABNORMAL;
589     }
590 
591     public boolean wasCleanClose()
592     {
593         return cleanClose;
594     }
595 
596     public boolean wasLocalCloseInitiated()
597     {
598         return closeHandshakeSource == CloseHandshakeSource.LOCAL;
599     }
600 
601     public boolean wasRemoteCloseInitiated()
602     {
603         return closeHandshakeSource == CloseHandshakeSource.REMOTE;
604     }
605 
606 }