View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket.common.io;
20  
21  import java.io.IOException;
22  import java.util.List;
23  import java.util.concurrent.CopyOnWriteArrayList;
24  import java.util.concurrent.atomic.AtomicBoolean;
25  import java.util.concurrent.atomic.AtomicReference;
26  
27  import org.eclipse.jetty.util.log.Log;
28  import org.eclipse.jetty.util.log.Logger;
29  import org.eclipse.jetty.websocket.api.StatusCode;
30  import org.eclipse.jetty.websocket.common.CloseInfo;
31  import org.eclipse.jetty.websocket.common.ConnectionState;
32  
33  /**
34   * Simple state tracker for Input / Output and {@link ConnectionState}.
35   * <p>
36   * Use the various known .on*() methods to trigger a state change.
37   * <ul>
38   * <li>{@link #onOpened()} - connection has been opened</li>
39   * </ul>
40   */
41  public class IOState
42  {
43      /**
44       * The source of a close handshake. (ie: who initiated it).
45       */
46      private static enum CloseHandshakeSource
47      {
48          /** No close handshake initiated (yet) */
49          NONE,
50          /** Local side initiated the close handshake */
51          LOCAL,
52          /** Remote side initiated the close handshake */
53          REMOTE,
54          /** An abnormal close situation (disconnect, timeout, etc...) */
55          ABNORMAL;
56      }
57  
58      public static interface ConnectionStateListener
59      {
60          public void onConnectionStateChange(ConnectionState state);
61      }
62  
63      private static final Logger LOG = Log.getLogger(IOState.class);
64      private ConnectionState state;
65      private final List<ConnectionStateListener> listeners = new CopyOnWriteArrayList<>();
66  
67      private final AtomicBoolean inputAvailable;
68      private final AtomicBoolean outputAvailable;
69      private final AtomicReference<CloseHandshakeSource> closeHandshakeSource;
70      private final AtomicReference<CloseInfo> closeInfo;
71  
72      private final AtomicBoolean cleanClose;
73  
74      /**
75       * Create a new IOState, initialized to {@link ConnectionState#CONNECTING}
76       */
77      public IOState()
78      {
79          this.state = ConnectionState.CONNECTING;
80          this.inputAvailable = new AtomicBoolean(false);
81          this.outputAvailable = new AtomicBoolean(false);
82          this.closeHandshakeSource = new AtomicReference<>(CloseHandshakeSource.NONE);
83          this.closeInfo = new AtomicReference<>();
84          this.cleanClose = new AtomicBoolean(false);
85      }
86  
87      public void addListener(ConnectionStateListener listener)
88      {
89          listeners.add(listener);
90      }
91  
92      public void assertInputOpen() throws IOException
93      {
94          if (!isInputAvailable())
95          {
96              throw new IOException("Connection input is closed");
97          }
98      }
99  
100     public void assertOutputOpen() throws IOException
101     {
102         if (!isOutputAvailable())
103         {
104             throw new IOException("Connection output is closed");
105         }
106     }
107 
108     public CloseInfo getCloseInfo()
109     {
110         return closeInfo.get();
111     }
112 
113     public ConnectionState getConnectionState()
114     {
115         return state;
116     }
117 
118     public boolean isClosed()
119     {
120         synchronized (state)
121         {
122             return (state == ConnectionState.CLOSED);
123         }
124     }
125 
126     public boolean isInputAvailable()
127     {
128         return inputAvailable.get();
129     }
130 
131     public boolean isOpen()
132     {
133         return (getConnectionState() != ConnectionState.CLOSED);
134     }
135 
136     public boolean isOutputAvailable()
137     {
138         return outputAvailable.get();
139     }
140 
141     private void notifyStateListeners(ConnectionState state)
142     {
143         for (ConnectionStateListener listener : listeners)
144         {
145             listener.onConnectionStateChange(state);
146         }
147     }
148 
149     /**
150      * A websocket connection has been disconnected for abnormal close reasons.
151      * <p>
152      * This is the low level disconnect of the socket. It could be the result of a normal close operation, from an IO error, or even from a timeout.
153      */
154     public void onAbnormalClose(CloseInfo close)
155     {
156         ConnectionState event = null;
157         synchronized (this.state)
158         {
159             if (this.state == ConnectionState.CLOSED)
160             {
161                 // already closed
162                 return;
163             }
164 
165             if (this.state == ConnectionState.OPEN)
166             {
167                 this.cleanClose.set(false);
168             }
169 
170             this.state = ConnectionState.CLOSED;
171             this.closeInfo.compareAndSet(null,close);
172             this.inputAvailable.set(false);
173             this.outputAvailable.set(false);
174             this.closeHandshakeSource.set(CloseHandshakeSource.ABNORMAL);
175             event = this.state;
176         }
177         notifyStateListeners(event);
178     }
179 
180     /**
181      * A close handshake has been issued from the local endpoint
182      */
183     public void onCloseLocal(CloseInfo close)
184     {
185         LOG.debug("onCloseLocal({})",close);
186         ConnectionState event = null;
187         ConnectionState initialState = this.state;
188         if (initialState == ConnectionState.CLOSED)
189         {
190             // already closed
191             LOG.debug("already closed");
192             return;
193         }
194 
195         if (initialState == ConnectionState.CONNECTED)
196         {
197             // fast close. a local close request from end-user onConnected() method
198             LOG.debug("FastClose in CONNECTED detected");
199             // Force the state open (to allow read/write to endpoint)
200             onOpened();
201         }
202 
203         synchronized (this.state)
204         {
205             closeInfo.compareAndSet(null,close);
206 
207             boolean in = inputAvailable.get();
208             boolean out = outputAvailable.get();
209             closeHandshakeSource.compareAndSet(CloseHandshakeSource.NONE,CloseHandshakeSource.LOCAL);
210             out = false;
211             outputAvailable.set(false);
212 
213             LOG.debug("onCloseLocal(), input={}, output={}",in,out);
214 
215             if (!in && !out)
216             {
217                 LOG.debug("Close Handshake satisfied, disconnecting");
218                 cleanClose.set(true);
219                 this.state = ConnectionState.CLOSED;
220                 event = this.state;
221             }
222             else if (this.state == ConnectionState.OPEN)
223             {
224                 // We are now entering CLOSING (or half-closed)
225                 this.state = ConnectionState.CLOSING;
226                 event = this.state;
227             }
228         }
229         
230         LOG.debug("event = {}",event);
231 
232         // Only notify on state change events
233         if (event != null)
234         {
235             LOG.debug("notifying state listeners: {}",event);
236             notifyStateListeners(event);
237 
238             // if harsh, we don't expect an answer.
239             if (close.isHarsh())
240             {
241                 LOG.debug("Harsh close, disconnecting");
242                 synchronized (this.state)
243                 {
244                     this.state = ConnectionState.CLOSED;
245                     cleanClose.set(false);
246                     outputAvailable.set(false);
247                     inputAvailable.set(false);
248                     this.closeHandshakeSource.set(CloseHandshakeSource.ABNORMAL);
249                     event = this.state;
250                 }
251                 notifyStateListeners(event);
252                 return;
253             }
254         }
255     }
256 
257     /**
258      * A close handshake has been received from the remote endpoint
259      */
260     public void onCloseRemote(CloseInfo close)
261     {
262         LOG.debug("onCloseRemote({})",close);
263         ConnectionState event = null;
264         synchronized (this.state)
265         {
266             if (this.state == ConnectionState.CLOSED)
267             {
268                 // already closed
269                 return;
270             }
271 
272             closeInfo.compareAndSet(null,close);
273 
274             boolean in = inputAvailable.get();
275             boolean out = outputAvailable.get();
276             closeHandshakeSource.compareAndSet(CloseHandshakeSource.NONE,CloseHandshakeSource.REMOTE);
277             in = false;
278             inputAvailable.set(false);
279 
280             LOG.debug("onCloseRemote(), input={}, output={}",in,out);
281 
282             if (!in && !out)
283             {
284                 LOG.debug("Close Handshake satisfied, disconnecting");
285                 cleanClose.set(true);
286                 this.state = ConnectionState.CLOSED;
287                 event = this.state;
288             }
289             else if (this.state == ConnectionState.OPEN)
290             {
291                 // We are now entering CLOSING (or half-closed)
292                 this.state = ConnectionState.CLOSING;
293                 event = this.state;
294             }
295         }
296 
297         // Only notify on state change events
298         if (event != null)
299         {
300             notifyStateListeners(event);
301         }
302     }
303 
304     /**
305      * WebSocket has successfully upgraded, but the end-user onOpen call hasn't run yet.
306      * <p>
307      * This is an intermediate state between the RFC's {@link ConnectionState#CONNECTING} and {@link ConnectionState#OPEN}
308      */
309     public void onConnected()
310     {
311         if (this.state != ConnectionState.CONNECTING)
312         {
313             LOG.debug("Unable to set to connected, not in CONNECTING state: {}",this.state);
314             return;
315         }
316 
317         ConnectionState event = null;
318         synchronized (this.state)
319         {
320             this.state = ConnectionState.CONNECTED;
321             this.inputAvailable.set(false); // cannot read (yet)
322             this.outputAvailable.set(true); // write allowed
323             event = this.state;
324         }
325         notifyStateListeners(event);
326     }
327 
328     /**
329      * A websocket connection has failed its upgrade handshake, and is now closed.
330      */
331     public void onFailedUpgrade()
332     {
333         assert (this.state == ConnectionState.CONNECTING);
334         ConnectionState event = null;
335         synchronized (this.state)
336         {
337             this.state = ConnectionState.CLOSED;
338             this.cleanClose.set(false);
339             this.inputAvailable.set(false);
340             this.outputAvailable.set(false);
341             event = this.state;
342         }
343         notifyStateListeners(event);
344     }
345 
346     /**
347      * A websocket connection has finished its upgrade handshake, and is now open.
348      */
349     public void onOpened()
350     {
351         if (this.state != ConnectionState.CONNECTED)
352         {
353             LOG.debug("Unable to open, not in CONNECTED state: {}",this.state);
354             return;
355         }
356 
357         assert (this.state == ConnectionState.CONNECTED);
358 
359         ConnectionState event = null;
360         synchronized (this.state)
361         {
362             this.state = ConnectionState.OPEN;
363             this.inputAvailable.set(true);
364             this.outputAvailable.set(true);
365             event = this.state;
366         }
367         notifyStateListeners(event);
368     }
369 
370     /**
371      * The local endpoint has reached a read EOF.
372      * <p>
373      * This could be a normal result after a proper close handshake, or even a premature close due to a connection disconnect.
374      */
375     public void onReadEOF()
376     {
377         ConnectionState event = null;
378         synchronized (this.state)
379         {
380             if (this.state == ConnectionState.CLOSED)
381             {
382                 // already closed
383                 return;
384             }
385 
386             CloseInfo close = new CloseInfo(StatusCode.NO_CLOSE,"Read EOF");
387 
388             this.cleanClose.set(false);
389             this.state = ConnectionState.CLOSED;
390             this.closeInfo.compareAndSet(null,close);
391             this.inputAvailable.set(false);
392             this.outputAvailable.set(false);
393             this.closeHandshakeSource.set(CloseHandshakeSource.ABNORMAL);
394             event = this.state;
395         }
396         notifyStateListeners(event);
397     }
398 
399     public boolean wasAbnormalClose()
400     {
401         return closeHandshakeSource.get() == CloseHandshakeSource.ABNORMAL;
402     }
403 
404     public boolean wasCleanClose()
405     {
406         return cleanClose.get();
407     }
408 
409     public boolean wasLocalCloseInitiated()
410     {
411         return closeHandshakeSource.get() == CloseHandshakeSource.LOCAL;
412     }
413 
414     public boolean wasRemoteCloseInitiated()
415     {
416         return closeHandshakeSource.get() == CloseHandshakeSource.REMOTE;
417     }
418 }