1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.io;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.util.List;
24 import java.util.concurrent.CopyOnWriteArrayList;
25 import java.util.concurrent.atomic.AtomicReference;
26
27 import org.eclipse.jetty.util.StringUtil;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30 import org.eclipse.jetty.websocket.api.StatusCode;
31 import org.eclipse.jetty.websocket.common.CloseInfo;
32 import org.eclipse.jetty.websocket.common.ConnectionState;
33
34
35
36
37
38
39
40
41
42 public class IOState
43 {
44
45
46
47 private enum CloseHandshakeSource
48 {
49
50 NONE,
51
52 LOCAL,
53
54 REMOTE,
55
56 ABNORMAL
57 }
58
59 public static interface ConnectionStateListener
60 {
61 public void onConnectionStateChange(ConnectionState state);
62 }
63
64 private static final Logger LOG = Log.getLogger(IOState.class);
65 private ConnectionState state;
66 private final List<ConnectionStateListener> listeners = new CopyOnWriteArrayList<>();
67
68
69
70
71
72 private boolean inputAvailable;
73
74
75
76
77 private boolean outputAvailable;
78
79
80
81
82 private CloseHandshakeSource closeHandshakeSource;
83
84
85
86
87
88
89 private CloseInfo closeInfo;
90
91
92
93
94
95 private AtomicReference<CloseInfo> finalClose = new AtomicReference<>();
96
97
98
99
100 private boolean cleanClose;
101
102
103
104
105 public IOState()
106 {
107 this.state = ConnectionState.CONNECTING;
108 this.inputAvailable = false;
109 this.outputAvailable = false;
110 this.closeHandshakeSource = CloseHandshakeSource.NONE;
111 this.closeInfo = null;
112 this.cleanClose = false;
113 }
114
115 public void addListener(ConnectionStateListener listener)
116 {
117 listeners.add(listener);
118 }
119
120 public void assertInputOpen() throws IOException
121 {
122 if (!isInputAvailable())
123 {
124 throw new IOException("Connection input is closed");
125 }
126 }
127
128 public void assertOutputOpen() throws IOException
129 {
130 if (!isOutputAvailable())
131 {
132 throw new IOException("Connection output is closed");
133 }
134 }
135
136 public CloseInfo getCloseInfo()
137 {
138 CloseInfo ci = finalClose.get();
139 if (ci != null)
140 {
141 return ci;
142 }
143 return closeInfo;
144 }
145
146 public ConnectionState getConnectionState()
147 {
148 return state;
149 }
150
151 public boolean isClosed()
152 {
153 synchronized (this)
154 {
155 return (state == ConnectionState.CLOSED);
156 }
157 }
158
159 public boolean isInputAvailable()
160 {
161 return inputAvailable;
162 }
163
164 public boolean isOpen()
165 {
166 return !isClosed();
167 }
168
169 public boolean isOutputAvailable()
170 {
171 return outputAvailable;
172 }
173
174 private void notifyStateListeners(ConnectionState state)
175 {
176 if (LOG.isDebugEnabled())
177 LOG.debug("Notify State Listeners: {}",state);
178 for (ConnectionStateListener listener : listeners)
179 {
180 if (LOG.isDebugEnabled())
181 {
182 LOG.debug("{}.onConnectionStateChange({})",listener.getClass().getSimpleName(),state.name());
183 }
184 listener.onConnectionStateChange(state);
185 }
186 }
187
188
189
190
191
192
193
194 public void onAbnormalClose(CloseInfo close)
195 {
196 if (LOG.isDebugEnabled())
197 LOG.debug("onAbnormalClose({})",close);
198 ConnectionState event = null;
199 synchronized (this)
200 {
201 if (this.state == ConnectionState.CLOSED)
202 {
203
204 return;
205 }
206
207 if (this.state == ConnectionState.OPEN)
208 {
209 this.cleanClose = false;
210 }
211
212 this.state = ConnectionState.CLOSED;
213 finalClose.compareAndSet(null,close);
214 this.inputAvailable = false;
215 this.outputAvailable = false;
216 this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
217 event = this.state;
218 }
219 notifyStateListeners(event);
220 }
221
222
223
224
225
226 public void onCloseLocal(CloseInfo closeInfo)
227 {
228 boolean open = false;
229 synchronized (this)
230 {
231 ConnectionState initialState = this.state;
232 if (LOG.isDebugEnabled())
233 LOG.debug("onCloseLocal({}) : {}", closeInfo, initialState);
234 if (initialState == ConnectionState.CLOSED)
235 {
236
237 if (LOG.isDebugEnabled())
238 LOG.debug("already closed");
239 return;
240 }
241
242 if (initialState == ConnectionState.CONNECTED)
243 {
244
245 if (LOG.isDebugEnabled())
246 LOG.debug("FastClose in CONNECTED detected");
247 open = true;
248 }
249 }
250
251 if (open)
252 openAndCloseLocal(closeInfo);
253 else
254 closeLocal(closeInfo);
255 }
256
257 private void openAndCloseLocal(CloseInfo closeInfo)
258 {
259
260 onOpened();
261 if (LOG.isDebugEnabled())
262 LOG.debug("FastClose continuing with Closure");
263 closeLocal(closeInfo);
264 }
265
266 private void closeLocal(CloseInfo closeInfo)
267 {
268 ConnectionState event = null;
269 ConnectionState abnormalEvent = null;
270 synchronized (this)
271 {
272 if (LOG.isDebugEnabled())
273 LOG.debug("onCloseLocal(), input={}, output={}", inputAvailable, outputAvailable);
274
275 this.closeInfo = closeInfo;
276
277
278 outputAvailable = false;
279
280 if (closeHandshakeSource == CloseHandshakeSource.NONE)
281 {
282 closeHandshakeSource = CloseHandshakeSource.LOCAL;
283 }
284
285 if (!inputAvailable)
286 {
287 if (LOG.isDebugEnabled())
288 LOG.debug("Close Handshake satisfied, disconnecting");
289 cleanClose = true;
290 this.state = ConnectionState.CLOSED;
291 finalClose.compareAndSet(null,closeInfo);
292 event = this.state;
293 }
294 else if (this.state == ConnectionState.OPEN)
295 {
296
297 this.state = ConnectionState.CLOSING;
298 event = this.state;
299
300
301 if (closeInfo.isAbnormal())
302 {
303 abnormalEvent = ConnectionState.CLOSED;
304 finalClose.compareAndSet(null,closeInfo);
305 cleanClose = false;
306 outputAvailable = false;
307 inputAvailable = false;
308 closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
309 }
310 }
311 }
312
313
314 if (event != null)
315 {
316 notifyStateListeners(event);
317 if (abnormalEvent != null)
318 {
319 notifyStateListeners(abnormalEvent);
320 }
321 }
322 }
323
324
325
326
327
328 public void onCloseRemote(CloseInfo closeInfo)
329 {
330 if (LOG.isDebugEnabled())
331 LOG.debug("onCloseRemote({})", closeInfo);
332 ConnectionState event = null;
333 synchronized (this)
334 {
335 if (this.state == ConnectionState.CLOSED)
336 {
337
338 return;
339 }
340
341 if (LOG.isDebugEnabled())
342 LOG.debug("onCloseRemote(), input={}, output={}", inputAvailable, outputAvailable);
343
344 this.closeInfo = closeInfo;
345
346
347 inputAvailable = false;
348
349 if (closeHandshakeSource == CloseHandshakeSource.NONE)
350 {
351 closeHandshakeSource = CloseHandshakeSource.REMOTE;
352 }
353
354 if (!outputAvailable)
355 {
356 LOG.debug("Close Handshake satisfied, disconnecting");
357 cleanClose = true;
358 state = ConnectionState.CLOSED;
359 finalClose.compareAndSet(null,closeInfo);
360 event = this.state;
361 }
362 else if (this.state == ConnectionState.OPEN)
363 {
364
365 this.state = ConnectionState.CLOSING;
366 event = this.state;
367 }
368 }
369
370
371 if (event != null)
372 {
373 notifyStateListeners(event);
374 }
375 }
376
377
378
379
380
381
382 public void onConnected()
383 {
384 ConnectionState event = null;
385 synchronized (this)
386 {
387 if (this.state != ConnectionState.CONNECTING)
388 {
389 LOG.debug("Unable to set to connected, not in CONNECTING state: {}",this.state);
390 return;
391 }
392
393 this.state = ConnectionState.CONNECTED;
394 inputAvailable = false;
395 outputAvailable = true;
396 event = this.state;
397 }
398 notifyStateListeners(event);
399 }
400
401
402
403
404 public void onFailedUpgrade()
405 {
406 assert (this.state == ConnectionState.CONNECTING);
407 ConnectionState event = null;
408 synchronized (this)
409 {
410 this.state = ConnectionState.CLOSED;
411 cleanClose = false;
412 inputAvailable = false;
413 outputAvailable = false;
414 event = this.state;
415 }
416 notifyStateListeners(event);
417 }
418
419
420
421
422 public void onOpened()
423 {
424 if(LOG.isDebugEnabled())
425 LOG.debug(" onOpened()");
426
427 ConnectionState event = null;
428 synchronized (this)
429 {
430 if (this.state == ConnectionState.OPEN)
431 {
432
433 return;
434 }
435
436 if (this.state != ConnectionState.CONNECTED)
437 {
438 LOG.debug("Unable to open, not in CONNECTED state: {}",this.state);
439 return;
440 }
441
442 this.state = ConnectionState.OPEN;
443 this.inputAvailable = true;
444 this.outputAvailable = true;
445 event = this.state;
446 }
447 notifyStateListeners(event);
448 }
449
450
451
452
453
454
455
456 public void onReadFailure(Throwable t)
457 {
458 ConnectionState event = null;
459 synchronized (this)
460 {
461 if (this.state == ConnectionState.CLOSED)
462 {
463
464 return;
465 }
466
467
468 String reason = "WebSocket Read Failure";
469 if (t instanceof EOFException)
470 {
471 reason = "WebSocket Read EOF";
472 Throwable cause = t.getCause();
473 if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
474 {
475 reason = "EOF: " + cause.getMessage();
476 }
477 }
478 else
479 {
480 if (StringUtil.isNotBlank(t.getMessage()))
481 {
482 reason = t.getMessage();
483 }
484 }
485
486 CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,reason);
487
488 finalClose.compareAndSet(null,close);
489
490 this.cleanClose = false;
491 this.state = ConnectionState.CLOSED;
492 this.closeInfo = close;
493 this.inputAvailable = false;
494 this.outputAvailable = false;
495 this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
496 event = this.state;
497 }
498 notifyStateListeners(event);
499 }
500
501
502
503
504
505
506
507 public void onWriteFailure(Throwable t)
508 {
509 ConnectionState event = null;
510 synchronized (this)
511 {
512 if (this.state == ConnectionState.CLOSED)
513 {
514
515 return;
516 }
517
518
519 String reason = "WebSocket Write Failure";
520 if (t instanceof EOFException)
521 {
522 reason = "WebSocket Write EOF";
523 Throwable cause = t.getCause();
524 if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
525 {
526 reason = "EOF: " + cause.getMessage();
527 }
528 }
529 else
530 {
531 if (StringUtil.isNotBlank(t.getMessage()))
532 {
533 reason = t.getMessage();
534 }
535 }
536
537 CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,reason);
538
539 finalClose.compareAndSet(null,close);
540
541 this.cleanClose = false;
542 this.state = ConnectionState.CLOSED;
543 this.inputAvailable = false;
544 this.outputAvailable = false;
545 this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
546 event = this.state;
547 }
548 notifyStateListeners(event);
549 }
550
551 public void onDisconnected()
552 {
553 ConnectionState event = null;
554 synchronized (this)
555 {
556 if (this.state == ConnectionState.CLOSED)
557 {
558
559 return;
560 }
561
562 CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,"Disconnected");
563
564 this.cleanClose = false;
565 this.state = ConnectionState.CLOSED;
566 this.closeInfo = close;
567 this.inputAvailable = false;
568 this.outputAvailable = false;
569 this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
570 event = this.state;
571 }
572 notifyStateListeners(event);
573 }
574
575 @Override
576 public String toString()
577 {
578 StringBuilder str = new StringBuilder();
579 str.append(this.getClass().getSimpleName());
580 str.append("@").append(Integer.toHexString(hashCode()));
581 str.append("[").append(state);
582 str.append(',');
583 if (!inputAvailable)
584 {
585 str.append('!');
586 }
587 str.append("in,");
588 if (!outputAvailable)
589 {
590 str.append('!');
591 }
592 str.append("out");
593 if ((state == ConnectionState.CLOSED) || (state == ConnectionState.CLOSING))
594 {
595 CloseInfo ci = finalClose.get();
596 if (ci != null)
597 {
598 str.append(",finalClose=").append(ci);
599 }
600 else
601 {
602 str.append(",close=").append(closeInfo);
603 }
604 str.append(",clean=").append(cleanClose);
605 str.append(",closeSource=").append(closeHandshakeSource);
606 }
607 str.append(']');
608 return str.toString();
609 }
610
611 public boolean wasAbnormalClose()
612 {
613 return closeHandshakeSource == CloseHandshakeSource.ABNORMAL;
614 }
615
616 public boolean wasCleanClose()
617 {
618 return cleanClose;
619 }
620
621 public boolean wasLocalCloseInitiated()
622 {
623 return closeHandshakeSource == CloseHandshakeSource.LOCAL;
624 }
625
626 public boolean wasRemoteCloseInitiated()
627 {
628 return closeHandshakeSource == CloseHandshakeSource.REMOTE;
629 }
630
631 }