1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.io;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketTimeoutException;
25 import java.nio.ByteBuffer;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.RejectedExecutionException;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.eclipse.jetty.io.AbstractConnection;
34 import org.eclipse.jetty.io.ByteBufferPool;
35 import org.eclipse.jetty.io.Connection;
36 import org.eclipse.jetty.io.EndPoint;
37 import org.eclipse.jetty.util.BufferUtil;
38 import org.eclipse.jetty.util.component.ContainerLifeCycle;
39 import org.eclipse.jetty.util.component.Dumpable;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42 import org.eclipse.jetty.util.thread.Scheduler;
43 import org.eclipse.jetty.websocket.api.BatchMode;
44 import org.eclipse.jetty.websocket.api.CloseException;
45 import org.eclipse.jetty.websocket.api.StatusCode;
46 import org.eclipse.jetty.websocket.api.SuspendToken;
47 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
48 import org.eclipse.jetty.websocket.api.WriteCallback;
49 import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
50 import org.eclipse.jetty.websocket.api.extensions.Frame;
51 import org.eclipse.jetty.websocket.common.CloseInfo;
52 import org.eclipse.jetty.websocket.common.ConnectionState;
53 import org.eclipse.jetty.websocket.common.Generator;
54 import org.eclipse.jetty.websocket.common.LogicalConnection;
55 import org.eclipse.jetty.websocket.common.Parser;
56 import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
57
58
59
60
61 public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, ConnectionStateListener, Dumpable
62 {
63 private class Flusher extends FrameFlusher
64 {
65 private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint)
66 {
67 super(bufferPool,generator,endpoint,getPolicy().getMaxBinaryMessageBufferSize(),8);
68 }
69
70 @Override
71 protected void onFailure(Throwable x)
72 {
73 notifyError(x);
74
75 if (ioState.wasAbnormalClose())
76 {
77 LOG.ignore(x);
78 return;
79 }
80
81 if (LOG.isDebugEnabled())
82 LOG.debug("Write flush failure",x);
83 ioState.onWriteFailure(x);
84 }
85 }
86
87 public class OnDisconnectCallback implements WriteCallback
88 {
89 private final boolean outputOnly;
90
91 public OnDisconnectCallback(boolean outputOnly)
92 {
93 this.outputOnly = outputOnly;
94 }
95
96 @Override
97 public void writeFailed(Throwable x)
98 {
99 disconnect(outputOnly);
100 }
101
102 @Override
103 public void writeSuccess()
104 {
105 disconnect(outputOnly);
106 }
107 }
108
109 public class OnCloseLocalCallback implements WriteCallback
110 {
111 private final WriteCallback callback;
112 private final CloseInfo close;
113
114 public OnCloseLocalCallback(WriteCallback callback, CloseInfo close)
115 {
116 this.callback = callback;
117 this.close = close;
118 }
119
120 public OnCloseLocalCallback(CloseInfo close)
121 {
122 this(null,close);
123 }
124
125 @Override
126 public void writeFailed(Throwable x)
127 {
128 try
129 {
130 if (callback != null)
131 {
132 callback.writeFailed(x);
133 }
134 }
135 finally
136 {
137 onLocalClose();
138 }
139 }
140
141 @Override
142 public void writeSuccess()
143 {
144 try
145 {
146 if (callback != null)
147 {
148 callback.writeSuccess();
149 }
150 }
151 finally
152 {
153 onLocalClose();
154 }
155 }
156
157 private void onLocalClose()
158 {
159 if (LOG_CLOSE.isDebugEnabled())
160 LOG_CLOSE.debug("Local Close Confirmed {}",close);
161 if (close.isAbnormal())
162 {
163 ioState.onAbnormalClose(close);
164 }
165 else
166 {
167 ioState.onCloseLocal(close);
168 }
169 }
170 }
171
172 public static class Stats
173 {
174 private AtomicLong countFillInterestedEvents = new AtomicLong(0);
175 private AtomicLong countOnFillableEvents = new AtomicLong(0);
176 private AtomicLong countFillableErrors = new AtomicLong(0);
177
178 public long getFillableErrorCount()
179 {
180 return countFillableErrors.get();
181 }
182
183 public long getFillInterestedCount()
184 {
185 return countFillInterestedEvents.get();
186 }
187
188 public long getOnFillableCount()
189 {
190 return countOnFillableEvents.get();
191 }
192 }
193
194 private static enum ReadMode
195 {
196 PARSE,
197 DISCARD,
198 EOF
199 }
200
201 private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
202 private static final Logger LOG_OPEN = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_OPEN");
203 private static final Logger LOG_CLOSE = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_CLOSE");
204
205
206
207
208 private static final int MIN_BUFFER_SIZE = Generator.MAX_HEADER_LENGTH;
209
210 private final ByteBufferPool bufferPool;
211 private final Scheduler scheduler;
212 private final Generator generator;
213 private final Parser parser;
214 private final WebSocketPolicy policy;
215 private final AtomicBoolean suspendToken;
216 private final FrameFlusher flusher;
217 private final String id;
218 private List<ExtensionConfig> extensions;
219 private boolean isFilling;
220 private ByteBuffer prefillBuffer;
221 private ReadMode readMode = ReadMode.PARSE;
222 private IOState ioState;
223 private Stats stats = new Stats();
224
/**
 * Creates the connection and wires up its generator, parser, io state and flusher.
 *
 * @param endp the endpoint this connection reads from and writes to
 * @param executor the executor passed to the superclass
 * @param scheduler the scheduler exposed through {@code getScheduler()}
 * @param policy the policy supplying buffer sizes and the idle timeout
 * @param bufferPool the pool used for read buffers and by the generator, parser and flusher
 */
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
{
    super(endp,executor);

    // Identifier of the form "localHost:localPort->remoteHost:remotePort".
    this.id = String.format("%s:%d->%s:%d",
        endp.getLocalAddress().getAddress().getHostAddress(),
        endp.getLocalAddress().getPort(),
        endp.getRemoteAddress().getAddress().getHostAddress(),
        endp.getRemoteAddress().getPort());

    // Collaborators. The policy must be assigned before the Flusher is built,
    // because the Flusher constructor reads it through getPolicy().
    this.policy = policy;
    this.bufferPool = bufferPool;
    this.generator = new Generator(policy,bufferPool);
    this.parser = new Parser(policy,bufferPool);
    this.scheduler = scheduler;
    this.extensions = new ArrayList<>();
    this.suspendToken = new AtomicBoolean(false);

    // This connection listens to its own io state changes.
    this.ioState = new IOState();
    this.ioState.addListener(this);

    this.flusher = new Flusher(bufferPool,generator,endp);

    // Apply the policy-driven settings last.
    this.setInputBufferSize(policy.getInputBufferSize());
    this.setMaxIdleTimeout(policy.getIdleTimeout());
}
246
247 @Override
248 public Executor getExecutor()
249 {
250 return super.getExecutor();
251 }
252
253
254
255
256 @Override
257 public void close()
258 {
259 if(LOG_CLOSE.isDebugEnabled())
260 LOG_CLOSE.debug(".close()");
261 CloseInfo close = new CloseInfo();
262 this.outgoingFrame(close.asFrame(),new OnCloseLocalCallback(close),BatchMode.OFF);
263 }
264
265
266
267
268
269
270
271
272
273
274
275
276
277 @Override
278 public void close(int statusCode, String reason)
279 {
280 if (LOG_CLOSE.isDebugEnabled())
281 LOG_CLOSE.debug("close({},{})",statusCode,reason);
282 CloseInfo close = new CloseInfo(statusCode,reason);
283 this.outgoingFrame(close.asFrame(),new OnCloseLocalCallback(close),BatchMode.OFF);
284 }
285
286 @Override
287 public void disconnect()
288 {
289 if (LOG_CLOSE.isDebugEnabled())
290 LOG_CLOSE.debug("{} disconnect()",policy.getBehavior());
291 disconnect(false);
292 }
293
294 private void disconnect(boolean onlyOutput)
295 {
296 if (LOG_CLOSE.isDebugEnabled())
297 LOG_CLOSE.debug("{} disconnect({})",policy.getBehavior(),onlyOutput?"outputOnly":"both");
298
299 flusher.close();
300 EndPoint endPoint = getEndPoint();
301
302
303 if (LOG_CLOSE.isDebugEnabled())
304 LOG_CLOSE.debug("Shutting down output {}",endPoint);
305 endPoint.shutdownOutput();
306 if (!onlyOutput)
307 {
308 if (LOG_CLOSE.isDebugEnabled())
309 LOG_CLOSE.debug("Closing {}",endPoint);
310 endPoint.close();
311 }
312 }
313
314 protected void execute(Runnable task)
315 {
316 try
317 {
318 getExecutor().execute(task);
319 }
320 catch (RejectedExecutionException e)
321 {
322 if (LOG.isDebugEnabled())
323 LOG.debug("Job not dispatched: {}",task);
324 }
325 }
326
327 @Override
328 public void fillInterested()
329 {
330 stats.countFillInterestedEvents.incrementAndGet();
331 super.fillInterested();
332 }
333
334 @Override
335 public ByteBufferPool getBufferPool()
336 {
337 return bufferPool;
338 }
339
340
341
342
343
344
345
346
347 public List<ExtensionConfig> getExtensions()
348 {
349 return extensions;
350 }
351
352 public Generator getGenerator()
353 {
354 return generator;
355 }
356
357 @Override
358 public String getId()
359 {
360 return id;
361 }
362
363 @Override
364 public long getIdleTimeout()
365 {
366 return getEndPoint().getIdleTimeout();
367 }
368
369 @Override
370 public IOState getIOState()
371 {
372 return ioState;
373 }
374
375 @Override
376 public long getMaxIdleTimeout()
377 {
378 return getEndPoint().getIdleTimeout();
379 }
380
381 public Parser getParser()
382 {
383 return parser;
384 }
385
386 @Override
387 public WebSocketPolicy getPolicy()
388 {
389 return this.policy;
390 }
391
392 @Override
393 public InetSocketAddress getRemoteAddress()
394 {
395 return getEndPoint().getRemoteAddress();
396 }
397
398 public Scheduler getScheduler()
399 {
400 return scheduler;
401 }
402
403 public Stats getStats()
404 {
405 return stats;
406 }
407
408 @Override
409 public boolean isOpen()
410 {
411 return getIOState().isOpen() && getEndPoint().isOpen();
412 }
413
414 @Override
415 public boolean isReading()
416 {
417 return isFilling;
418 }
419
420
421
422
423
424
425 @Override
426 public void onClose()
427 {
428 if (LOG.isDebugEnabled())
429 LOG.debug("{} onClose()",policy.getBehavior());
430 super.onClose();
431 ioState.onDisconnected();
432 flusher.close();
433 }
434
435 @Override
436 public void onConnectionStateChange(ConnectionState state)
437 {
438 if (LOG_CLOSE.isDebugEnabled())
439 LOG_CLOSE.debug("{} Connection State Change: {}",policy.getBehavior(),state);
440
441 switch (state)
442 {
443 case OPEN:
444 if (BufferUtil.hasContent(prefillBuffer))
445 {
446 if (LOG.isDebugEnabled())
447 {
448 LOG.debug("Parsing Upgrade prefill buffer ({} remaining)",prefillBuffer.remaining());
449 }
450 parser.parse(prefillBuffer);
451 }
452 if (LOG.isDebugEnabled())
453 {
454 LOG.debug("OPEN: normal fillInterested");
455 }
456
457
458 fillInterested();
459 break;
460 case CLOSED:
461 if (LOG_CLOSE.isDebugEnabled())
462 LOG_CLOSE.debug("CLOSED - wasAbnormalClose: {}", ioState.wasAbnormalClose());
463 if (ioState.wasAbnormalClose())
464 {
465
466 CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
467 outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(false),BatchMode.OFF);
468 }
469 else
470 {
471
472 this.disconnect(false);
473 }
474 break;
475 case CLOSING:
476 if (LOG_CLOSE.isDebugEnabled())
477 LOG_CLOSE.debug("CLOSING - wasRemoteCloseInitiated: {}", ioState.wasRemoteCloseInitiated());
478
479 if (ioState.wasRemoteCloseInitiated())
480 {
481 CloseInfo close = ioState.getCloseInfo();
482
483 outgoingFrame(close.asFrame(),new OnCloseLocalCallback(new OnDisconnectCallback(true),close),BatchMode.OFF);
484 }
485 default:
486 break;
487 }
488 }
489
490 @Override
491 public void onFillable()
492 {
493 if (LOG.isDebugEnabled())
494 LOG.debug("{} onFillable()",policy.getBehavior());
495 stats.countOnFillableEvents.incrementAndGet();
496
497 ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),true);
498
499 try
500 {
501 isFilling = true;
502
503 if(readMode == ReadMode.PARSE)
504 {
505 readMode = readParse(buffer);
506 }
507 else
508 {
509 readMode = readDiscard(buffer);
510 }
511 }
512 finally
513 {
514 bufferPool.release(buffer);
515 }
516
517 if ((readMode != ReadMode.EOF) && (suspendToken.get() == false))
518 {
519 fillInterested();
520 }
521 else
522 {
523 isFilling = false;
524 }
525 }
526
527
528
529 @Override
530 protected void onFillInterestedFailed(Throwable cause)
531 {
532 LOG.ignore(cause);
533 stats.countFillInterestedEvents.incrementAndGet();
534 super.onFillInterestedFailed(cause);
535 }
536
537
538
539
540
541
542
543 protected void setInitialBuffer(ByteBuffer prefilled)
544 {
545 if (LOG.isDebugEnabled())
546 {
547 LOG.debug("set Initial Buffer - {}",BufferUtil.toDetailString(prefilled));
548 }
549 prefillBuffer = prefilled;
550 }
551
552 private void notifyError(Throwable t)
553 {
554 getParser().getIncomingFramesHandler().incomingError(t);
555 }
556
557 @Override
558 public void onOpen()
559 {
560 if(LOG_OPEN.isDebugEnabled())
561 LOG_OPEN.debug("[{}] {}.onOpened()",policy.getBehavior(),this.getClass().getSimpleName());
562 super.onOpen();
563 this.ioState.onOpened();
564 }
565
566
567
568
569 @Override
570 protected boolean onReadTimeout()
571 {
572 IOState state = getIOState();
573 ConnectionState cstate = state.getConnectionState();
574 if (LOG_CLOSE.isDebugEnabled())
575 LOG_CLOSE.debug("{} Read Timeout - {}",policy.getBehavior(),cstate);
576
577 if (cstate == ConnectionState.CLOSED)
578 {
579 if (LOG_CLOSE.isDebugEnabled())
580 LOG_CLOSE.debug("onReadTimeout - Connection Already CLOSED");
581
582
583 return true;
584 }
585
586 try
587 {
588 notifyError(new SocketTimeoutException("Timeout on Read"));
589 }
590 finally
591 {
592
593 close(StatusCode.SHUTDOWN,"Idle Timeout");
594 }
595
596 return false;
597 }
598
599
600
601
602 @Override
603 public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
604 {
605 if (LOG.isDebugEnabled())
606 {
607 LOG.debug("outgoingFrame({}, {})",frame,callback);
608 }
609
610 flusher.enqueue(frame,callback,batchMode);
611 }
612
613 private ReadMode readDiscard(ByteBuffer buffer)
614 {
615 EndPoint endPoint = getEndPoint();
616 try
617 {
618 while (true)
619 {
620 int filled = endPoint.fill(buffer);
621 if (filled == 0)
622 {
623 return ReadMode.DISCARD;
624 }
625 else if (filled < 0)
626 {
627 if (LOG_CLOSE.isDebugEnabled())
628 LOG_CLOSE.debug("read - EOF Reached (remote: {})",getRemoteAddress());
629 return ReadMode.EOF;
630 }
631 else
632 {
633 if (LOG_CLOSE.isDebugEnabled())
634 LOG_CLOSE.debug("Discarded {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
635 }
636 }
637 }
638 catch (IOException e)
639 {
640 LOG.ignore(e);
641 return ReadMode.EOF;
642 }
643 catch (Throwable t)
644 {
645 LOG.ignore(t);
646 return ReadMode.DISCARD;
647 }
648 }
649
650 private ReadMode readParse(ByteBuffer buffer)
651 {
652 EndPoint endPoint = getEndPoint();
653 try
654 {
655
656 while(true)
657 {
658 int filled = endPoint.fill(buffer);
659 if (filled < 0)
660 {
661 LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
662 ioState.onReadFailure(new EOFException("Remote Read EOF"));
663 return ReadMode.EOF;
664 }
665 else if (filled == 0)
666 {
667
668 return ReadMode.PARSE;
669 }
670
671 if (LOG.isDebugEnabled())
672 {
673 LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
674 }
675 parser.parse(buffer);
676 }
677 }
678 catch (IOException e)
679 {
680 LOG.warn(e);
681 close(StatusCode.PROTOCOL,e.getMessage());
682 return ReadMode.DISCARD;
683 }
684 catch (CloseException e)
685 {
686 LOG.debug(e);
687 close(e.getStatusCode(),e.getMessage());
688 return ReadMode.DISCARD;
689 }
690 catch (Throwable t)
691 {
692 LOG.warn(t);
693 close(StatusCode.ABNORMAL,t.getMessage());
694
695 return ReadMode.DISCARD;
696 }
697 }
698
699 @Override
700 public void resume()
701 {
702 if (suspendToken.getAndSet(false))
703 {
704 fillInterested();
705 }
706 }
707
708
709
710
711
712
713
714
715
716 public void setExtensions(List<ExtensionConfig> extensions)
717 {
718 this.extensions = extensions;
719 }
720
721 @Override
722 public void setInputBufferSize(int inputBufferSize)
723 {
724 if (inputBufferSize < MIN_BUFFER_SIZE)
725 {
726 throw new IllegalArgumentException("Cannot have buffer size less than " + MIN_BUFFER_SIZE);
727 }
728 super.setInputBufferSize(inputBufferSize);
729 }
730
731 @Override
732 public void setMaxIdleTimeout(long ms)
733 {
734 getEndPoint().setIdleTimeout(ms);
735 }
736
737 @Override
738 public SuspendToken suspend()
739 {
740 suspendToken.set(true);
741 return this;
742 }
743
744 @Override
745 public String dump()
746 {
747 return ContainerLifeCycle.dump(this);
748 }
749
750 @Override
751 public void dump(Appendable out, String indent) throws IOException
752 {
753 out.append(toString()).append(System.lineSeparator());
754 }
755
756 @Override
757 public String toString()
758 {
759 return String.format("%s@%X{endp=%s,ios=%s,f=%s,g=%s,p=%s}",getClass().getSimpleName(),hashCode(),getEndPoint(),ioState,flusher,generator,parser);
760 }
761
762 @Override
763 public int hashCode()
764 {
765 final int prime = 31;
766 int result = 1;
767
768 EndPoint endp = getEndPoint();
769 if(endp != null)
770 {
771 result = prime * result + endp.getLocalAddress().hashCode();
772 result = prime * result + endp.getRemoteAddress().hashCode();
773 }
774 return result;
775 }
776
777 @Override
778 public boolean equals(Object obj)
779 {
780 if (this == obj)
781 return true;
782 if (obj == null)
783 return false;
784 if (getClass() != obj.getClass())
785 return false;
786 AbstractWebSocketConnection other = (AbstractWebSocketConnection)obj;
787 EndPoint endp = getEndPoint();
788 EndPoint otherEndp = other.getEndPoint();
789 if (endp == null)
790 {
791 if (otherEndp != null)
792 return false;
793 }
794 else if (!endp.equals(otherEndp))
795 return false;
796 return true;
797 }
798
799
800
801
802
803
804 @Override
805 public void onUpgradeTo(ByteBuffer prefilled)
806 {
807 setInitialBuffer(prefilled);
808 }
809 }