1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.io;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketTimeoutException;
25 import java.nio.ByteBuffer;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.RejectedExecutionException;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.eclipse.jetty.io.AbstractConnection;
34 import org.eclipse.jetty.io.ByteBufferPool;
35 import org.eclipse.jetty.io.EndPoint;
36 import org.eclipse.jetty.util.BufferUtil;
37 import org.eclipse.jetty.util.component.ContainerLifeCycle;
38 import org.eclipse.jetty.util.component.Dumpable;
39 import org.eclipse.jetty.util.log.Log;
40 import org.eclipse.jetty.util.log.Logger;
41 import org.eclipse.jetty.util.thread.Scheduler;
42 import org.eclipse.jetty.websocket.api.BatchMode;
43 import org.eclipse.jetty.websocket.api.CloseException;
44 import org.eclipse.jetty.websocket.api.StatusCode;
45 import org.eclipse.jetty.websocket.api.SuspendToken;
46 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
47 import org.eclipse.jetty.websocket.api.WriteCallback;
48 import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
49 import org.eclipse.jetty.websocket.api.extensions.Frame;
50 import org.eclipse.jetty.websocket.common.CloseInfo;
51 import org.eclipse.jetty.websocket.common.ConnectionState;
52 import org.eclipse.jetty.websocket.common.Generator;
53 import org.eclipse.jetty.websocket.common.LogicalConnection;
54 import org.eclipse.jetty.websocket.common.Parser;
55 import org.eclipse.jetty.websocket.common.WebSocketSession;
56 import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
57
58
59
60
61 public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, ConnectionStateListener, Dumpable
62 {
63 private class Flusher extends FrameFlusher
64 {
65 private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint)
66 {
67 super(bufferPool,generator,endpoint,getPolicy().getMaxBinaryMessageBufferSize(),8);
68 }
69
70 @Override
71 protected void onFailure(Throwable x)
72 {
73 session.notifyError(x);
74
75 if (ioState.wasAbnormalClose())
76 {
77 LOG.ignore(x);
78 return;
79 }
80
81 if (LOG.isDebugEnabled())
82 LOG.debug("Write flush failure",x);
83 ioState.onWriteFailure(x);
84 }
85 }
86
87 public class OnDisconnectCallback implements WriteCallback
88 {
89 private final boolean outputOnly;
90
91 public OnDisconnectCallback(boolean outputOnly)
92 {
93 this.outputOnly = outputOnly;
94 }
95
96 @Override
97 public void writeFailed(Throwable x)
98 {
99 disconnect(outputOnly);
100 }
101
102 @Override
103 public void writeSuccess()
104 {
105 disconnect(outputOnly);
106 }
107 }
108
109 public class OnCloseLocalCallback implements WriteCallback
110 {
111 private final WriteCallback callback;
112 private final CloseInfo close;
113
114 public OnCloseLocalCallback(WriteCallback callback, CloseInfo close)
115 {
116 this.callback = callback;
117 this.close = close;
118 }
119
120 public OnCloseLocalCallback(CloseInfo close)
121 {
122 this(null,close);
123 }
124
125 @Override
126 public void writeFailed(Throwable x)
127 {
128 try
129 {
130 if (callback != null)
131 {
132 callback.writeFailed(x);
133 }
134 }
135 finally
136 {
137 onLocalClose();
138 }
139 }
140
141 @Override
142 public void writeSuccess()
143 {
144 try
145 {
146 if (callback != null)
147 {
148 callback.writeSuccess();
149 }
150 }
151 finally
152 {
153 onLocalClose();
154 }
155 }
156
157 private void onLocalClose()
158 {
159 if (LOG.isDebugEnabled())
160 LOG.debug("Local Close Confirmed {}",close);
161 if (close.isAbnormal())
162 {
163 ioState.onAbnormalClose(close);
164 }
165 else
166 {
167 ioState.onCloseLocal(close);
168 }
169 }
170 }
171
172 public static class Stats
173 {
174 private AtomicLong countFillInterestedEvents = new AtomicLong(0);
175 private AtomicLong countOnFillableEvents = new AtomicLong(0);
176 private AtomicLong countFillableErrors = new AtomicLong(0);
177
178 public long getFillableErrorCount()
179 {
180 return countFillableErrors.get();
181 }
182
183 public long getFillInterestedCount()
184 {
185 return countFillInterestedEvents.get();
186 }
187
188 public long getOnFillableCount()
189 {
190 return countOnFillableEvents.get();
191 }
192 }
193
194 private static enum ReadMode
195 {
196 PARSE,
197 DISCARD,
198 EOF
199 }
200
201 private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
202
203
204
205
206 private static final int MIN_BUFFER_SIZE = Generator.MAX_HEADER_LENGTH;
207
208 private final ByteBufferPool bufferPool;
209 private final Scheduler scheduler;
210 private final Generator generator;
211 private final Parser parser;
212 private final WebSocketPolicy policy;
213 private final AtomicBoolean suspendToken;
214 private final FrameFlusher flusher;
215 private WebSocketSession session;
216 private List<ExtensionConfig> extensions;
217 private boolean isFilling;
218 private ReadMode readMode = ReadMode.PARSE;
219 private IOState ioState;
220 private Stats stats = new Stats();
221
/**
 * Creates the connection on top of the given endpoint, builds its generator, parser, IO state
 * and frame flusher, and applies the policy's input buffer size and idle timeout.
 *
 * @param endp the endpoint this connection reads from and writes to
 * @param executor the executor handed to the superclass
 * @param scheduler the scheduler exposed through {@code getScheduler()}
 * @param policy the websocket policy supplying buffer sizes and the idle timeout
 * @param bufferPool the pool used for read buffers, the generator, the parser and the flusher
 */
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
{
    super(endp,executor,EXECUTE_ONFILLABLE);

    // Collaborators handed in by the caller.
    this.policy = policy;
    this.bufferPool = bufferPool;
    this.scheduler = scheduler;

    // Frame generation and parsing share the same policy and buffer pool.
    this.generator = new Generator(policy,bufferPool);
    this.parser = new Parser(policy,bufferPool);

    this.extensions = new ArrayList<>();
    this.suspendToken = new AtomicBoolean(false);

    // The connection listens to its own IO state changes.
    this.ioState = new IOState();
    ioState.addListener(this);

    // The Flusher constructor calls getPolicy(), so the policy field has to be set before this line.
    this.flusher = new Flusher(bufferPool,generator,endp);

    setInputBufferSize(policy.getInputBufferSize());
    setMaxIdleTimeout(policy.getIdleTimeout());
}
238
239 @Override
240 public Executor getExecutor()
241 {
242 return super.getExecutor();
243 }
244
245 @Override
246 public void close()
247 {
248 close(StatusCode.NORMAL,null);
249 }
250
251
252
253
254
255
256
257
258
259
260
261
262 @Override
263 public void close(int statusCode, String reason)
264 {
265 if (LOG.isDebugEnabled())
266 LOG.debug("close({},{})",statusCode,reason);
267 CloseInfo close = new CloseInfo(statusCode,reason);
268 this.outgoingFrame(close.asFrame(),new OnCloseLocalCallback(close),BatchMode.OFF);
269 }
270
271 @Override
272 public void disconnect()
273 {
274 disconnect(false);
275 }
276
277 private void disconnect(boolean onlyOutput)
278 {
279 if (LOG.isDebugEnabled())
280 LOG.debug("{} disconnect({})",policy.getBehavior(),onlyOutput?"outputOnly":"both");
281
282 flusher.close();
283 EndPoint endPoint = getEndPoint();
284
285
286 if (LOG.isDebugEnabled())
287 LOG.debug("Shutting down output {}",endPoint);
288 endPoint.shutdownOutput();
289 if (!onlyOutput)
290 {
291 LOG.debug("Closing {}",endPoint);
292 endPoint.close();
293 }
294 }
295
296 protected void execute(Runnable task)
297 {
298 try
299 {
300 getExecutor().execute(task);
301 }
302 catch (RejectedExecutionException e)
303 {
304 if (LOG.isDebugEnabled())
305 LOG.debug("Job not dispatched: {}",task);
306 }
307 }
308
309 @Override
310 public void fillInterested()
311 {
312 stats.countFillInterestedEvents.incrementAndGet();
313 super.fillInterested();
314 }
315
316 @Override
317 public ByteBufferPool getBufferPool()
318 {
319 return bufferPool;
320 }
321
322
323
324
325
326
327
328
329 public List<ExtensionConfig> getExtensions()
330 {
331 return extensions;
332 }
333
334 public Generator getGenerator()
335 {
336 return generator;
337 }
338
339 @Override
340 public long getIdleTimeout()
341 {
342 return getEndPoint().getIdleTimeout();
343 }
344
345 @Override
346 public IOState getIOState()
347 {
348 return ioState;
349 }
350
351 @Override
352 public long getMaxIdleTimeout()
353 {
354 return getEndPoint().getIdleTimeout();
355 }
356
357 public Parser getParser()
358 {
359 return parser;
360 }
361
362 @Override
363 public WebSocketPolicy getPolicy()
364 {
365 return this.policy;
366 }
367
368 @Override
369 public InetSocketAddress getRemoteAddress()
370 {
371 return getEndPoint().getRemoteAddress();
372 }
373
374 public Scheduler getScheduler()
375 {
376 return scheduler;
377 }
378
379 @Override
380 public WebSocketSession getSession()
381 {
382 return session;
383 }
384
385 public Stats getStats()
386 {
387 return stats;
388 }
389
390 @Override
391 public boolean isOpen()
392 {
393 return getIOState().isOpen() && getEndPoint().isOpen();
394 }
395
396 @Override
397 public boolean isReading()
398 {
399 return isFilling;
400 }
401
402
403
404
405
406
407 @Override
408 public void onClose()
409 {
410 if (LOG.isDebugEnabled())
411 LOG.debug("{} onClose()",policy.getBehavior());
412 super.onClose();
413
414 flusher.close();
415 }
416
417 @Override
418 public void onConnectionStateChange(ConnectionState state)
419 {
420 if (LOG.isDebugEnabled())
421 LOG.debug("{} Connection State Change: {}",policy.getBehavior(),state);
422 switch (state)
423 {
424 case OPEN:
425 if (LOG.isDebugEnabled())
426 LOG.debug("fillInterested");
427 fillInterested();
428 break;
429 case CLOSED:
430 if (ioState.wasAbnormalClose())
431 {
432
433 CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
434 outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(false),BatchMode.OFF);
435 }
436 else
437 {
438
439 this.disconnect(false);
440 }
441 break;
442 case CLOSING:
443
444 if (ioState.wasRemoteCloseInitiated())
445 {
446 CloseInfo close = ioState.getCloseInfo();
447
448 outgoingFrame(close.asFrame(),new OnCloseLocalCallback(new OnDisconnectCallback(true),close),BatchMode.OFF);
449 }
450 default:
451 break;
452 }
453 }
454
455 @Override
456 public void onFillable()
457 {
458 if (LOG.isDebugEnabled())
459 LOG.debug("{} onFillable()",policy.getBehavior());
460 stats.countOnFillableEvents.incrementAndGet();
461 ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),true);
462 try
463 {
464 isFilling = true;
465
466 if(readMode == ReadMode.PARSE)
467 {
468 readMode = readParse(buffer);
469 } else
470 {
471 readMode = readDiscard(buffer);
472 }
473 }
474 finally
475 {
476 bufferPool.release(buffer);
477 }
478
479 if ((readMode != ReadMode.EOF) && (suspendToken.get() == false))
480 {
481 fillInterested();
482 }
483 else
484 {
485 isFilling = false;
486 }
487 }
488
489
490
491 @Override
492 protected void onFillInterestedFailed(Throwable cause)
493 {
494 LOG.ignore(cause);
495 stats.countFillInterestedEvents.incrementAndGet();
496 super.onFillInterestedFailed(cause);
497 }
498
499 @Override
500 public void onOpen()
501 {
502 super.onOpen();
503 this.ioState.onOpened();
504 }
505
506
507
508
509 @Override
510 protected boolean onReadTimeout()
511 {
512 IOState state = getIOState();
513 ConnectionState cstate = state.getConnectionState();
514 if (LOG.isDebugEnabled())
515 LOG.debug("{} Read Timeout - {}",policy.getBehavior(),cstate);
516
517 if (cstate == ConnectionState.CLOSED)
518 {
519
520
521 return true;
522 }
523
524 try
525 {
526 session.notifyError(new SocketTimeoutException("Timeout on Read"));
527 }
528 finally
529 {
530
531 close(StatusCode.SHUTDOWN,"Idle Timeout");
532 }
533
534 return false;
535 }
536
537
538
539
540 @Override
541 public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
542 {
543 if (LOG.isDebugEnabled())
544 {
545 LOG.debug("outgoingFrame({}, {})",frame,callback);
546 }
547
548 flusher.enqueue(frame,callback,batchMode);
549 }
550
551 private ReadMode readDiscard(ByteBuffer buffer)
552 {
553 EndPoint endPoint = getEndPoint();
554 try
555 {
556 while (true)
557 {
558 int filled = endPoint.fill(buffer);
559 if (filled == 0)
560 {
561 return ReadMode.DISCARD;
562 }
563 else if (filled < 0)
564 {
565 LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
566 return ReadMode.EOF;
567 }
568 else
569 {
570 if (LOG.isDebugEnabled())
571 {
572 LOG.debug("Discarded {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
573 }
574 }
575 }
576 }
577 catch (IOException e)
578 {
579 LOG.ignore(e);
580 return ReadMode.EOF;
581 }
582 catch (Throwable t)
583 {
584 LOG.ignore(t);
585 return ReadMode.DISCARD;
586 }
587 }
588
589 private ReadMode readParse(ByteBuffer buffer)
590 {
591 EndPoint endPoint = getEndPoint();
592 try
593 {
594 while (true)
595 {
596 int filled = endPoint.fill(buffer);
597 if (filled == 0)
598 {
599 return ReadMode.PARSE;
600 }
601 else if (filled < 0)
602 {
603 LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
604 ioState.onReadFailure(new EOFException("Remote Read EOF"));
605 return ReadMode.EOF;
606 }
607 else
608 {
609 if (LOG.isDebugEnabled())
610 {
611 LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
612 }
613 parser.parse(buffer);
614 }
615 }
616 }
617 catch (IOException e)
618 {
619 LOG.warn(e);
620 close(StatusCode.PROTOCOL,e.getMessage());
621 return ReadMode.DISCARD;
622 }
623 catch (CloseException e)
624 {
625 LOG.debug(e);
626 close(e.getStatusCode(),e.getMessage());
627 return ReadMode.DISCARD;
628 }
629 catch (Throwable t)
630 {
631 LOG.warn(t);
632 close(StatusCode.ABNORMAL,t.getMessage());
633
634 return ReadMode.DISCARD;
635 }
636 }
637
638 @Override
639 public void resume()
640 {
641 if (suspendToken.getAndSet(false))
642 {
643 fillInterested();
644 }
645 }
646
647
648
649
650
651
652
653
654
655 public void setExtensions(List<ExtensionConfig> extensions)
656 {
657 this.extensions = extensions;
658 }
659
660 @Override
661 public void setInputBufferSize(int inputBufferSize)
662 {
663 if (inputBufferSize < MIN_BUFFER_SIZE)
664 {
665 throw new IllegalArgumentException("Cannot have buffer size less than " + MIN_BUFFER_SIZE);
666 }
667 super.setInputBufferSize(inputBufferSize);
668 }
669
670 @Override
671 public void setMaxIdleTimeout(long ms)
672 {
673 getEndPoint().setIdleTimeout(ms);
674 }
675
676 @Override
677 public void setSession(WebSocketSession session)
678 {
679 this.session = session;
680 }
681
682 @Override
683 public SuspendToken suspend()
684 {
685 suspendToken.set(true);
686 return this;
687 }
688
689 @Override
690 public String dump()
691 {
692 return ContainerLifeCycle.dump(this);
693 }
694
695 @Override
696 public void dump(Appendable out, String indent) throws IOException
697 {
698 out.append(toString()).append(System.lineSeparator());
699 }
700
701 @Override
702 public String toString()
703 {
704 return String.format("%s{f=%s,g=%s,p=%s}",super.toString(),flusher,generator,parser);
705 }
706
707 }