1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.io;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketTimeoutException;
25 import java.nio.ByteBuffer;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.RejectedExecutionException;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.eclipse.jetty.io.AbstractConnection;
34 import org.eclipse.jetty.io.ByteBufferPool;
35 import org.eclipse.jetty.io.Connection;
36 import org.eclipse.jetty.io.EndPoint;
37 import org.eclipse.jetty.util.BufferUtil;
38 import org.eclipse.jetty.util.component.ContainerLifeCycle;
39 import org.eclipse.jetty.util.component.Dumpable;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42 import org.eclipse.jetty.util.thread.Scheduler;
43 import org.eclipse.jetty.websocket.api.BatchMode;
44 import org.eclipse.jetty.websocket.api.CloseException;
45 import org.eclipse.jetty.websocket.api.StatusCode;
46 import org.eclipse.jetty.websocket.api.SuspendToken;
47 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
48 import org.eclipse.jetty.websocket.api.WriteCallback;
49 import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
50 import org.eclipse.jetty.websocket.api.extensions.Frame;
51 import org.eclipse.jetty.websocket.common.CloseInfo;
52 import org.eclipse.jetty.websocket.common.ConnectionState;
53 import org.eclipse.jetty.websocket.common.Generator;
54 import org.eclipse.jetty.websocket.common.LogicalConnection;
55 import org.eclipse.jetty.websocket.common.Parser;
56 import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
57
58
59
60
61 public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, ConnectionStateListener, Dumpable
62 {
63 private final AtomicBoolean closed = new AtomicBoolean();
64
65 private class Flusher extends FrameFlusher
66 {
67 private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint)
68 {
69 super(bufferPool,generator,endpoint,getPolicy().getMaxBinaryMessageBufferSize(),8);
70 }
71
72 @Override
73 protected void onFailure(Throwable x)
74 {
75 notifyError(x);
76
77 if (ioState.wasAbnormalClose())
78 {
79 LOG.ignore(x);
80 return;
81 }
82
83 if (LOG.isDebugEnabled())
84 LOG.debug("Write flush failure",x);
85 ioState.onWriteFailure(x);
86 }
87 }
88
89 public class OnDisconnectCallback implements WriteCallback
90 {
91 private final boolean outputOnly;
92
93 public OnDisconnectCallback(boolean outputOnly)
94 {
95 this.outputOnly = outputOnly;
96 }
97
98 @Override
99 public void writeFailed(Throwable x)
100 {
101 disconnect(outputOnly);
102 }
103
104 @Override
105 public void writeSuccess()
106 {
107 disconnect(outputOnly);
108 }
109 }
110
111 public class OnCloseLocalCallback implements WriteCallback
112 {
113 private final WriteCallback callback;
114 private final CloseInfo close;
115
116 public OnCloseLocalCallback(WriteCallback callback, CloseInfo close)
117 {
118 this.callback = callback;
119 this.close = close;
120 }
121
122 public OnCloseLocalCallback(CloseInfo close)
123 {
124 this(null,close);
125 }
126
127 @Override
128 public void writeFailed(Throwable x)
129 {
130 try
131 {
132 if (callback != null)
133 {
134 callback.writeFailed(x);
135 }
136 }
137 finally
138 {
139 onLocalClose();
140 }
141 }
142
143 @Override
144 public void writeSuccess()
145 {
146 try
147 {
148 if (callback != null)
149 {
150 callback.writeSuccess();
151 }
152 }
153 finally
154 {
155 onLocalClose();
156 }
157 }
158
159 private void onLocalClose()
160 {
161 if (LOG_CLOSE.isDebugEnabled())
162 LOG_CLOSE.debug("Local Close Confirmed {}",close);
163 if (close.isAbnormal())
164 {
165 ioState.onAbnormalClose(close);
166 }
167 else
168 {
169 ioState.onCloseLocal(close);
170 }
171 }
172 }
173
174 public static class Stats
175 {
176 private AtomicLong countFillInterestedEvents = new AtomicLong(0);
177 private AtomicLong countOnFillableEvents = new AtomicLong(0);
178 private AtomicLong countFillableErrors = new AtomicLong(0);
179
180 public long getFillableErrorCount()
181 {
182 return countFillableErrors.get();
183 }
184
185 public long getFillInterestedCount()
186 {
187 return countFillInterestedEvents.get();
188 }
189
190 public long getOnFillableCount()
191 {
192 return countOnFillableEvents.get();
193 }
194 }
195
196 private static enum ReadMode
197 {
198 PARSE,
199 DISCARD,
200 EOF
201 }
202
203 private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
204 private static final Logger LOG_OPEN = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_OPEN");
205 private static final Logger LOG_CLOSE = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_CLOSE");
206
207
208
209
210 private static final int MIN_BUFFER_SIZE = Generator.MAX_HEADER_LENGTH;
211
212 private final ByteBufferPool bufferPool;
213 private final Scheduler scheduler;
214 private final Generator generator;
215 private final Parser parser;
216 private final WebSocketPolicy policy;
217 private final AtomicBoolean suspendToken;
218 private final FrameFlusher flusher;
219 private final String id;
220 private List<ExtensionConfig> extensions;
221 private boolean isFilling;
222 private ByteBuffer prefillBuffer;
223 private ReadMode readMode = ReadMode.PARSE;
224 private IOState ioState;
225 private Stats stats = new Stats();
226
/**
 * Creates the connection and wires up its generator, parser, IO state and flusher.
 *
 * @param endp the endpoint; its local and remote addresses are used to build the connection id
 * @param executor the executor passed to the superclass
 * @param scheduler the scheduler returned by {@link #getScheduler()}
 * @param policy the policy supplying the input buffer size and idle timeout
 * @param bufferPool the pool used for the generator, parser, flusher and read buffers
 */
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
{
    super(endp,executor);

    // Collaborators handed in by the caller.
    this.policy = policy;
    this.bufferPool = bufferPool;
    this.scheduler = scheduler;

    // Connection id: "localHost:localPort->remoteHost:remotePort".
    this.id = String.format("%s:%d->%s:%d",
            endp.getLocalAddress().getAddress().getHostAddress(),
            endp.getLocalAddress().getPort(),
            endp.getRemoteAddress().getAddress().getHostAddress(),
            endp.getRemoteAddress().getPort());

    // Locally created state.
    this.extensions = new ArrayList<>();
    this.suspendToken = new AtomicBoolean(false);
    this.generator = new Generator(policy,bufferPool);
    this.parser = new Parser(policy,bufferPool);

    // This connection listens to its own IO state changes.
    this.ioState = new IOState();
    this.ioState.addListener(this);

    // The flusher reads the policy through getPolicy(), so it is created after the policy is set.
    this.flusher = new Flusher(bufferPool,generator,endp);

    // Apply the policy limits last.
    setInputBufferSize(policy.getInputBufferSize());
    setMaxIdleTimeout(policy.getIdleTimeout());
}
248
249 @Override
250 public Executor getExecutor()
251 {
252 return super.getExecutor();
253 }
254
255
256
257
258 @Override
259 public void close()
260 {
261 if (LOG_CLOSE.isDebugEnabled())
262 LOG_CLOSE.debug("close()");
263 close(new CloseInfo());
264 }
265
266
267
268
269
270
271
272
273
274
275
276
277
278 @Override
279 public void close(int statusCode, String reason)
280 {
281 if (LOG_CLOSE.isDebugEnabled())
282 LOG_CLOSE.debug("close({},{})", statusCode, reason);
283 close(new CloseInfo(statusCode, reason));
284 }
285
286 private void close(CloseInfo closeInfo)
287 {
288 if (closed.compareAndSet(false, true))
289 outgoingFrame(closeInfo.asFrame(), new OnCloseLocalCallback(closeInfo), BatchMode.OFF);
290 }
291
292 @Override
293 public void disconnect()
294 {
295 if (LOG_CLOSE.isDebugEnabled())
296 LOG_CLOSE.debug("{} disconnect()",policy.getBehavior());
297 disconnect(false);
298 }
299
300 private void disconnect(boolean onlyOutput)
301 {
302 if (LOG_CLOSE.isDebugEnabled())
303 LOG_CLOSE.debug("{} disconnect({})",policy.getBehavior(),onlyOutput?"outputOnly":"both");
304
305 flusher.close();
306 EndPoint endPoint = getEndPoint();
307
308
309 if (LOG_CLOSE.isDebugEnabled())
310 LOG_CLOSE.debug("Shutting down output {}",endPoint);
311 endPoint.shutdownOutput();
312 if (!onlyOutput)
313 {
314 if (LOG_CLOSE.isDebugEnabled())
315 LOG_CLOSE.debug("Closing {}",endPoint);
316 endPoint.close();
317 }
318 }
319
320 protected void execute(Runnable task)
321 {
322 try
323 {
324 getExecutor().execute(task);
325 }
326 catch (RejectedExecutionException e)
327 {
328 if (LOG.isDebugEnabled())
329 LOG.debug("Job not dispatched: {}",task);
330 }
331 }
332
333 @Override
334 public void fillInterested()
335 {
336 stats.countFillInterestedEvents.incrementAndGet();
337 super.fillInterested();
338 }
339
340 @Override
341 public ByteBufferPool getBufferPool()
342 {
343 return bufferPool;
344 }
345
346
347
348
349
350
351
352
353 public List<ExtensionConfig> getExtensions()
354 {
355 return extensions;
356 }
357
358 public Generator getGenerator()
359 {
360 return generator;
361 }
362
363 @Override
364 public String getId()
365 {
366 return id;
367 }
368
369 @Override
370 public long getIdleTimeout()
371 {
372 return getEndPoint().getIdleTimeout();
373 }
374
375 @Override
376 public IOState getIOState()
377 {
378 return ioState;
379 }
380
381 @Override
382 public long getMaxIdleTimeout()
383 {
384 return getEndPoint().getIdleTimeout();
385 }
386
387 public Parser getParser()
388 {
389 return parser;
390 }
391
392 @Override
393 public WebSocketPolicy getPolicy()
394 {
395 return this.policy;
396 }
397
398 @Override
399 public InetSocketAddress getRemoteAddress()
400 {
401 return getEndPoint().getRemoteAddress();
402 }
403
404 public Scheduler getScheduler()
405 {
406 return scheduler;
407 }
408
409 public Stats getStats()
410 {
411 return stats;
412 }
413
414 @Override
415 public boolean isOpen()
416 {
417 return !closed.get();
418 }
419
420 @Override
421 public boolean isReading()
422 {
423 return isFilling;
424 }
425
426
427
428
429
430
431 @Override
432 public void onClose()
433 {
434 if (LOG.isDebugEnabled())
435 LOG.debug("{} onClose()",policy.getBehavior());
436 super.onClose();
437 ioState.onDisconnected();
438 flusher.close();
439 }
440
441 @Override
442 public void onConnectionStateChange(ConnectionState state)
443 {
444 if (LOG_CLOSE.isDebugEnabled())
445 LOG_CLOSE.debug("{} Connection State Change: {}",policy.getBehavior(),state);
446
447 switch (state)
448 {
449 case OPEN:
450 if (BufferUtil.hasContent(prefillBuffer))
451 {
452 if (LOG.isDebugEnabled())
453 {
454 LOG.debug("Parsing Upgrade prefill buffer ({} remaining)",prefillBuffer.remaining());
455 }
456 parser.parse(prefillBuffer);
457 }
458 if (LOG.isDebugEnabled())
459 {
460 LOG.debug("OPEN: normal fillInterested");
461 }
462
463
464 fillInterested();
465 break;
466 case CLOSED:
467 if (LOG_CLOSE.isDebugEnabled())
468 LOG_CLOSE.debug("CLOSED - wasAbnormalClose: {}", ioState.wasAbnormalClose());
469 if (ioState.wasAbnormalClose())
470 {
471
472 CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
473 outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(false),BatchMode.OFF);
474 }
475 else
476 {
477
478 this.disconnect(false);
479 }
480 break;
481 case CLOSING:
482 if (LOG_CLOSE.isDebugEnabled())
483 LOG_CLOSE.debug("CLOSING - wasRemoteCloseInitiated: {}", ioState.wasRemoteCloseInitiated());
484
485 if (ioState.wasRemoteCloseInitiated())
486 {
487 CloseInfo close = ioState.getCloseInfo();
488
489 outgoingFrame(close.asFrame(),new OnCloseLocalCallback(new OnDisconnectCallback(true),close),BatchMode.OFF);
490 }
491 default:
492 break;
493 }
494 }
495
496 @Override
497 public void onFillable()
498 {
499 if (LOG.isDebugEnabled())
500 LOG.debug("{} onFillable()",policy.getBehavior());
501 stats.countOnFillableEvents.incrementAndGet();
502
503 ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),true);
504
505 try
506 {
507 isFilling = true;
508
509 if(readMode == ReadMode.PARSE)
510 {
511 readMode = readParse(buffer);
512 }
513 else
514 {
515 readMode = readDiscard(buffer);
516 }
517 }
518 finally
519 {
520 bufferPool.release(buffer);
521 }
522
523 if ((readMode != ReadMode.EOF) && (suspendToken.get() == false))
524 {
525 fillInterested();
526 }
527 else
528 {
529 isFilling = false;
530 }
531 }
532
533 @Override
534 protected void onFillInterestedFailed(Throwable cause)
535 {
536 LOG.ignore(cause);
537 stats.countFillInterestedEvents.incrementAndGet();
538 super.onFillInterestedFailed(cause);
539 }
540
541
542
543
544
545
546
547 protected void setInitialBuffer(ByteBuffer prefilled)
548 {
549 if (LOG.isDebugEnabled())
550 {
551 LOG.debug("set Initial Buffer - {}",BufferUtil.toDetailString(prefilled));
552 }
553 prefillBuffer = prefilled;
554 }
555
556 private void notifyError(Throwable t)
557 {
558 getParser().getIncomingFramesHandler().incomingError(t);
559 }
560
561 @Override
562 public void onOpen()
563 {
564 if(LOG_OPEN.isDebugEnabled())
565 LOG_OPEN.debug("[{}] {}.onOpened()",policy.getBehavior(),this.getClass().getSimpleName());
566 super.onOpen();
567 this.ioState.onOpened();
568 }
569
570
571
572
573 @Override
574 protected boolean onReadTimeout()
575 {
576 IOState state = getIOState();
577 ConnectionState cstate = state.getConnectionState();
578 if (LOG_CLOSE.isDebugEnabled())
579 LOG_CLOSE.debug("{} Read Timeout - {}",policy.getBehavior(),cstate);
580
581 if (cstate == ConnectionState.CLOSED)
582 {
583 if (LOG_CLOSE.isDebugEnabled())
584 LOG_CLOSE.debug("onReadTimeout - Connection Already CLOSED");
585
586
587 return true;
588 }
589
590 try
591 {
592 notifyError(new SocketTimeoutException("Timeout on Read"));
593 }
594 finally
595 {
596
597 close(StatusCode.SHUTDOWN,"Idle Timeout");
598 }
599
600 return false;
601 }
602
603
604
605
606 @Override
607 public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
608 {
609 if (LOG.isDebugEnabled())
610 {
611 LOG.debug("outgoingFrame({}, {})",frame,callback);
612 }
613
614 flusher.enqueue(frame,callback,batchMode);
615 }
616
617 private ReadMode readDiscard(ByteBuffer buffer)
618 {
619 EndPoint endPoint = getEndPoint();
620 try
621 {
622 while (true)
623 {
624 int filled = endPoint.fill(buffer);
625 if (filled == 0)
626 {
627 return ReadMode.DISCARD;
628 }
629 else if (filled < 0)
630 {
631 if (LOG_CLOSE.isDebugEnabled())
632 LOG_CLOSE.debug("read - EOF Reached (remote: {})",getRemoteAddress());
633 return ReadMode.EOF;
634 }
635 else
636 {
637 if (LOG_CLOSE.isDebugEnabled())
638 LOG_CLOSE.debug("Discarded {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
639 }
640 }
641 }
642 catch (IOException e)
643 {
644 LOG.ignore(e);
645 return ReadMode.EOF;
646 }
647 catch (Throwable t)
648 {
649 LOG.ignore(t);
650 return ReadMode.DISCARD;
651 }
652 }
653
654 private ReadMode readParse(ByteBuffer buffer)
655 {
656 EndPoint endPoint = getEndPoint();
657 try
658 {
659
660 while(true)
661 {
662 int filled = endPoint.fill(buffer);
663 if (filled < 0)
664 {
665 LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
666 ioState.onReadFailure(new EOFException("Remote Read EOF"));
667 return ReadMode.EOF;
668 }
669 else if (filled == 0)
670 {
671
672 return ReadMode.PARSE;
673 }
674
675 if (LOG.isDebugEnabled())
676 {
677 LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
678 }
679 parser.parse(buffer);
680 }
681 }
682 catch (IOException e)
683 {
684 LOG.warn(e);
685 close(StatusCode.PROTOCOL,e.getMessage());
686 return ReadMode.DISCARD;
687 }
688 catch (CloseException e)
689 {
690 LOG.debug(e);
691 close(e.getStatusCode(),e.getMessage());
692 return ReadMode.DISCARD;
693 }
694 catch (Throwable t)
695 {
696 LOG.warn(t);
697 close(StatusCode.ABNORMAL,t.getMessage());
698
699 return ReadMode.DISCARD;
700 }
701 }
702
703 @Override
704 public void resume()
705 {
706 if (suspendToken.getAndSet(false))
707 {
708 fillInterested();
709 }
710 }
711
712
713
714
715
716
717
718
719
720 public void setExtensions(List<ExtensionConfig> extensions)
721 {
722 this.extensions = extensions;
723 }
724
725 @Override
726 public void setInputBufferSize(int inputBufferSize)
727 {
728 if (inputBufferSize < MIN_BUFFER_SIZE)
729 {
730 throw new IllegalArgumentException("Cannot have buffer size less than " + MIN_BUFFER_SIZE);
731 }
732 super.setInputBufferSize(inputBufferSize);
733 }
734
735 @Override
736 public void setMaxIdleTimeout(long ms)
737 {
738 getEndPoint().setIdleTimeout(ms);
739 }
740
741 @Override
742 public SuspendToken suspend()
743 {
744 suspendToken.set(true);
745 return this;
746 }
747
748 @Override
749 public String dump()
750 {
751 return ContainerLifeCycle.dump(this);
752 }
753
754 @Override
755 public void dump(Appendable out, String indent) throws IOException
756 {
757 out.append(toString()).append(System.lineSeparator());
758 }
759
760 @Override
761 public String toString()
762 {
763 return String.format("%s@%X{endp=%s,ios=%s,f=%s,g=%s,p=%s}",getClass().getSimpleName(),hashCode(),getEndPoint(),ioState,flusher,generator,parser);
764 }
765
766 @Override
767 public int hashCode()
768 {
769 final int prime = 31;
770 int result = 1;
771
772 EndPoint endp = getEndPoint();
773 if(endp != null)
774 {
775 result = prime * result + endp.getLocalAddress().hashCode();
776 result = prime * result + endp.getRemoteAddress().hashCode();
777 }
778 return result;
779 }
780
781 @Override
782 public boolean equals(Object obj)
783 {
784 if (this == obj)
785 return true;
786 if (obj == null)
787 return false;
788 if (getClass() != obj.getClass())
789 return false;
790 AbstractWebSocketConnection other = (AbstractWebSocketConnection)obj;
791 EndPoint endp = getEndPoint();
792 EndPoint otherEndp = other.getEndPoint();
793 if (endp == null)
794 {
795 if (otherEndp != null)
796 return false;
797 }
798 else if (!endp.equals(otherEndp))
799 return false;
800 return true;
801 }
802
803
804
805
806
807
808 @Override
809 public void onUpgradeTo(ByteBuffer prefilled)
810 {
811 setInitialBuffer(prefilled);
812 }
813 }