1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.io;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.util.List;
24 import java.util.concurrent.CopyOnWriteArrayList;
25 import java.util.concurrent.atomic.AtomicReference;
26
27 import org.eclipse.jetty.util.StringUtil;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30 import org.eclipse.jetty.websocket.api.StatusCode;
31 import org.eclipse.jetty.websocket.common.CloseInfo;
32 import org.eclipse.jetty.websocket.common.ConnectionState;
33
34
35
36
37
38
39
40
41
42 public class IOState
43 {
44
45
46
47 private static enum CloseHandshakeSource
48 {
49
50 NONE,
51
52 LOCAL,
53
54 REMOTE,
55
56 ABNORMAL;
57 }
58
59 public static interface ConnectionStateListener
60 {
61 public void onConnectionStateChange(ConnectionState state);
62 }
63
64 private static final Logger LOG = Log.getLogger(IOState.class);
65 private ConnectionState state;
66 private final List<ConnectionStateListener> listeners = new CopyOnWriteArrayList<>();
67
68
69
70
71
72 private boolean inputAvailable;
73
74
75
76
77 private boolean outputAvailable;
78
79
80
81
82 private CloseHandshakeSource closeHandshakeSource;
83
84
85
86
87
88
89 private CloseInfo closeInfo;
90
91
92
93
94
95 private AtomicReference<CloseInfo> finalClose = new AtomicReference<>();
96
97
98
99
100 private boolean cleanClose;
101
102
103
104
105 public IOState()
106 {
107 this.state = ConnectionState.CONNECTING;
108 this.inputAvailable = false;
109 this.outputAvailable = false;
110 this.closeHandshakeSource = CloseHandshakeSource.NONE;
111 this.closeInfo = null;
112 this.cleanClose = false;
113 }
114
115 public void addListener(ConnectionStateListener listener)
116 {
117 listeners.add(listener);
118 }
119
120 public void assertInputOpen() throws IOException
121 {
122 if (!isInputAvailable())
123 {
124 throw new IOException("Connection input is closed");
125 }
126 }
127
128 public void assertOutputOpen() throws IOException
129 {
130 if (!isOutputAvailable())
131 {
132 throw new IOException("Connection output is closed");
133 }
134 }
135
136 public CloseInfo getCloseInfo()
137 {
138 CloseInfo ci = finalClose.get();
139 if (ci != null)
140 {
141 return ci;
142 }
143 return closeInfo;
144 }
145
146 public ConnectionState getConnectionState()
147 {
148 return state;
149 }
150
151 public boolean isClosed()
152 {
153 synchronized (state)
154 {
155 return (state == ConnectionState.CLOSED);
156 }
157 }
158
159 public boolean isInputAvailable()
160 {
161 return inputAvailable;
162 }
163
164 public boolean isOpen()
165 {
166 return (getConnectionState() != ConnectionState.CLOSED);
167 }
168
169 public boolean isOutputAvailable()
170 {
171 return outputAvailable;
172 }
173
174 private void notifyStateListeners(ConnectionState state)
175 {
176 if (LOG.isDebugEnabled())
177 LOG.debug("Notify State Listeners: {}",state);
178 for (ConnectionStateListener listener : listeners)
179 {
180 if (LOG.isDebugEnabled())
181 {
182 LOG.debug("{}.onConnectionStateChange({})",listener.getClass().getSimpleName(),state.name());
183 }
184 listener.onConnectionStateChange(state);
185 }
186 }
187
188
189
190
191
192
193
194 public void onAbnormalClose(CloseInfo close)
195 {
196 if (LOG.isDebugEnabled())
197 LOG.debug("onAbnormalClose({})",close);
198 ConnectionState event = null;
199 synchronized (this)
200 {
201 if (this.state == ConnectionState.CLOSED)
202 {
203
204 return;
205 }
206
207 if (this.state == ConnectionState.OPEN)
208 {
209 this.cleanClose = false;
210 }
211
212 this.state = ConnectionState.CLOSED;
213 finalClose.compareAndSet(null,close);
214 this.inputAvailable = false;
215 this.outputAvailable = false;
216 this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
217 event = this.state;
218 }
219 notifyStateListeners(event);
220 }
221
222
223
224
225
226 public void onCloseLocal(CloseInfo close)
227 {
228 ConnectionState event = null;
229 ConnectionState abnormalEvent = null;
230 ConnectionState initialState = this.state;
231 if (LOG.isDebugEnabled())
232 LOG.debug("onCloseLocal({}) : {}",close,initialState);
233 if (initialState == ConnectionState.CLOSED)
234 {
235
236 LOG.debug("already closed");
237 return;
238 }
239
240 if (initialState == ConnectionState.CONNECTED)
241 {
242
243 LOG.debug("FastClose in CONNECTED detected");
244
245 onOpened();
246 if (LOG.isDebugEnabled())
247 LOG.debug("FastClose continuing with Closure");
248 }
249
250 synchronized (this)
251 {
252 closeInfo = close;
253
254
255 outputAvailable = false;
256
257 boolean in = inputAvailable;
258 boolean out = outputAvailable;
259 if (closeHandshakeSource == CloseHandshakeSource.NONE)
260 {
261 closeHandshakeSource = CloseHandshakeSource.LOCAL;
262 }
263
264 LOG.debug("onCloseLocal(), input={}, output={}",in,out);
265
266 if (!in && !out)
267 {
268 LOG.debug("Close Handshake satisfied, disconnecting");
269 cleanClose = true;
270 this.state = ConnectionState.CLOSED;
271 finalClose.compareAndSet(null,close);
272 event = this.state;
273 }
274 else if (this.state == ConnectionState.OPEN)
275 {
276
277 this.state = ConnectionState.CLOSING;
278 event = this.state;
279
280
281 if (close.isAbnormal())
282 {
283 abnormalEvent = ConnectionState.CLOSED;
284 finalClose.compareAndSet(null,close);
285 cleanClose = false;
286 outputAvailable = false;
287 inputAvailable = false;
288 closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
289 }
290 }
291 }
292
293
294 if (event != null)
295 {
296 notifyStateListeners(event);
297
298 if(abnormalEvent != null)
299 {
300 notifyStateListeners(abnormalEvent);
301 }
302 }
303 }
304
305
306
307
308
309 public void onCloseRemote(CloseInfo close)
310 {
311 if (LOG.isDebugEnabled())
312 LOG.debug("onCloseRemote({})",close);
313 ConnectionState event = null;
314 synchronized (this)
315 {
316 if (this.state == ConnectionState.CLOSED)
317 {
318
319 return;
320 }
321
322 closeInfo = close;
323
324
325 inputAvailable = false;
326
327 boolean in = inputAvailable;
328 boolean out = outputAvailable;
329 if (closeHandshakeSource == CloseHandshakeSource.NONE)
330 {
331 closeHandshakeSource = CloseHandshakeSource.REMOTE;
332 }
333
334 if (LOG.isDebugEnabled())
335 LOG.debug("onCloseRemote(), input={}, output={}",in,out);
336
337 if (!in && !out)
338 {
339 LOG.debug("Close Handshake satisfied, disconnecting");
340 cleanClose = true;
341 state = ConnectionState.CLOSED;
342 finalClose.compareAndSet(null,close);
343 event = this.state;
344 }
345 else if (this.state == ConnectionState.OPEN)
346 {
347
348 this.state = ConnectionState.CLOSING;
349 event = this.state;
350 }
351 }
352
353
354 if (event != null)
355 {
356 notifyStateListeners(event);
357 }
358 }
359
360
361
362
363
364
365 public void onConnected()
366 {
367 ConnectionState event = null;
368 synchronized (this)
369 {
370 if (this.state != ConnectionState.CONNECTING)
371 {
372 LOG.debug("Unable to set to connected, not in CONNECTING state: {}",this.state);
373 return;
374 }
375
376 this.state = ConnectionState.CONNECTED;
377 inputAvailable = false;
378 outputAvailable = true;
379 event = this.state;
380 }
381 notifyStateListeners(event);
382 }
383
384
385
386
387 public void onFailedUpgrade()
388 {
389 assert (this.state == ConnectionState.CONNECTING);
390 ConnectionState event = null;
391 synchronized (this)
392 {
393 this.state = ConnectionState.CLOSED;
394 cleanClose = false;
395 inputAvailable = false;
396 outputAvailable = false;
397 event = this.state;
398 }
399 notifyStateListeners(event);
400 }
401
402
403
404
405 public void onOpened()
406 {
407 if(LOG.isDebugEnabled())
408 LOG.debug(" onOpened()");
409
410 ConnectionState event = null;
411 synchronized (this)
412 {
413 if (this.state == ConnectionState.OPEN)
414 {
415
416 return;
417 }
418
419 if (this.state != ConnectionState.CONNECTED)
420 {
421 LOG.debug("Unable to open, not in CONNECTED state: {}",this.state);
422 return;
423 }
424
425 this.state = ConnectionState.OPEN;
426 this.inputAvailable = true;
427 this.outputAvailable = true;
428 event = this.state;
429 }
430 notifyStateListeners(event);
431 }
432
433
434
435
436
437
438
439 public void onReadFailure(Throwable t)
440 {
441 ConnectionState event = null;
442 synchronized (this)
443 {
444 if (this.state == ConnectionState.CLOSED)
445 {
446
447 return;
448 }
449
450
451 String reason = "WebSocket Read Failure";
452 if (t instanceof EOFException)
453 {
454 reason = "WebSocket Read EOF";
455 Throwable cause = t.getCause();
456 if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
457 {
458 reason = "EOF: " + cause.getMessage();
459 }
460 }
461 else
462 {
463 if (StringUtil.isNotBlank(t.getMessage()))
464 {
465 reason = t.getMessage();
466 }
467 }
468
469 CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,reason);
470
471 finalClose.compareAndSet(null,close);
472
473 this.cleanClose = false;
474 this.state = ConnectionState.CLOSED;
475 this.closeInfo = close;
476 this.inputAvailable = false;
477 this.outputAvailable = false;
478 this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
479 event = this.state;
480 }
481 notifyStateListeners(event);
482 }
483
484
485
486
487
488
489
490 public void onWriteFailure(Throwable t)
491 {
492 ConnectionState event = null;
493 synchronized (this)
494 {
495 if (this.state == ConnectionState.CLOSED)
496 {
497
498 return;
499 }
500
501
502 String reason = "WebSocket Write Failure";
503 if (t instanceof EOFException)
504 {
505 reason = "WebSocket Write EOF";
506 Throwable cause = t.getCause();
507 if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
508 {
509 reason = "EOF: " + cause.getMessage();
510 }
511 }
512 else
513 {
514 if (StringUtil.isNotBlank(t.getMessage()))
515 {
516 reason = t.getMessage();
517 }
518 }
519
520 CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,reason);
521
522 finalClose.compareAndSet(null,close);
523
524 this.cleanClose = false;
525 this.state = ConnectionState.CLOSED;
526 this.inputAvailable = false;
527 this.outputAvailable = false;
528 this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
529 event = this.state;
530 }
531 notifyStateListeners(event);
532 }
533
534 public void onDisconnected()
535 {
536 ConnectionState event = null;
537 synchronized (this)
538 {
539 if (this.state == ConnectionState.CLOSED)
540 {
541
542 return;
543 }
544
545 CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,"Disconnected");
546
547 this.cleanClose = false;
548 this.state = ConnectionState.CLOSED;
549 this.closeInfo = close;
550 this.inputAvailable = false;
551 this.outputAvailable = false;
552 this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
553 event = this.state;
554 }
555 notifyStateListeners(event);
556 }
557
558 @Override
559 public String toString()
560 {
561 StringBuilder str = new StringBuilder();
562 str.append(this.getClass().getSimpleName());
563 str.append("@").append(Integer.toHexString(hashCode()));
564 str.append("[").append(state);
565 str.append(',');
566 if (!inputAvailable)
567 {
568 str.append('!');
569 }
570 str.append("in,");
571 if (!outputAvailable)
572 {
573 str.append('!');
574 }
575 str.append("out");
576 if ((state == ConnectionState.CLOSED) || (state == ConnectionState.CLOSING))
577 {
578 CloseInfo ci = finalClose.get();
579 if (ci != null)
580 {
581 str.append(",finalClose=").append(ci);
582 }
583 else
584 {
585 str.append(",close=").append(closeInfo);
586 }
587 str.append(",clean=").append(cleanClose);
588 str.append(",closeSource=").append(closeHandshakeSource);
589 }
590 str.append(']');
591 return str.toString();
592 }
593
594 public boolean wasAbnormalClose()
595 {
596 return closeHandshakeSource == CloseHandshakeSource.ABNORMAL;
597 }
598
599 public boolean wasCleanClose()
600 {
601 return cleanClose;
602 }
603
604 public boolean wasLocalCloseInitiated()
605 {
606 return closeHandshakeSource == CloseHandshakeSource.LOCAL;
607 }
608
609 public boolean wasRemoteCloseInitiated()
610 {
611 return closeHandshakeSource == CloseHandshakeSource.REMOTE;
612 }
613
614 }