1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.io;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.util.List;
24 import java.util.concurrent.CopyOnWriteArrayList;
25 import java.util.concurrent.atomic.AtomicReference;
26
27 import org.eclipse.jetty.util.StringUtil;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30 import org.eclipse.jetty.websocket.api.CloseStatus;
31 import org.eclipse.jetty.websocket.api.StatusCode;
32 import org.eclipse.jetty.websocket.common.CloseInfo;
33 import org.eclipse.jetty.websocket.common.ConnectionState;
34
35
36
37
38
39
40
41
42
43 public class IOState
44 {
45
46
47
48 private static enum CloseHandshakeSource
49 {
50
51 NONE,
52
53 LOCAL,
54
55 REMOTE,
56
57 ABNORMAL;
58 }
59
60 public static interface ConnectionStateListener
61 {
62 public void onConnectionStateChange(ConnectionState state);
63 }
64
65 private static final Logger LOG = Log.getLogger(IOState.class);
66 private ConnectionState state;
67 private final List<ConnectionStateListener> listeners = new CopyOnWriteArrayList<>();
68
69
70
71
72
73 private boolean inputAvailable;
74
75
76
77
78 private boolean outputAvailable;
79
80
81
82
83 private CloseHandshakeSource closeHandshakeSource;
84
85
86
87
88
89
90 private CloseInfo closeInfo;
91
92
93
94
95
96 private AtomicReference<CloseInfo> finalClose = new AtomicReference<>();
97
98
99
100
101 private boolean cleanClose;
102
103
104
105
106 public IOState()
107 {
108 this.state = ConnectionState.CONNECTING;
109 this.inputAvailable = false;
110 this.outputAvailable = false;
111 this.closeHandshakeSource = CloseHandshakeSource.NONE;
112 this.closeInfo = null;
113 this.cleanClose = false;
114 }
115
116 public void addListener(ConnectionStateListener listener)
117 {
118 listeners.add(listener);
119 }
120
121 public void assertInputOpen() throws IOException
122 {
123 if (!isInputAvailable())
124 {
125 throw new IOException("Connection input is closed");
126 }
127 }
128
129 public void assertOutputOpen() throws IOException
130 {
131 if (!isOutputAvailable())
132 {
133 throw new IOException("Connection output is closed");
134 }
135 }
136
137 public CloseInfo getCloseInfo()
138 {
139 CloseInfo ci = finalClose.get();
140 if (ci != null)
141 {
142 return ci;
143 }
144 return closeInfo;
145 }
146
147 public ConnectionState getConnectionState()
148 {
149 return state;
150 }
151
152 public boolean isClosed()
153 {
154 synchronized (state)
155 {
156 return (state == ConnectionState.CLOSED);
157 }
158 }
159
160 public boolean isInputAvailable()
161 {
162 return inputAvailable;
163 }
164
165 public boolean isOpen()
166 {
167 return (getConnectionState() != ConnectionState.CLOSED);
168 }
169
170 public boolean isOutputAvailable()
171 {
172 return outputAvailable;
173 }
174
175 private void notifyStateListeners(ConnectionState state)
176 {
177 LOG.debug("Notify State Listeners: {}",state);
178 for (ConnectionStateListener listener : listeners)
179 {
180 if (LOG.isDebugEnabled())
181 {
182 LOG.debug("{}.onConnectionStateChange({})",listener.getClass().getSimpleName(),state.name());
183 }
184 listener.onConnectionStateChange(state);
185 }
186 }
187
188
189
190
191
192
193 public void onAbnormalClose(CloseInfo close)
194 {
195 LOG.debug("onAbnormalClose({})",close);
196 ConnectionState event = null;
197 synchronized (this)
198 {
199 if (this.state == ConnectionState.CLOSED)
200 {
201
202 return;
203 }
204
205 if (this.state == ConnectionState.OPEN)
206 {
207 this.cleanClose = false;
208 }
209
210 this.state = ConnectionState.CLOSED;
211 finalClose.compareAndSet(null,close);
212 this.inputAvailable = false;
213 this.outputAvailable = false;
214 this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
215 event = this.state;
216 }
217 notifyStateListeners(event);
218 }
219
220
221
222
223 public void onCloseLocal(CloseInfo close)
224 {
225 ConnectionState event = null;
226 ConnectionState abnormalEvent = null;
227 ConnectionState initialState = this.state;
228 LOG.debug("onCloseLocal({}) : {}",close,initialState);
229 if (initialState == ConnectionState.CLOSED)
230 {
231
232 LOG.debug("already closed");
233 return;
234 }
235
236 if (initialState == ConnectionState.CONNECTED)
237 {
238
239 LOG.debug("FastClose in CONNECTED detected");
240
241 onOpened();
242 LOG.debug("FastClose continuing with Closure");
243 }
244
245 synchronized (this)
246 {
247 closeInfo = close;
248
249 boolean in = inputAvailable;
250 boolean out = outputAvailable;
251 if (closeHandshakeSource == CloseHandshakeSource.NONE)
252 {
253 closeHandshakeSource = CloseHandshakeSource.LOCAL;
254 }
255 out = false;
256 outputAvailable = false;
257
258 LOG.debug("onCloseLocal(), input={}, output={}",in,out);
259
260 if (!in && !out)
261 {
262 LOG.debug("Close Handshake satisfied, disconnecting");
263 cleanClose = true;
264 this.state = ConnectionState.CLOSED;
265 finalClose.compareAndSet(null,close);
266 event = this.state;
267 }
268 else if (this.state == ConnectionState.OPEN)
269 {
270
271 this.state = ConnectionState.CLOSING;
272 event = this.state;
273
274
275 if (close.isAbnormal())
276 {
277 abnormalEvent = ConnectionState.CLOSED;
278 finalClose.compareAndSet(null,close);
279 cleanClose = false;
280 outputAvailable = false;
281 inputAvailable = false;
282 closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
283 }
284 }
285 }
286
287
288 if (event != null)
289 {
290 notifyStateListeners(event);
291
292 if(abnormalEvent != null) {
293 notifyStateListeners(abnormalEvent);
294 }
295 }
296 }
297
298
299
300
301 public void onCloseRemote(CloseInfo close)
302 {
303 LOG.debug("onCloseRemote({})",close);
304 ConnectionState event = null;
305 synchronized (this)
306 {
307 if (this.state == ConnectionState.CLOSED)
308 {
309
310 return;
311 }
312
313 closeInfo = close;
314
315 boolean in = inputAvailable;
316 boolean out = outputAvailable;
317 if (closeHandshakeSource == CloseHandshakeSource.NONE)
318 {
319 closeHandshakeSource = CloseHandshakeSource.REMOTE;
320 }
321 in = false;
322 inputAvailable = false;
323
324 LOG.debug("onCloseRemote(), input={}, output={}",in,out);
325
326 if (!in && !out)
327 {
328 LOG.debug("Close Handshake satisfied, disconnecting");
329 cleanClose = true;
330 state = ConnectionState.CLOSED;
331 finalClose.compareAndSet(null,close);
332 event = this.state;
333 }
334 else if (this.state == ConnectionState.OPEN)
335 {
336
337 this.state = ConnectionState.CLOSING;
338 event = this.state;
339 }
340 }
341
342
343 if (event != null)
344 {
345 notifyStateListeners(event);
346 }
347 }
348
349
350
351
352
353
354 public void onConnected()
355 {
356 ConnectionState event = null;
357 synchronized (this)
358 {
359 if (this.state != ConnectionState.CONNECTING)
360 {
361 LOG.debug("Unable to set to connected, not in CONNECTING state: {}",this.state);
362 return;
363 }
364
365 this.state = ConnectionState.CONNECTED;
366 inputAvailable = false;
367 outputAvailable = true;
368 event = this.state;
369 }
370 notifyStateListeners(event);
371 }
372
373
374
375
376 public void onFailedUpgrade()
377 {
378 assert (this.state == ConnectionState.CONNECTING);
379 ConnectionState event = null;
380 synchronized (this)
381 {
382 this.state = ConnectionState.CLOSED;
383 cleanClose = false;
384 inputAvailable = false;
385 outputAvailable = false;
386 event = this.state;
387 }
388 notifyStateListeners(event);
389 }
390
391
392
393
394 public void onOpened()
395 {
396 ConnectionState event = null;
397 synchronized (this)
398 {
399 if (this.state == ConnectionState.OPEN)
400 {
401
402 return;
403 }
404
405 if (this.state != ConnectionState.CONNECTED)
406 {
407 LOG.debug("Unable to open, not in CONNECTED state: {}",this.state);
408 return;
409 }
410
411 this.state = ConnectionState.OPEN;
412 this.inputAvailable = true;
413 this.outputAvailable = true;
414 event = this.state;
415 }
416 notifyStateListeners(event);
417 }
418
419
420
421
422
423
424 public void onReadFailure(Throwable t)
425 {
426 ConnectionState event = null;
427 synchronized (this)
428 {
429 if (this.state == ConnectionState.CLOSED)
430 {
431
432 return;
433 }
434
435
436 String reason = "WebSocket Read Failure";
437 if (t instanceof EOFException)
438 {
439 reason = "WebSocket Read EOF";
440 Throwable cause = t.getCause();
441 if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
442 {
443 reason = "EOF: " + cause.getMessage();
444 }
445 }
446 else
447 {
448 if (StringUtil.isNotBlank(t.getMessage()))
449 {
450 reason = t.getMessage();
451 }
452 }
453
454 reason = CloseStatus.trimMaxReasonLength(reason);
455 CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,reason);
456
457 finalClose.compareAndSet(null,close);
458
459 this.cleanClose = false;
460 this.state = ConnectionState.CLOSED;
461 this.closeInfo = close;
462 this.inputAvailable = false;
463 this.outputAvailable = false;
464 this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
465 event = this.state;
466 }
467 notifyStateListeners(event);
468 }
469
470
471
472
473
474
475 public void onWriteFailure(Throwable t)
476 {
477 ConnectionState event = null;
478 synchronized (this)
479 {
480 if (this.state == ConnectionState.CLOSED)
481 {
482
483 return;
484 }
485
486
487 String reason = "WebSocket Write Failure";
488 if (t instanceof EOFException)
489 {
490 reason = "WebSocket Write EOF";
491 Throwable cause = t.getCause();
492 if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
493 {
494 reason = "EOF: " + cause.getMessage();
495 }
496 }
497 else
498 {
499 if (StringUtil.isNotBlank(t.getMessage()))
500 {
501 reason = t.getMessage();
502 }
503 }
504
505 reason = CloseStatus.trimMaxReasonLength(reason);
506 CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,reason);
507
508 finalClose.compareAndSet(null,close);
509
510 this.cleanClose = false;
511 this.state = ConnectionState.CLOSED;
512 this.inputAvailable = false;
513 this.outputAvailable = false;
514 this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
515 event = this.state;
516 }
517 notifyStateListeners(event);
518 }
519
520 public void onDisconnected()
521 {
522 ConnectionState event = null;
523 synchronized (this)
524 {
525 if (this.state == ConnectionState.CLOSED)
526 {
527
528 return;
529 }
530
531 CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,"Disconnected");
532
533 this.cleanClose = false;
534 this.state = ConnectionState.CLOSED;
535 this.closeInfo = close;
536 this.inputAvailable = false;
537 this.outputAvailable = false;
538 this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
539 event = this.state;
540 }
541 notifyStateListeners(event);
542 }
543
544 @Override
545 public String toString()
546 {
547 StringBuilder str = new StringBuilder();
548 str.append(this.getClass().getSimpleName());
549 str.append("@").append(Integer.toHexString(hashCode()));
550 str.append("[").append(state);
551 str.append(',');
552 if (!inputAvailable)
553 {
554 str.append('!');
555 }
556 str.append("in,");
557 if (!outputAvailable)
558 {
559 str.append('!');
560 }
561 str.append("out");
562 if ((state == ConnectionState.CLOSED) || (state == ConnectionState.CLOSING))
563 {
564 CloseInfo ci = finalClose.get();
565 if (ci != null)
566 {
567 str.append(",finalClose=").append(ci);
568 }
569 else
570 {
571 str.append(",close=").append(closeInfo);
572 }
573 str.append(",clean=").append(cleanClose);
574 str.append(",closeSource=").append(closeHandshakeSource);
575 }
576 str.append(']');
577 return str.toString();
578 }
579
580 public boolean wasAbnormalClose()
581 {
582 return closeHandshakeSource == CloseHandshakeSource.ABNORMAL;
583 }
584
585 public boolean wasCleanClose()
586 {
587 return cleanClose;
588 }
589
590 public boolean wasLocalCloseInitiated()
591 {
592 return closeHandshakeSource == CloseHandshakeSource.LOCAL;
593 }
594
595 public boolean wasRemoteCloseInitiated()
596 {
597 return closeHandshakeSource == CloseHandshakeSource.REMOTE;
598 }
599
600 }