View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.io;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.util.List;
24  import java.util.concurrent.CopyOnWriteArrayList;
25  import java.util.concurrent.atomic.AtomicReference;
26  
27  import org.eclipse.jetty.util.StringUtil;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  import org.eclipse.jetty.websocket.api.CloseStatus;
31  import org.eclipse.jetty.websocket.api.StatusCode;
32  import org.eclipse.jetty.websocket.common.CloseInfo;
33  import org.eclipse.jetty.websocket.common.ConnectionState;
34  
35  /**
36   * Simple state tracker for Input / Output and {@link ConnectionState}.
37   * <p>
38   * Use the various known .on*() methods to trigger a state change.
39   * <ul>
40   * <li>{@link #onOpened()} - connection has been opened</li>
41   * </ul>
42   */
43  public class IOState
44  {
45      /**
46       * The source of a close handshake. (ie: who initiated it).
47       */
48      private static enum CloseHandshakeSource
49      {
50          /** No close handshake initiated (yet) */
51          NONE,
52          /** Local side initiated the close handshake */
53          LOCAL,
54          /** Remote side initiated the close handshake */
55          REMOTE,
56          /** An abnormal close situation (disconnect, timeout, etc...) */
57          ABNORMAL;
58      }
59  
60      public static interface ConnectionStateListener
61      {
62          public void onConnectionStateChange(ConnectionState state);
63      }
64  
65      private static final Logger LOG = Log.getLogger(IOState.class);
66      private ConnectionState state;
67      private final List<ConnectionStateListener> listeners = new CopyOnWriteArrayList<>();
68  
69      /** 
70       * Is input on websocket available (for reading frames).
71       * Used to determine close handshake completion, and track half-close states
72       */
73      private boolean inputAvailable;
74      /** 
75       * Is output on websocket available (for writing frames).
76       * Used to determine close handshake completion, and track half-closed states.
77       */
78      private boolean outputAvailable;
79      /** 
80       * Initiator of the close handshake.
81       * Used to determine who initiated a close handshake for reply reasons.
82       */
83      private CloseHandshakeSource closeHandshakeSource;
84      /**
85       * The close info for the initiator of the close handshake.
86       * It is possible in abnormal close scenarios to have a different
87       * final close info that is used to notify the WS-Endpoint's onClose()
88       * events with.
89       */
90      private CloseInfo closeInfo;
91      /**
92       * Atomic reference to the final close info.
93       * This can only be set once, and is used for the WS-Endpoint's onClose()
94       * event.
95       */
96      private AtomicReference<CloseInfo> finalClose = new AtomicReference<>();
97      /**
98       * Tracker for if the close handshake was completed successfully by
99       * both sides.  False if close was sudden or abnormal.
100      */
101     private boolean cleanClose;
102 
103     /**
104      * Create a new IOState, initialized to {@link ConnectionState#CONNECTING}
105      */
106     public IOState()
107     {
108         this.state = ConnectionState.CONNECTING;
109         this.inputAvailable = false;
110         this.outputAvailable = false;
111         this.closeHandshakeSource = CloseHandshakeSource.NONE;
112         this.closeInfo = null;
113         this.cleanClose = false;
114     }
115 
116     public void addListener(ConnectionStateListener listener)
117     {
118         listeners.add(listener);
119     }
120 
121     public void assertInputOpen() throws IOException
122     {
123         if (!isInputAvailable())
124         {
125             throw new IOException("Connection input is closed");
126         }
127     }
128 
129     public void assertOutputOpen() throws IOException
130     {
131         if (!isOutputAvailable())
132         {
133             throw new IOException("Connection output is closed");
134         }
135     }
136 
137     public CloseInfo getCloseInfo()
138     {
139         CloseInfo ci = finalClose.get();
140         if (ci != null)
141         {
142             return ci;
143         }
144         return closeInfo;
145     }
146 
147     public ConnectionState getConnectionState()
148     {
149         return state;
150     }
151 
152     public boolean isClosed()
153     {
154         synchronized (state)
155         {
156             return (state == ConnectionState.CLOSED);
157         }
158     }
159 
160     public boolean isInputAvailable()
161     {
162         return inputAvailable;
163     }
164 
165     public boolean isOpen()
166     {
167         return (getConnectionState() != ConnectionState.CLOSED);
168     }
169 
170     public boolean isOutputAvailable()
171     {
172         return outputAvailable;
173     }
174 
175     private void notifyStateListeners(ConnectionState state)
176     {
177         LOG.debug("Notify State Listeners: {}",state);
178         for (ConnectionStateListener listener : listeners)
179         {
180             if (LOG.isDebugEnabled())
181             {
182                 LOG.debug("{}.onConnectionStateChange({})",listener.getClass().getSimpleName(),state.name());
183             }
184             listener.onConnectionStateChange(state);
185         }
186     }
187 
188     /**
189      * A websocket connection has been disconnected for abnormal close reasons.
190      * <p>
191      * This is the low level disconnect of the socket. It could be the result of a normal close operation, from an IO error, or even from a timeout.
192      */
193     public void onAbnormalClose(CloseInfo close)
194     {
195         LOG.debug("onAbnormalClose({})",close);
196         ConnectionState event = null;
197         synchronized (this)
198         {
199             if (this.state == ConnectionState.CLOSED)
200             {
201                 // already closed
202                 return;
203             }
204 
205             if (this.state == ConnectionState.OPEN)
206             {
207                 this.cleanClose = false;
208             }
209 
210             this.state = ConnectionState.CLOSED;
211             finalClose.compareAndSet(null,close);
212             this.inputAvailable = false;
213             this.outputAvailable = false;
214             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
215             event = this.state;
216         }
217         notifyStateListeners(event);
218     }
219 
220     /**
221      * A close handshake has been issued from the local endpoint
222      */
223     public void onCloseLocal(CloseInfo close)
224     {
225         ConnectionState event = null;
226         ConnectionState abnormalEvent = null;
227         ConnectionState initialState = this.state;
228         LOG.debug("onCloseLocal({}) : {}",close,initialState);
229         if (initialState == ConnectionState.CLOSED)
230         {
231             // already closed
232             LOG.debug("already closed");
233             return;
234         }
235 
236         if (initialState == ConnectionState.CONNECTED)
237         {
238             // fast close. a local close request from end-user onConnect/onOpen method
239             LOG.debug("FastClose in CONNECTED detected");
240             // Force the state open (to allow read/write to endpoint)
241             onOpened();
242             LOG.debug("FastClose continuing with Closure");
243         }
244 
245         synchronized (this)
246         {
247             closeInfo = close;
248 
249             boolean in = inputAvailable;
250             boolean out = outputAvailable;
251             if (closeHandshakeSource == CloseHandshakeSource.NONE)
252             {
253                 closeHandshakeSource = CloseHandshakeSource.LOCAL;
254             }
255             out = false;
256             outputAvailable = false;
257 
258             LOG.debug("onCloseLocal(), input={}, output={}",in,out);
259 
260             if (!in && !out)
261             {
262                 LOG.debug("Close Handshake satisfied, disconnecting");
263                 cleanClose = true;
264                 this.state = ConnectionState.CLOSED;
265                 finalClose.compareAndSet(null,close);
266                 event = this.state;
267             }
268             else if (this.state == ConnectionState.OPEN)
269             {
270                 // We are now entering CLOSING (or half-closed)
271                 this.state = ConnectionState.CLOSING;
272                 event = this.state;
273                 
274                 // if abnormal, we don't expect an answer.
275                 if (close.isAbnormal())
276                 {
277                     abnormalEvent = ConnectionState.CLOSED;
278                     finalClose.compareAndSet(null,close);
279                     cleanClose = false;
280                     outputAvailable = false;
281                     inputAvailable = false;
282                     closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
283                 }
284             }
285         }
286 
287         // Only notify on state change events
288         if (event != null)
289         {
290             notifyStateListeners(event);
291             
292             if(abnormalEvent != null) {
293                 notifyStateListeners(abnormalEvent);
294             }
295         }
296     }
297 
298     /**
299      * A close handshake has been received from the remote endpoint
300      */
301     public void onCloseRemote(CloseInfo close)
302     {
303         LOG.debug("onCloseRemote({})",close);
304         ConnectionState event = null;
305         synchronized (this)
306         {
307             if (this.state == ConnectionState.CLOSED)
308             {
309                 // already closed
310                 return;
311             }
312 
313             closeInfo = close;
314 
315             boolean in = inputAvailable;
316             boolean out = outputAvailable;
317             if (closeHandshakeSource == CloseHandshakeSource.NONE)
318             {
319                 closeHandshakeSource = CloseHandshakeSource.REMOTE;
320             }
321             in = false;
322             inputAvailable = false;
323 
324             LOG.debug("onCloseRemote(), input={}, output={}",in,out);
325 
326             if (!in && !out)
327             {
328                 LOG.debug("Close Handshake satisfied, disconnecting");
329                 cleanClose = true;
330                 state = ConnectionState.CLOSED;
331                 finalClose.compareAndSet(null,close);
332                 event = this.state;
333             }
334             else if (this.state == ConnectionState.OPEN)
335             {
336                 // We are now entering CLOSING (or half-closed)
337                 this.state = ConnectionState.CLOSING;
338                 event = this.state;
339             }
340         }
341 
342         // Only notify on state change events
343         if (event != null)
344         {
345             notifyStateListeners(event);
346         }
347     }
348 
349     /**
350      * WebSocket has successfully upgraded, but the end-user onOpen call hasn't run yet.
351      * <p>
352      * This is an intermediate state between the RFC's {@link ConnectionState#CONNECTING} and {@link ConnectionState#OPEN}
353      */
354     public void onConnected()
355     {
356         ConnectionState event = null;
357         synchronized (this)
358         {
359             if (this.state != ConnectionState.CONNECTING)
360             {
361                 LOG.debug("Unable to set to connected, not in CONNECTING state: {}",this.state);
362                 return;
363             }
364 
365             this.state = ConnectionState.CONNECTED;
366             inputAvailable = false; // cannot read (yet)
367             outputAvailable = true; // write allowed
368             event = this.state;
369         }
370         notifyStateListeners(event);
371     }
372 
373     /**
374      * A websocket connection has failed its upgrade handshake, and is now closed.
375      */
376     public void onFailedUpgrade()
377     {
378         assert (this.state == ConnectionState.CONNECTING);
379         ConnectionState event = null;
380         synchronized (this)
381         {
382             this.state = ConnectionState.CLOSED;
383             cleanClose = false;
384             inputAvailable = false;
385             outputAvailable = false;
386             event = this.state;
387         }
388         notifyStateListeners(event);
389     }
390 
391     /**
392      * A websocket connection has finished its upgrade handshake, and is now open.
393      */
394     public void onOpened()
395     {
396         ConnectionState event = null;
397         synchronized (this)
398         {
399             if (this.state == ConnectionState.OPEN)
400             {
401                 // already opened
402                 return;
403             }
404 
405             if (this.state != ConnectionState.CONNECTED)
406             {
407                 LOG.debug("Unable to open, not in CONNECTED state: {}",this.state);
408                 return;
409             }
410 
411             this.state = ConnectionState.OPEN;
412             this.inputAvailable = true;
413             this.outputAvailable = true;
414             event = this.state;
415         }
416         notifyStateListeners(event);
417     }
418 
419     /**
420      * The local endpoint has reached a read failure.
421      * <p>
422      * This could be a normal result after a proper close handshake, or even a premature close due to a connection disconnect.
423      */
424     public void onReadFailure(Throwable t)
425     {
426         ConnectionState event = null;
427         synchronized (this)
428         {
429             if (this.state == ConnectionState.CLOSED)
430             {
431                 // already closed
432                 return;
433             }
434 
435          // Build out Close Reason
436             String reason = "WebSocket Read Failure";
437             if (t instanceof EOFException)
438             {
439                 reason = "WebSocket Read EOF";
440                 Throwable cause = t.getCause();
441                 if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
442                 {
443                     reason = "EOF: " + cause.getMessage();
444                 }
445             }
446             else
447             {
448                 if (StringUtil.isNotBlank(t.getMessage()))
449                 {
450                     reason = t.getMessage();
451                 }
452             }
453 
454             reason = CloseStatus.trimMaxReasonLength(reason);
455             CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,reason);
456 
457             finalClose.compareAndSet(null,close);
458 
459             this.cleanClose = false;
460             this.state = ConnectionState.CLOSED;
461             this.closeInfo = close;
462             this.inputAvailable = false;
463             this.outputAvailable = false;
464             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
465             event = this.state;
466         }
467         notifyStateListeners(event);
468     }
469 
470     /**
471      * The local endpoint has reached a write failure.
472      * <p>
473      * A low level I/O failure, or even a jetty side EndPoint close (from idle timeout) are a few scenarios
474      */
475     public void onWriteFailure(Throwable t)
476     {
477         ConnectionState event = null;
478         synchronized (this)
479         {
480             if (this.state == ConnectionState.CLOSED)
481             {
482                 // already closed
483                 return;
484             }
485 
486             // Build out Close Reason
487             String reason = "WebSocket Write Failure";
488             if (t instanceof EOFException)
489             {
490                 reason = "WebSocket Write EOF";
491                 Throwable cause = t.getCause();
492                 if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
493                 {
494                     reason = "EOF: " + cause.getMessage();
495                 }
496             }
497             else
498             {
499                 if (StringUtil.isNotBlank(t.getMessage()))
500                 {
501                     reason = t.getMessage();
502                 }
503             }
504 
505             reason = CloseStatus.trimMaxReasonLength(reason);
506             CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,reason);
507 
508             finalClose.compareAndSet(null,close);
509 
510             this.cleanClose = false;
511             this.state = ConnectionState.CLOSED;
512             this.inputAvailable = false;
513             this.outputAvailable = false;
514             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
515             event = this.state;
516         }
517         notifyStateListeners(event);
518     }
519 
520     public void onDisconnected()
521     {
522         ConnectionState event = null;
523         synchronized (this)
524         {
525             if (this.state == ConnectionState.CLOSED)
526             {
527                 // already closed
528                 return;
529             }
530 
531             CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,"Disconnected");
532 
533             this.cleanClose = false;
534             this.state = ConnectionState.CLOSED;
535             this.closeInfo = close;
536             this.inputAvailable = false;
537             this.outputAvailable = false;
538             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
539             event = this.state;
540         }
541         notifyStateListeners(event);
542     }
543 
544     @Override
545     public String toString()
546     {
547         StringBuilder str = new StringBuilder();
548         str.append(this.getClass().getSimpleName());
549         str.append("@").append(Integer.toHexString(hashCode()));
550         str.append("[").append(state);
551         str.append(',');
552         if (!inputAvailable)
553         {
554             str.append('!');
555         }
556         str.append("in,");
557         if (!outputAvailable)
558         {
559             str.append('!');
560         }
561         str.append("out");
562         if ((state == ConnectionState.CLOSED) || (state == ConnectionState.CLOSING))
563         {
564             CloseInfo ci = finalClose.get();
565             if (ci != null)
566             {
567                 str.append(",finalClose=").append(ci);
568             }
569             else
570             {
571                 str.append(",close=").append(closeInfo);
572             }
573             str.append(",clean=").append(cleanClose);
574             str.append(",closeSource=").append(closeHandshakeSource);
575         }
576         str.append(']');
577         return str.toString();
578     }
579 
580     public boolean wasAbnormalClose()
581     {
582         return closeHandshakeSource == CloseHandshakeSource.ABNORMAL;
583     }
584 
585     public boolean wasCleanClose()
586     {
587         return cleanClose;
588     }
589 
590     public boolean wasLocalCloseInitiated()
591     {
592         return closeHandshakeSource == CloseHandshakeSource.LOCAL;
593     }
594 
595     public boolean wasRemoteCloseInitiated()
596     {
597         return closeHandshakeSource == CloseHandshakeSource.REMOTE;
598     }
599 
600 }