1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.io;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.util.List;
24 import java.util.concurrent.CopyOnWriteArrayList;
25 import java.util.concurrent.atomic.AtomicReference;
26
27 import org.eclipse.jetty.util.StringUtil;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30 import org.eclipse.jetty.websocket.api.CloseStatus;
31 import org.eclipse.jetty.websocket.api.StatusCode;
32 import org.eclipse.jetty.websocket.common.CloseInfo;
33 import org.eclipse.jetty.websocket.common.ConnectionState;
34
35
36
37
38
39
40
41
42
43 public class IOState
44 {
45
46
47
48 private static enum CloseHandshakeSource
49 {
50
51 NONE,
52
53 LOCAL,
54
55 REMOTE,
56
57 ABNORMAL;
58 }
59
60 public static interface ConnectionStateListener
61 {
62 public void onConnectionStateChange(ConnectionState state);
63 }
64
65 private static final Logger LOG = Log.getLogger(IOState.class);
66 private ConnectionState state;
67 private final List<ConnectionStateListener> listeners = new CopyOnWriteArrayList<>();
68
69
70
71
72
73 private boolean inputAvailable;
74
75
76
77
78 private boolean outputAvailable;
79
80
81
82
83 private CloseHandshakeSource closeHandshakeSource;
84
85
86
87
88
89
90 private CloseInfo closeInfo;
91
92
93
94
95
96 private AtomicReference<CloseInfo> finalClose = new AtomicReference<>();
97
98
99
100
101 private boolean cleanClose;
102
103
104
105
106 public IOState()
107 {
108 this.state = ConnectionState.CONNECTING;
109 this.inputAvailable = false;
110 this.outputAvailable = false;
111 this.closeHandshakeSource = CloseHandshakeSource.NONE;
112 this.closeInfo = null;
113 this.cleanClose = false;
114 }
115
116 public void addListener(ConnectionStateListener listener)
117 {
118 listeners.add(listener);
119 }
120
121 public void assertInputOpen() throws IOException
122 {
123 if (!isInputAvailable())
124 {
125 throw new IOException("Connection input is closed");
126 }
127 }
128
129 public void assertOutputOpen() throws IOException
130 {
131 if (!isOutputAvailable())
132 {
133 throw new IOException("Connection output is closed");
134 }
135 }
136
137 public CloseInfo getCloseInfo()
138 {
139 CloseInfo ci = finalClose.get();
140 if (ci != null)
141 {
142 return ci;
143 }
144 return closeInfo;
145 }
146
147 public ConnectionState getConnectionState()
148 {
149 return state;
150 }
151
152 public boolean isClosed()
153 {
154 synchronized (state)
155 {
156 return (state == ConnectionState.CLOSED);
157 }
158 }
159
160 public boolean isInputAvailable()
161 {
162 return inputAvailable;
163 }
164
165 public boolean isOpen()
166 {
167 return (getConnectionState() != ConnectionState.CLOSED);
168 }
169
170 public boolean isOutputAvailable()
171 {
172 return outputAvailable;
173 }
174
175 private void notifyStateListeners(ConnectionState state)
176 {
177 if (LOG.isDebugEnabled())
178 LOG.debug("Notify State Listeners: {}",state);
179 for (ConnectionStateListener listener : listeners)
180 {
181 if (LOG.isDebugEnabled())
182 {
183 LOG.debug("{}.onConnectionStateChange({})",listener.getClass().getSimpleName(),state.name());
184 }
185 listener.onConnectionStateChange(state);
186 }
187 }
188
189
190
191
192
193
194 public void onAbnormalClose(CloseInfo close)
195 {
196 if (LOG.isDebugEnabled())
197 LOG.debug("onAbnormalClose({})",close);
198 ConnectionState event = null;
199 synchronized (this)
200 {
201 if (this.state == ConnectionState.CLOSED)
202 {
203
204 return;
205 }
206
207 if (this.state == ConnectionState.OPEN)
208 {
209 this.cleanClose = false;
210 }
211
212 this.state = ConnectionState.CLOSED;
213 finalClose.compareAndSet(null,close);
214 this.inputAvailable = false;
215 this.outputAvailable = false;
216 this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
217 event = this.state;
218 }
219 notifyStateListeners(event);
220 }
221
222
223
224
225 public void onCloseLocal(CloseInfo close)
226 {
227 ConnectionState event = null;
228 ConnectionState abnormalEvent = null;
229 ConnectionState initialState = this.state;
230 if (LOG.isDebugEnabled())
231 LOG.debug("onCloseLocal({}) : {}",close,initialState);
232 if (initialState == ConnectionState.CLOSED)
233 {
234
235 LOG.debug("already closed");
236 return;
237 }
238
239 if (initialState == ConnectionState.CONNECTED)
240 {
241
242 LOG.debug("FastClose in CONNECTED detected");
243
244 onOpened();
245 if (LOG.isDebugEnabled())
246 LOG.debug("FastClose continuing with Closure");
247 }
248
249 synchronized (this)
250 {
251 closeInfo = close;
252
253 boolean in = inputAvailable;
254 boolean out = outputAvailable;
255 if (closeHandshakeSource == CloseHandshakeSource.NONE)
256 {
257 closeHandshakeSource = CloseHandshakeSource.LOCAL;
258 }
259 out = false;
260 outputAvailable = false;
261
262 LOG.debug("onCloseLocal(), input={}, output={}",in,out);
263
264 if (!in && !out)
265 {
266 LOG.debug("Close Handshake satisfied, disconnecting");
267 cleanClose = true;
268 this.state = ConnectionState.CLOSED;
269 finalClose.compareAndSet(null,close);
270 event = this.state;
271 }
272 else if (this.state == ConnectionState.OPEN)
273 {
274
275 this.state = ConnectionState.CLOSING;
276 event = this.state;
277
278
279 if (close.isAbnormal())
280 {
281 abnormalEvent = ConnectionState.CLOSED;
282 finalClose.compareAndSet(null,close);
283 cleanClose = false;
284 outputAvailable = false;
285 inputAvailable = false;
286 closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
287 }
288 }
289 }
290
291
292 if (event != null)
293 {
294 notifyStateListeners(event);
295
296 if(abnormalEvent != null) {
297 notifyStateListeners(abnormalEvent);
298 }
299 }
300 }
301
302
303
304
305 public void onCloseRemote(CloseInfo close)
306 {
307 if (LOG.isDebugEnabled())
308 LOG.debug("onCloseRemote({})",close);
309 ConnectionState event = null;
310 synchronized (this)
311 {
312 if (this.state == ConnectionState.CLOSED)
313 {
314
315 return;
316 }
317
318 closeInfo = close;
319
320 boolean in = inputAvailable;
321 boolean out = outputAvailable;
322 if (closeHandshakeSource == CloseHandshakeSource.NONE)
323 {
324 closeHandshakeSource = CloseHandshakeSource.REMOTE;
325 }
326 in = false;
327 inputAvailable = false;
328
329 if (LOG.isDebugEnabled())
330 LOG.debug("onCloseRemote(), input={}, output={}",in,out);
331
332 if (!in && !out)
333 {
334 LOG.debug("Close Handshake satisfied, disconnecting");
335 cleanClose = true;
336 state = ConnectionState.CLOSED;
337 finalClose.compareAndSet(null,close);
338 event = this.state;
339 }
340 else if (this.state == ConnectionState.OPEN)
341 {
342
343 this.state = ConnectionState.CLOSING;
344 event = this.state;
345 }
346 }
347
348
349 if (event != null)
350 {
351 notifyStateListeners(event);
352 }
353 }
354
355
356
357
358
359
360 public void onConnected()
361 {
362 ConnectionState event = null;
363 synchronized (this)
364 {
365 if (this.state != ConnectionState.CONNECTING)
366 {
367 LOG.debug("Unable to set to connected, not in CONNECTING state: {}",this.state);
368 return;
369 }
370
371 this.state = ConnectionState.CONNECTED;
372 inputAvailable = false;
373 outputAvailable = true;
374 event = this.state;
375 }
376 notifyStateListeners(event);
377 }
378
379
380
381
382 public void onFailedUpgrade()
383 {
384 assert (this.state == ConnectionState.CONNECTING);
385 ConnectionState event = null;
386 synchronized (this)
387 {
388 this.state = ConnectionState.CLOSED;
389 cleanClose = false;
390 inputAvailable = false;
391 outputAvailable = false;
392 event = this.state;
393 }
394 notifyStateListeners(event);
395 }
396
397
398
399
400 public void onOpened()
401 {
402 ConnectionState event = null;
403 synchronized (this)
404 {
405 if (this.state == ConnectionState.OPEN)
406 {
407
408 return;
409 }
410
411 if (this.state != ConnectionState.CONNECTED)
412 {
413 LOG.debug("Unable to open, not in CONNECTED state: {}",this.state);
414 return;
415 }
416
417 this.state = ConnectionState.OPEN;
418 this.inputAvailable = true;
419 this.outputAvailable = true;
420 event = this.state;
421 }
422 notifyStateListeners(event);
423 }
424
425
426
427
428
429
430 public void onReadFailure(Throwable t)
431 {
432 ConnectionState event = null;
433 synchronized (this)
434 {
435 if (this.state == ConnectionState.CLOSED)
436 {
437
438 return;
439 }
440
441
442 String reason = "WebSocket Read Failure";
443 if (t instanceof EOFException)
444 {
445 reason = "WebSocket Read EOF";
446 Throwable cause = t.getCause();
447 if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
448 {
449 reason = "EOF: " + cause.getMessage();
450 }
451 }
452 else
453 {
454 if (StringUtil.isNotBlank(t.getMessage()))
455 {
456 reason = t.getMessage();
457 }
458 }
459
460 reason = CloseStatus.trimMaxReasonLength(reason);
461 CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,reason);
462
463 finalClose.compareAndSet(null,close);
464
465 this.cleanClose = false;
466 this.state = ConnectionState.CLOSED;
467 this.closeInfo = close;
468 this.inputAvailable = false;
469 this.outputAvailable = false;
470 this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
471 event = this.state;
472 }
473 notifyStateListeners(event);
474 }
475
476
477
478
479
480
481 public void onWriteFailure(Throwable t)
482 {
483 ConnectionState event = null;
484 synchronized (this)
485 {
486 if (this.state == ConnectionState.CLOSED)
487 {
488
489 return;
490 }
491
492
493 String reason = "WebSocket Write Failure";
494 if (t instanceof EOFException)
495 {
496 reason = "WebSocket Write EOF";
497 Throwable cause = t.getCause();
498 if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
499 {
500 reason = "EOF: " + cause.getMessage();
501 }
502 }
503 else
504 {
505 if (StringUtil.isNotBlank(t.getMessage()))
506 {
507 reason = t.getMessage();
508 }
509 }
510
511 reason = CloseStatus.trimMaxReasonLength(reason);
512 CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,reason);
513
514 finalClose.compareAndSet(null,close);
515
516 this.cleanClose = false;
517 this.state = ConnectionState.CLOSED;
518 this.inputAvailable = false;
519 this.outputAvailable = false;
520 this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
521 event = this.state;
522 }
523 notifyStateListeners(event);
524 }
525
526 public void onDisconnected()
527 {
528 ConnectionState event = null;
529 synchronized (this)
530 {
531 if (this.state == ConnectionState.CLOSED)
532 {
533
534 return;
535 }
536
537 CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,"Disconnected");
538
539 this.cleanClose = false;
540 this.state = ConnectionState.CLOSED;
541 this.closeInfo = close;
542 this.inputAvailable = false;
543 this.outputAvailable = false;
544 this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
545 event = this.state;
546 }
547 notifyStateListeners(event);
548 }
549
550 @Override
551 public String toString()
552 {
553 StringBuilder str = new StringBuilder();
554 str.append(this.getClass().getSimpleName());
555 str.append("@").append(Integer.toHexString(hashCode()));
556 str.append("[").append(state);
557 str.append(',');
558 if (!inputAvailable)
559 {
560 str.append('!');
561 }
562 str.append("in,");
563 if (!outputAvailable)
564 {
565 str.append('!');
566 }
567 str.append("out");
568 if ((state == ConnectionState.CLOSED) || (state == ConnectionState.CLOSING))
569 {
570 CloseInfo ci = finalClose.get();
571 if (ci != null)
572 {
573 str.append(",finalClose=").append(ci);
574 }
575 else
576 {
577 str.append(",close=").append(closeInfo);
578 }
579 str.append(",clean=").append(cleanClose);
580 str.append(",closeSource=").append(closeHandshakeSource);
581 }
582 str.append(']');
583 return str.toString();
584 }
585
586 public boolean wasAbnormalClose()
587 {
588 return closeHandshakeSource == CloseHandshakeSource.ABNORMAL;
589 }
590
591 public boolean wasCleanClose()
592 {
593 return cleanClose;
594 }
595
596 public boolean wasLocalCloseInitiated()
597 {
598 return closeHandshakeSource == CloseHandshakeSource.LOCAL;
599 }
600
601 public boolean wasRemoteCloseInitiated()
602 {
603 return closeHandshakeSource == CloseHandshakeSource.REMOTE;
604 }
605
606 }