1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.io;
20
21 import java.io.IOException;
22 import java.util.List;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.concurrent.atomic.AtomicReference;
26
27 import org.eclipse.jetty.util.log.Log;
28 import org.eclipse.jetty.util.log.Logger;
29 import org.eclipse.jetty.websocket.api.StatusCode;
30 import org.eclipse.jetty.websocket.common.CloseInfo;
31 import org.eclipse.jetty.websocket.common.ConnectionState;
32
33
34
35
36
37
38
39
40
41 public class IOState
42 {
43
44
45
46 private static enum CloseHandshakeSource
47 {
48
49 NONE,
50
51 LOCAL,
52
53 REMOTE,
54
55 ABNORMAL;
56 }
57
58 public static interface ConnectionStateListener
59 {
60 public void onConnectionStateChange(ConnectionState state);
61 }
62
63 private static final Logger LOG = Log.getLogger(IOState.class);
64 private ConnectionState state;
65 private final List<ConnectionStateListener> listeners = new CopyOnWriteArrayList<>();
66
67 private final AtomicBoolean inputAvailable;
68 private final AtomicBoolean outputAvailable;
69 private final AtomicReference<CloseHandshakeSource> closeHandshakeSource;
70 private final AtomicReference<CloseInfo> closeInfo;
71
72 private final AtomicBoolean cleanClose;
73
74
75
76
77 public IOState()
78 {
79 this.state = ConnectionState.CONNECTING;
80 this.inputAvailable = new AtomicBoolean(false);
81 this.outputAvailable = new AtomicBoolean(false);
82 this.closeHandshakeSource = new AtomicReference<>(CloseHandshakeSource.NONE);
83 this.closeInfo = new AtomicReference<>();
84 this.cleanClose = new AtomicBoolean(false);
85 }
86
87 public void addListener(ConnectionStateListener listener)
88 {
89 listeners.add(listener);
90 }
91
92 public void assertInputOpen() throws IOException
93 {
94 if (!isInputAvailable())
95 {
96 throw new IOException("Connection input is closed");
97 }
98 }
99
100 public void assertOutputOpen() throws IOException
101 {
102 if (!isOutputAvailable())
103 {
104 throw new IOException("Connection output is closed");
105 }
106 }
107
108 public CloseInfo getCloseInfo()
109 {
110 return closeInfo.get();
111 }
112
113 public ConnectionState getConnectionState()
114 {
115 return state;
116 }
117
118 public boolean isClosed()
119 {
120 synchronized (state)
121 {
122 return (state == ConnectionState.CLOSED);
123 }
124 }
125
126 public boolean isInputAvailable()
127 {
128 return inputAvailable.get();
129 }
130
131 public boolean isOpen()
132 {
133 return (getConnectionState() != ConnectionState.CLOSED);
134 }
135
136 public boolean isOutputAvailable()
137 {
138 return outputAvailable.get();
139 }
140
141 private void notifyStateListeners(ConnectionState state)
142 {
143 for (ConnectionStateListener listener : listeners)
144 {
145 listener.onConnectionStateChange(state);
146 }
147 }
148
149
150
151
152
153
154 public void onAbnormalClose(CloseInfo close)
155 {
156 ConnectionState event = null;
157 synchronized (this.state)
158 {
159 if (this.state == ConnectionState.CLOSED)
160 {
161
162 return;
163 }
164
165 if (this.state == ConnectionState.OPEN)
166 {
167 this.cleanClose.set(false);
168 }
169
170 this.state = ConnectionState.CLOSED;
171 this.closeInfo.compareAndSet(null,close);
172 this.inputAvailable.set(false);
173 this.outputAvailable.set(false);
174 this.closeHandshakeSource.set(CloseHandshakeSource.ABNORMAL);
175 event = this.state;
176 }
177 notifyStateListeners(event);
178 }
179
180
181
182
183 public void onCloseLocal(CloseInfo close)
184 {
185 LOG.debug("onCloseLocal({})",close);
186 ConnectionState event = null;
187 ConnectionState initialState = this.state;
188 if (initialState == ConnectionState.CLOSED)
189 {
190
191 LOG.debug("already closed");
192 return;
193 }
194
195 if (initialState == ConnectionState.CONNECTED)
196 {
197
198 LOG.debug("FastClose in CONNECTED detected");
199
200 onOpened();
201 }
202
203 synchronized (this.state)
204 {
205 closeInfo.compareAndSet(null,close);
206
207 boolean in = inputAvailable.get();
208 boolean out = outputAvailable.get();
209 closeHandshakeSource.compareAndSet(CloseHandshakeSource.NONE,CloseHandshakeSource.LOCAL);
210 out = false;
211 outputAvailable.set(false);
212
213 LOG.debug("onCloseLocal(), input={}, output={}",in,out);
214
215 if (!in && !out)
216 {
217 LOG.debug("Close Handshake satisfied, disconnecting");
218 cleanClose.set(true);
219 this.state = ConnectionState.CLOSED;
220 event = this.state;
221 }
222 else if (this.state == ConnectionState.OPEN)
223 {
224
225 this.state = ConnectionState.CLOSING;
226 event = this.state;
227 }
228 }
229
230 LOG.debug("event = {}",event);
231
232
233 if (event != null)
234 {
235 LOG.debug("notifying state listeners: {}",event);
236 notifyStateListeners(event);
237
238
239 if (close.isHarsh())
240 {
241 LOG.debug("Harsh close, disconnecting");
242 synchronized (this.state)
243 {
244 this.state = ConnectionState.CLOSED;
245 cleanClose.set(false);
246 outputAvailable.set(false);
247 inputAvailable.set(false);
248 this.closeHandshakeSource.set(CloseHandshakeSource.ABNORMAL);
249 event = this.state;
250 }
251 notifyStateListeners(event);
252 return;
253 }
254 }
255 }
256
257
258
259
260 public void onCloseRemote(CloseInfo close)
261 {
262 LOG.debug("onCloseRemote({})",close);
263 ConnectionState event = null;
264 synchronized (this.state)
265 {
266 if (this.state == ConnectionState.CLOSED)
267 {
268
269 return;
270 }
271
272 closeInfo.compareAndSet(null,close);
273
274 boolean in = inputAvailable.get();
275 boolean out = outputAvailable.get();
276 closeHandshakeSource.compareAndSet(CloseHandshakeSource.NONE,CloseHandshakeSource.REMOTE);
277 in = false;
278 inputAvailable.set(false);
279
280 LOG.debug("onCloseRemote(), input={}, output={}",in,out);
281
282 if (!in && !out)
283 {
284 LOG.debug("Close Handshake satisfied, disconnecting");
285 cleanClose.set(true);
286 this.state = ConnectionState.CLOSED;
287 event = this.state;
288 }
289 else if (this.state == ConnectionState.OPEN)
290 {
291
292 this.state = ConnectionState.CLOSING;
293 event = this.state;
294 }
295 }
296
297
298 if (event != null)
299 {
300 notifyStateListeners(event);
301 }
302 }
303
304
305
306
307
308
309 public void onConnected()
310 {
311 if (this.state != ConnectionState.CONNECTING)
312 {
313 LOG.debug("Unable to set to connected, not in CONNECTING state: {}",this.state);
314 return;
315 }
316
317 ConnectionState event = null;
318 synchronized (this.state)
319 {
320 this.state = ConnectionState.CONNECTED;
321 this.inputAvailable.set(false);
322 this.outputAvailable.set(true);
323 event = this.state;
324 }
325 notifyStateListeners(event);
326 }
327
328
329
330
331 public void onFailedUpgrade()
332 {
333 assert (this.state == ConnectionState.CONNECTING);
334 ConnectionState event = null;
335 synchronized (this.state)
336 {
337 this.state = ConnectionState.CLOSED;
338 this.cleanClose.set(false);
339 this.inputAvailable.set(false);
340 this.outputAvailable.set(false);
341 event = this.state;
342 }
343 notifyStateListeners(event);
344 }
345
346
347
348
349 public void onOpened()
350 {
351 if (this.state != ConnectionState.CONNECTED)
352 {
353 LOG.debug("Unable to open, not in CONNECTED state: {}",this.state);
354 return;
355 }
356
357 assert (this.state == ConnectionState.CONNECTED);
358
359 ConnectionState event = null;
360 synchronized (this.state)
361 {
362 this.state = ConnectionState.OPEN;
363 this.inputAvailable.set(true);
364 this.outputAvailable.set(true);
365 event = this.state;
366 }
367 notifyStateListeners(event);
368 }
369
370
371
372
373
374
375 public void onReadEOF()
376 {
377 ConnectionState event = null;
378 synchronized (this.state)
379 {
380 if (this.state == ConnectionState.CLOSED)
381 {
382
383 return;
384 }
385
386 CloseInfo close = new CloseInfo(StatusCode.NO_CLOSE,"Read EOF");
387
388 this.cleanClose.set(false);
389 this.state = ConnectionState.CLOSED;
390 this.closeInfo.compareAndSet(null,close);
391 this.inputAvailable.set(false);
392 this.outputAvailable.set(false);
393 this.closeHandshakeSource.set(CloseHandshakeSource.ABNORMAL);
394 event = this.state;
395 }
396 notifyStateListeners(event);
397 }
398
399 public boolean wasAbnormalClose()
400 {
401 return closeHandshakeSource.get() == CloseHandshakeSource.ABNORMAL;
402 }
403
404 public boolean wasCleanClose()
405 {
406 return cleanClose.get();
407 }
408
409 public boolean wasLocalCloseInitiated()
410 {
411 return closeHandshakeSource.get() == CloseHandshakeSource.LOCAL;
412 }
413
414 public boolean wasRemoteCloseInitiated()
415 {
416 return closeHandshakeSource.get() == CloseHandshakeSource.REMOTE;
417 }
418 }