1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.io;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketTimeoutException;
25 import java.nio.ByteBuffer;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.RejectedExecutionException;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.eclipse.jetty.io.AbstractConnection;
34 import org.eclipse.jetty.io.ByteBufferPool;
35 import org.eclipse.jetty.io.Connection;
36 import org.eclipse.jetty.io.EndPoint;
37 import org.eclipse.jetty.util.BufferUtil;
38 import org.eclipse.jetty.util.component.ContainerLifeCycle;
39 import org.eclipse.jetty.util.component.Dumpable;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42 import org.eclipse.jetty.util.thread.Scheduler;
43 import org.eclipse.jetty.websocket.api.BatchMode;
44 import org.eclipse.jetty.websocket.api.CloseException;
45 import org.eclipse.jetty.websocket.api.StatusCode;
46 import org.eclipse.jetty.websocket.api.SuspendToken;
47 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
48 import org.eclipse.jetty.websocket.api.WriteCallback;
49 import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
50 import org.eclipse.jetty.websocket.api.extensions.Frame;
51 import org.eclipse.jetty.websocket.common.CloseInfo;
52 import org.eclipse.jetty.websocket.common.ConnectionState;
53 import org.eclipse.jetty.websocket.common.Generator;
54 import org.eclipse.jetty.websocket.common.LogicalConnection;
55 import org.eclipse.jetty.websocket.common.Parser;
56 import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
57
58
59
60
61 public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, ConnectionStateListener, Dumpable
62 {
63 private class Flusher extends FrameFlusher
64 {
65 private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint)
66 {
67 super(bufferPool,generator,endpoint,getPolicy().getMaxBinaryMessageBufferSize(),8);
68 }
69
70 @Override
71 protected void onFailure(Throwable x)
72 {
73 notifyError(x);
74
75 if (ioState.wasAbnormalClose())
76 {
77 LOG.ignore(x);
78 return;
79 }
80
81 if (LOG.isDebugEnabled())
82 LOG.debug("Write flush failure",x);
83 ioState.onWriteFailure(x);
84 }
85 }
86
87 public class OnDisconnectCallback implements WriteCallback
88 {
89 private final boolean outputOnly;
90
91 public OnDisconnectCallback(boolean outputOnly)
92 {
93 this.outputOnly = outputOnly;
94 }
95
96 @Override
97 public void writeFailed(Throwable x)
98 {
99 disconnect(outputOnly);
100 }
101
102 @Override
103 public void writeSuccess()
104 {
105 disconnect(outputOnly);
106 }
107 }
108
109 public class OnCloseLocalCallback implements WriteCallback
110 {
111 private final WriteCallback callback;
112 private final CloseInfo close;
113
114 public OnCloseLocalCallback(WriteCallback callback, CloseInfo close)
115 {
116 this.callback = callback;
117 this.close = close;
118 }
119
120 public OnCloseLocalCallback(CloseInfo close)
121 {
122 this(null,close);
123 }
124
125 @Override
126 public void writeFailed(Throwable x)
127 {
128 try
129 {
130 if (callback != null)
131 {
132 callback.writeFailed(x);
133 }
134 }
135 finally
136 {
137 onLocalClose();
138 }
139 }
140
141 @Override
142 public void writeSuccess()
143 {
144 try
145 {
146 if (callback != null)
147 {
148 callback.writeSuccess();
149 }
150 }
151 finally
152 {
153 onLocalClose();
154 }
155 }
156
157 private void onLocalClose()
158 {
159 if (LOG_CLOSE.isDebugEnabled())
160 LOG_CLOSE.debug("Local Close Confirmed {}",close);
161 if (close.isAbnormal())
162 {
163 ioState.onAbnormalClose(close);
164 }
165 else
166 {
167 ioState.onCloseLocal(close);
168 }
169 }
170 }
171
172 public static class Stats
173 {
174 private AtomicLong countFillInterestedEvents = new AtomicLong(0);
175 private AtomicLong countOnFillableEvents = new AtomicLong(0);
176 private AtomicLong countFillableErrors = new AtomicLong(0);
177
178 public long getFillableErrorCount()
179 {
180 return countFillableErrors.get();
181 }
182
183 public long getFillInterestedCount()
184 {
185 return countFillInterestedEvents.get();
186 }
187
188 public long getOnFillableCount()
189 {
190 return countOnFillableEvents.get();
191 }
192 }
193
194 private static enum ReadMode
195 {
196 PARSE,
197 DISCARD,
198 EOF
199 }
200
201 private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
202 private static final Logger LOG_OPEN = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_OPEN");
203 private static final Logger LOG_CLOSE = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_CLOSE");
204
205
206
207
208 private static final int MIN_BUFFER_SIZE = Generator.MAX_HEADER_LENGTH;
209
210 private final ByteBufferPool bufferPool;
211 private final Scheduler scheduler;
212 private final Generator generator;
213 private final Parser parser;
214 private final WebSocketPolicy policy;
215 private final AtomicBoolean suspendToken;
216 private final FrameFlusher flusher;
217 private List<ExtensionConfig> extensions;
218 private boolean isFilling;
219 private ByteBuffer prefillBuffer;
220 private ReadMode readMode = ReadMode.PARSE;
221 private IOState ioState;
222 private Stats stats = new Stats();
223
/**
 * Creates the connection, its generator, parser, IO state and frame flusher,
 * and applies the policy's input buffer size and idle timeout.
 *
 * @param endp the endpoint this connection reads from and writes to
 * @param executor the executor passed to the parent connection
 * @param scheduler the scheduler exposed through {@link #getScheduler()}
 * @param policy the policy supplying buffer sizes and idle timeout
 * @param bufferPool the pool used for read buffers, the generator and the parser
 */
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
{
    super(endp,executor);

    // Plain assignments first.
    this.policy = policy;
    this.scheduler = scheduler;
    this.bufferPool = bufferPool;
    this.extensions = new ArrayList<>();
    this.suspendToken = new AtomicBoolean(false);

    // Protocol machinery built from the policy and pool.
    this.generator = new Generator(policy,bufferPool);
    this.parser = new Parser(policy,bufferPool);

    // This connection listens to its own IO state changes.
    this.ioState = new IOState();
    this.ioState.addListener(this);

    // The flusher reads the policy, so it is created after the policy is assigned.
    this.flusher = new Flusher(bufferPool,generator,endp);

    setInputBufferSize(policy.getInputBufferSize());
    setMaxIdleTimeout(policy.getIdleTimeout());
}
240
241 @Override
242 public Executor getExecutor()
243 {
244 return super.getExecutor();
245 }
246
247
248
249
250 @Override
251 public void close()
252 {
253 if(LOG_CLOSE.isDebugEnabled())
254 LOG_CLOSE.debug(".close()");
255 CloseInfo close = new CloseInfo();
256 this.outgoingFrame(close.asFrame(),new OnCloseLocalCallback(close),BatchMode.OFF);
257 }
258
259
260
261
262
263
264
265
266
267
268
269
270
271 @Override
272 public void close(int statusCode, String reason)
273 {
274 if (LOG_CLOSE.isDebugEnabled())
275 LOG_CLOSE.debug("close({},{})",statusCode,reason);
276 CloseInfo close = new CloseInfo(statusCode,reason);
277 this.outgoingFrame(close.asFrame(),new OnCloseLocalCallback(close),BatchMode.OFF);
278 }
279
280 @Override
281 public void disconnect()
282 {
283 if (LOG_CLOSE.isDebugEnabled())
284 LOG_CLOSE.debug("{} disconnect()",policy.getBehavior());
285 disconnect(false);
286 }
287
288 private void disconnect(boolean onlyOutput)
289 {
290 if (LOG_CLOSE.isDebugEnabled())
291 LOG_CLOSE.debug("{} disconnect({})",policy.getBehavior(),onlyOutput?"outputOnly":"both");
292
293 flusher.close();
294 EndPoint endPoint = getEndPoint();
295
296
297 if (LOG_CLOSE.isDebugEnabled())
298 LOG_CLOSE.debug("Shutting down output {}",endPoint);
299 endPoint.shutdownOutput();
300 if (!onlyOutput)
301 {
302 if (LOG_CLOSE.isDebugEnabled())
303 LOG_CLOSE.debug("Closing {}",endPoint);
304 endPoint.close();
305 }
306 }
307
308 protected void execute(Runnable task)
309 {
310 try
311 {
312 getExecutor().execute(task);
313 }
314 catch (RejectedExecutionException e)
315 {
316 if (LOG.isDebugEnabled())
317 LOG.debug("Job not dispatched: {}",task);
318 }
319 }
320
321 @Override
322 public void fillInterested()
323 {
324 stats.countFillInterestedEvents.incrementAndGet();
325 super.fillInterested();
326 }
327
328 @Override
329 public ByteBufferPool getBufferPool()
330 {
331 return bufferPool;
332 }
333
334
335
336
337
338
339
340
341 public List<ExtensionConfig> getExtensions()
342 {
343 return extensions;
344 }
345
346 public Generator getGenerator()
347 {
348 return generator;
349 }
350
351 @Override
352 public long getIdleTimeout()
353 {
354 return getEndPoint().getIdleTimeout();
355 }
356
357 @Override
358 public IOState getIOState()
359 {
360 return ioState;
361 }
362
363 @Override
364 public long getMaxIdleTimeout()
365 {
366 return getEndPoint().getIdleTimeout();
367 }
368
369 public Parser getParser()
370 {
371 return parser;
372 }
373
374 @Override
375 public WebSocketPolicy getPolicy()
376 {
377 return this.policy;
378 }
379
380 @Override
381 public InetSocketAddress getRemoteAddress()
382 {
383 return getEndPoint().getRemoteAddress();
384 }
385
386 public Scheduler getScheduler()
387 {
388 return scheduler;
389 }
390
391 public Stats getStats()
392 {
393 return stats;
394 }
395
396 @Override
397 public boolean isOpen()
398 {
399 return getIOState().isOpen() && getEndPoint().isOpen();
400 }
401
402 @Override
403 public boolean isReading()
404 {
405 return isFilling;
406 }
407
408
409
410
411
412
413 @Override
414 public void onClose()
415 {
416 if (LOG.isDebugEnabled())
417 LOG.debug("{} onClose()",policy.getBehavior());
418 super.onClose();
419 ioState.onDisconnected();
420 flusher.close();
421 }
422
423 @Override
424 public void onConnectionStateChange(ConnectionState state)
425 {
426 if (LOG_CLOSE.isDebugEnabled())
427 LOG_CLOSE.debug("{} Connection State Change: {}",policy.getBehavior(),state);
428
429 switch (state)
430 {
431 case OPEN:
432 if (BufferUtil.hasContent(prefillBuffer))
433 {
434 if (LOG.isDebugEnabled())
435 {
436 LOG.debug("Parsing Upgrade prefill buffer ({} remaining)",prefillBuffer.remaining());
437 }
438 parser.parse(prefillBuffer);
439 }
440 if (LOG.isDebugEnabled())
441 {
442 LOG.debug("OPEN: normal fillInterested");
443 }
444
445
446 fillInterested();
447 break;
448 case CLOSED:
449 if (LOG_CLOSE.isDebugEnabled())
450 LOG_CLOSE.debug("CLOSED - wasAbnormalClose: {}", ioState.wasAbnormalClose());
451 if (ioState.wasAbnormalClose())
452 {
453
454 CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
455 outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(false),BatchMode.OFF);
456 }
457 else
458 {
459
460 this.disconnect(false);
461 }
462 break;
463 case CLOSING:
464 if (LOG_CLOSE.isDebugEnabled())
465 LOG_CLOSE.debug("CLOSING - wasRemoteCloseInitiated: {}", ioState.wasRemoteCloseInitiated());
466
467 if (ioState.wasRemoteCloseInitiated())
468 {
469 CloseInfo close = ioState.getCloseInfo();
470
471 outgoingFrame(close.asFrame(),new OnCloseLocalCallback(new OnDisconnectCallback(true),close),BatchMode.OFF);
472 }
473 default:
474 break;
475 }
476 }
477
478 @Override
479 public void onFillable()
480 {
481 if (LOG.isDebugEnabled())
482 LOG.debug("{} onFillable()",policy.getBehavior());
483 stats.countOnFillableEvents.incrementAndGet();
484
485 ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),true);
486
487 try
488 {
489 isFilling = true;
490
491 if(readMode == ReadMode.PARSE)
492 {
493 readMode = readParse(buffer);
494 }
495 else
496 {
497 readMode = readDiscard(buffer);
498 }
499 }
500 finally
501 {
502 bufferPool.release(buffer);
503 }
504
505 if ((readMode != ReadMode.EOF) && (suspendToken.get() == false))
506 {
507 fillInterested();
508 }
509 else
510 {
511 isFilling = false;
512 }
513 }
514
515
516
517 @Override
518 protected void onFillInterestedFailed(Throwable cause)
519 {
520 LOG.ignore(cause);
521 stats.countFillInterestedEvents.incrementAndGet();
522 super.onFillInterestedFailed(cause);
523 }
524
525
526
527
528
529
530
531 protected void setInitialBuffer(ByteBuffer prefilled)
532 {
533 if (LOG.isDebugEnabled())
534 {
535 LOG.debug("set Initial Buffer - {}",BufferUtil.toDetailString(prefilled));
536 }
537 prefillBuffer = prefilled;
538 }
539
540 private void notifyError(Throwable t)
541 {
542 getParser().getIncomingFramesHandler().incomingError(t);
543 }
544
545 @Override
546 public void onOpen()
547 {
548 if(LOG_OPEN.isDebugEnabled())
549 LOG_OPEN.debug("[{}] {}.onOpened()",policy.getBehavior(),this.getClass().getSimpleName());
550 super.onOpen();
551 this.ioState.onOpened();
552 }
553
554
555
556
557 @Override
558 protected boolean onReadTimeout()
559 {
560 IOState state = getIOState();
561 ConnectionState cstate = state.getConnectionState();
562 if (LOG_CLOSE.isDebugEnabled())
563 LOG_CLOSE.debug("{} Read Timeout - {}",policy.getBehavior(),cstate);
564
565 if (cstate == ConnectionState.CLOSED)
566 {
567 if (LOG_CLOSE.isDebugEnabled())
568 LOG_CLOSE.debug("onReadTimeout - Connection Already CLOSED");
569
570
571 return true;
572 }
573
574 try
575 {
576 notifyError(new SocketTimeoutException("Timeout on Read"));
577 }
578 finally
579 {
580
581 close(StatusCode.SHUTDOWN,"Idle Timeout");
582 }
583
584 return false;
585 }
586
587
588
589
590 @Override
591 public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
592 {
593 if (LOG.isDebugEnabled())
594 {
595 LOG.debug("outgoingFrame({}, {})",frame,callback);
596 }
597
598 flusher.enqueue(frame,callback,batchMode);
599 }
600
601 private ReadMode readDiscard(ByteBuffer buffer)
602 {
603 EndPoint endPoint = getEndPoint();
604 try
605 {
606 while (true)
607 {
608 int filled = endPoint.fill(buffer);
609 if (filled == 0)
610 {
611 return ReadMode.DISCARD;
612 }
613 else if (filled < 0)
614 {
615 if (LOG_CLOSE.isDebugEnabled())
616 LOG_CLOSE.debug("read - EOF Reached (remote: {})",getRemoteAddress());
617 return ReadMode.EOF;
618 }
619 else
620 {
621 if (LOG_CLOSE.isDebugEnabled())
622 LOG_CLOSE.debug("Discarded {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
623 }
624 }
625 }
626 catch (IOException e)
627 {
628 LOG.ignore(e);
629 return ReadMode.EOF;
630 }
631 catch (Throwable t)
632 {
633 LOG.ignore(t);
634 return ReadMode.DISCARD;
635 }
636 }
637
638 private ReadMode readParse(ByteBuffer buffer)
639 {
640 EndPoint endPoint = getEndPoint();
641 try
642 {
643
644 while(true)
645 {
646 int filled = endPoint.fill(buffer);
647 if (filled < 0)
648 {
649 LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
650 ioState.onReadFailure(new EOFException("Remote Read EOF"));
651 return ReadMode.EOF;
652 }
653 else if (filled == 0)
654 {
655
656 return ReadMode.PARSE;
657 }
658
659 if (LOG.isDebugEnabled())
660 {
661 LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
662 }
663 parser.parse(buffer);
664 }
665 }
666 catch (IOException e)
667 {
668 LOG.warn(e);
669 close(StatusCode.PROTOCOL,e.getMessage());
670 return ReadMode.DISCARD;
671 }
672 catch (CloseException e)
673 {
674 LOG.debug(e);
675 close(e.getStatusCode(),e.getMessage());
676 return ReadMode.DISCARD;
677 }
678 catch (Throwable t)
679 {
680 LOG.warn(t);
681 close(StatusCode.ABNORMAL,t.getMessage());
682
683 return ReadMode.DISCARD;
684 }
685 }
686
687 @Override
688 public void resume()
689 {
690 if (suspendToken.getAndSet(false))
691 {
692 fillInterested();
693 }
694 }
695
696
697
698
699
700
701
702
703
704 public void setExtensions(List<ExtensionConfig> extensions)
705 {
706 this.extensions = extensions;
707 }
708
709 @Override
710 public void setInputBufferSize(int inputBufferSize)
711 {
712 if (inputBufferSize < MIN_BUFFER_SIZE)
713 {
714 throw new IllegalArgumentException("Cannot have buffer size less than " + MIN_BUFFER_SIZE);
715 }
716 super.setInputBufferSize(inputBufferSize);
717 }
718
719 @Override
720 public void setMaxIdleTimeout(long ms)
721 {
722 getEndPoint().setIdleTimeout(ms);
723 }
724
725 @Override
726 public SuspendToken suspend()
727 {
728 suspendToken.set(true);
729 return this;
730 }
731
732 @Override
733 public String dump()
734 {
735 return ContainerLifeCycle.dump(this);
736 }
737
738 @Override
739 public void dump(Appendable out, String indent) throws IOException
740 {
741 out.append(toString()).append(System.lineSeparator());
742 }
743
744 @Override
745 public String toString()
746 {
747 return String.format("%s@%X{endp=%s,ios=%s,f=%s,g=%s,p=%s}",getClass().getSimpleName(),hashCode(),getEndPoint(),ioState,flusher,generator,parser);
748 }
749
750
751
752
753
754
755 @Override
756 public void onUpgradeTo(ByteBuffer prefilled)
757 {
758 setInitialBuffer(prefilled);
759 }
760 }