1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.io;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketTimeoutException;
25 import java.nio.ByteBuffer;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.RejectedExecutionException;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.eclipse.jetty.io.AbstractConnection;
34 import org.eclipse.jetty.io.ByteBufferPool;
35 import org.eclipse.jetty.io.EndPoint;
36 import org.eclipse.jetty.util.BufferUtil;
37 import org.eclipse.jetty.util.Callback;
38 import org.eclipse.jetty.util.ForkInvoker;
39 import org.eclipse.jetty.util.StringUtil;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42 import org.eclipse.jetty.util.thread.Scheduler;
43 import org.eclipse.jetty.websocket.api.CloseException;
44 import org.eclipse.jetty.websocket.api.CloseStatus;
45 import org.eclipse.jetty.websocket.api.StatusCode;
46 import org.eclipse.jetty.websocket.api.SuspendToken;
47 import org.eclipse.jetty.websocket.api.WebSocketException;
48 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
49 import org.eclipse.jetty.websocket.api.WriteCallback;
50 import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
51 import org.eclipse.jetty.websocket.api.extensions.Frame;
52 import org.eclipse.jetty.websocket.common.CloseInfo;
53 import org.eclipse.jetty.websocket.common.ConnectionState;
54 import org.eclipse.jetty.websocket.common.Generator;
55 import org.eclipse.jetty.websocket.common.LogicalConnection;
56 import org.eclipse.jetty.websocket.common.Parser;
57 import org.eclipse.jetty.websocket.common.WebSocketSession;
58 import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
59
60
61
62
63 public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, ConnectionStateListener
64 {
65 private class FlushCallback implements Callback
66 {
67
68
69
70 @Override
71 public void failed(Throwable x)
72 {
73 if (ioState.wasAbnormalClose())
74 {
75 LOG.ignore(x);
76 return;
77 }
78
79 LOG.debug("Write flush failure",x);
80
81
82
83 String reason = "Websocket write failure";
84
85 if (x instanceof EOFException)
86 {
87 reason = "EOF";
88 Throwable cause = x.getCause();
89 if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
90 {
91 reason = "EOF: " + cause.getMessage();
92 }
93 }
94 else
95 {
96 if (StringUtil.isNotBlank(x.getMessage()))
97 {
98 reason = x.getMessage();
99 }
100 }
101
102
103 reason = CloseStatus.trimMaxReasonLength(reason);
104 session.notifyError(x);
105 session.notifyClose(StatusCode.NO_CLOSE,reason);
106
107 disconnect();
108 }
109
110 @Override
111 public void succeeded()
112 {
113 AbstractWebSocketConnection.this.complete(writeBytes);
114 }
115 }
116
117 private class FlushInvoker extends ForkInvoker<Callback>
118 {
119 private FlushInvoker()
120 {
121 super(4);
122 }
123
124 @Override
125 public void call(Callback callback)
126 {
127 flush();
128 }
129
130 @Override
131 public void fork(final Callback callback)
132 {
133 execute(new Runnable()
134 {
135 @Override
136 public void run()
137 {
138 flush();
139 }
140 });
141 }
142
143 @Override
144 public String toString()
145 {
146 return String.format("%s@%x",FlushInvoker.class.getSimpleName(),hashCode());
147 }
148 }
149
150 public class OnDisconnectCallback implements WriteCallback
151 {
152 @Override
153 public void writeFailed(Throwable x)
154 {
155 disconnect();
156 }
157
158 @Override
159 public void writeSuccess()
160 {
161 disconnect();
162 }
163 }
164
165 public static class Stats
166 {
167 private AtomicLong countFillInterestedEvents = new AtomicLong(0);
168 private AtomicLong countOnFillableEvents = new AtomicLong(0);
169 private AtomicLong countFillableErrors = new AtomicLong(0);
170
171 public long getFillableErrorCount()
172 {
173 return countFillableErrors.get();
174 }
175
176 public long getFillInterestedCount()
177 {
178 return countFillInterestedEvents.get();
179 }
180
181 public long getOnFillableCount()
182 {
183 return countOnFillableEvents.get();
184 }
185 }
186
187 private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
188
189
190
191
192 private static final int MIN_BUFFER_SIZE = Generator.OVERHEAD;
193
194 private final ForkInvoker<Callback> invoker = new FlushInvoker();
195 private final ByteBufferPool bufferPool;
196 private final Scheduler scheduler;
197 private final Generator generator;
198 private final Parser parser;
199 private final WebSocketPolicy policy;
200 private final WriteBytesProvider writeBytes;
201 private final AtomicBoolean suspendToken;
202 private WebSocketSession session;
203 private List<ExtensionConfig> extensions;
204 private boolean flushing;
205 private boolean isFilling;
206 private IOState ioState;
207 private Stats stats = new Stats();
208
/**
 * Creates the connection and wires up its generator, parser, IO state and
 * write queue.
 *
 * @param endp the endpoint passed to the superclass
 * @param executor the executor passed to the superclass
 * @param scheduler the scheduler returned by {@link #getScheduler()}
 * @param policy the policy used for the generator, the parser and the input buffer size
 * @param bufferPool the buffer pool used for the generator, the parser and read buffers
 * @throws IllegalArgumentException if the policy's input buffer size is below the minimum
 */
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
{
    super(endp,executor,EXECUTE_ONFILLABLE);

    // Plain collaborators handed in by the caller.
    this.policy = policy;
    this.bufferPool = bufferPool;
    this.scheduler = scheduler;

    // Frame generation / parsing.
    this.generator = new Generator(policy,bufferPool);
    this.parser = new Parser(policy,bufferPool);

    // Mutable connection state.
    this.extensions = new ArrayList<>();
    this.suspendToken = new AtomicBoolean(false);

    // IO state notifies this connection of state changes.
    this.ioState = new IOState();
    this.ioState.addListener(this);

    this.writeBytes = new WriteBytesProvider(this.generator,new FlushCallback());
    setInputBufferSize(policy.getInputBufferSize());
}
224
225 @Override
226 public Executor getExecutor()
227 {
228 return super.getExecutor();
229 }
230
231 @Override
232 public void close()
233 {
234 close(StatusCode.NORMAL,null);
235 }
236
237
238
239
240
241
242
243
244
245
246
247
248 @Override
249 public void close(int statusCode, String reason)
250 {
251 CloseInfo close = new CloseInfo(statusCode,reason);
252 if (statusCode == StatusCode.ABNORMAL)
253 {
254 ioState.onAbnormalClose(close);
255 }
256 else
257 {
258 ioState.onCloseLocal(close);
259 }
260 }
261
262 public void complete(final Callback callback)
263 {
264 LOG.debug("complete({})",callback);
265 synchronized (writeBytes)
266 {
267 flushing = false;
268 }
269
270 if (!ioState.isOpen() || (callback == null))
271 {
272 return;
273 }
274
275 invoker.invoke(callback);
276 }
277
278 @Override
279 public void disconnect()
280 {
281 LOG.debug("{} disconnect()",policy.getBehavior());
282 synchronized (writeBytes)
283 {
284 if (!writeBytes.isClosed())
285 {
286 writeBytes.close();
287 }
288 }
289 disconnect(false);
290 }
291
292 private void disconnect(boolean onlyOutput)
293 {
294 EndPoint endPoint = getEndPoint();
295
296
297 LOG.debug("Shutting down output {}",endPoint);
298 endPoint.shutdownOutput();
299 if (!onlyOutput)
300 {
301 LOG.debug("Closing {}",endPoint);
302 endPoint.close();
303 }
304 }
305
306 protected void execute(Runnable task)
307 {
308 try
309 {
310 getExecutor().execute(task);
311 }
312 catch (RejectedExecutionException e)
313 {
314 LOG.debug("Job not dispatched: {}",task);
315 }
316 }
317
318 @Override
319 public void fillInterested()
320 {
321 stats.countFillInterestedEvents.incrementAndGet();
322 super.fillInterested();
323 }
324
325 public void flush()
326 {
327 List<ByteBuffer> buffers = null;
328
329 synchronized (writeBytes)
330 {
331 if (flushing)
332 {
333 LOG.debug("Actively flushing");
334 return;
335 }
336
337 if (LOG.isDebugEnabled())
338 {
339 LOG.debug(".flush() - flushing={} - writeBytes={}",flushing,writeBytes);
340 }
341
342 if (!isOpen())
343 {
344
345 writeBytes.failAll(new WebSocketException("Connection closed"));
346 return;
347 }
348
349 buffers = writeBytes.getByteBuffers();
350
351 if ((buffers == null) || (buffers.size() <= 0))
352 {
353 return;
354 }
355
356 flushing = true;
357 }
358
359 write(buffers);
360 }
361
362 @Override
363 public ByteBufferPool getBufferPool()
364 {
365 return bufferPool;
366 }
367
368
369
370
371
372
373
374
375 public List<ExtensionConfig> getExtensions()
376 {
377 return extensions;
378 }
379
380 public Generator getGenerator()
381 {
382 return generator;
383 }
384
385 @Override
386 public long getIdleTimeout()
387 {
388 return getEndPoint().getIdleTimeout();
389 }
390
391 @Override
392 public IOState getIOState()
393 {
394 return ioState;
395 }
396
397 @Override
398 public long getMaxIdleTimeout()
399 {
400 return getEndPoint().getIdleTimeout();
401 }
402
403 public Parser getParser()
404 {
405 return parser;
406 }
407
408 @Override
409 public WebSocketPolicy getPolicy()
410 {
411 return this.policy;
412 }
413
414 @Override
415 public InetSocketAddress getRemoteAddress()
416 {
417 return getEndPoint().getRemoteAddress();
418 }
419
420 public Scheduler getScheduler()
421 {
422 return scheduler;
423 }
424
425 @Override
426 public WebSocketSession getSession()
427 {
428 return session;
429 }
430
431 public Stats getStats()
432 {
433 return stats;
434 }
435
436 @Override
437 public boolean isOpen()
438 {
439 return getIOState().isOpen() && getEndPoint().isOpen();
440 }
441
442 @Override
443 public boolean isReading()
444 {
445 return isFilling;
446 }
447
448
449
450
451
452
453 @Override
454 public void onClose()
455 {
456 super.onClose();
457 writeBytes.close();
458 }
459
460 @Override
461 public void onConnectionStateChange(ConnectionState state)
462 {
463 LOG.debug("{} Connection State Change: {}",policy.getBehavior(),state);
464 switch (state)
465 {
466 case OPEN:
467 LOG.debug("fillInterested");
468 fillInterested();
469 break;
470 case CLOSED:
471 if (ioState.wasAbnormalClose())
472 {
473
474 CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
475 outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback());
476 }
477 else
478 {
479
480 this.disconnect();
481 }
482 break;
483 case CLOSING:
484 CloseInfo close = ioState.getCloseInfo();
485
486 outgoingFrame(close.asFrame(),new OnDisconnectCallback());
487 default:
488 break;
489 }
490 }
491
492 @Override
493 public void onFillable()
494 {
495 LOG.debug("{} onFillable()",policy.getBehavior());
496 stats.countOnFillableEvents.incrementAndGet();
497 ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),true);
498 BufferUtil.clear(buffer);
499 boolean readMore = false;
500 try
501 {
502 isFilling = true;
503 readMore = (read(buffer) != -1);
504 }
505 finally
506 {
507 bufferPool.release(buffer);
508 }
509
510 if (readMore && (suspendToken.get() == false))
511 {
512 fillInterested();
513 }
514 else
515 {
516 isFilling = false;
517 }
518 }
519
520 @Override
521 protected void onFillInterestedFailed(Throwable cause)
522 {
523 LOG.ignore(cause);
524 stats.countFillInterestedEvents.incrementAndGet();
525 super.onFillInterestedFailed(cause);
526 }
527
528 @Override
529 public void onOpen()
530 {
531 super.onOpen();
532 this.ioState.onOpened();
533 }
534
535 @Override
536 protected boolean onReadTimeout()
537 {
538 LOG.debug("{} Read Timeout",policy.getBehavior());
539
540 IOState state = getIOState();
541 if ((state.getConnectionState() == ConnectionState.CLOSING) || (state.getConnectionState() == ConnectionState.CLOSED))
542 {
543
544
545 return true;
546 }
547
548
549 session.notifyError(new SocketTimeoutException("Timeout on Read"));
550
551 close(StatusCode.ABNORMAL,"Idle Timeout");
552
553 return false;
554 }
555
556
557
558
559 @Override
560 public void outgoingFrame(Frame frame, WriteCallback callback)
561 {
562 if (LOG.isDebugEnabled())
563 {
564 LOG.debug("outgoingFrame({}, {})",frame,callback);
565 }
566
567 writeBytes.enqueue(frame,WriteCallbackWrapper.wrap(callback));
568
569 flush();
570 }
571
572 private int read(ByteBuffer buffer)
573 {
574 EndPoint endPoint = getEndPoint();
575 try
576 {
577 while (true)
578 {
579 int filled = endPoint.fill(buffer);
580 if (filled == 0)
581 {
582 return 0;
583 }
584 else if (filled < 0)
585 {
586 LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
587 ioState.onReadEOF();
588 return -1;
589 }
590 else
591 {
592 if (LOG.isDebugEnabled())
593 {
594 LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
595 }
596 parser.parse(buffer);
597
598 }
599 }
600 }
601 catch (IOException e)
602 {
603 LOG.warn(e);
604 close(StatusCode.PROTOCOL,e.getMessage());
605 return -1;
606 }
607 catch (CloseException e)
608 {
609 LOG.warn(e);
610 close(e.getStatusCode(),e.getMessage());
611 return -1;
612 }
613 }
614
615 @Override
616 public void resume()
617 {
618 if (suspendToken.getAndSet(false))
619 {
620 fillInterested();
621 }
622 }
623
624
625
626
627
628
629
630
631
632 public void setExtensions(List<ExtensionConfig> extensions)
633 {
634 this.extensions = extensions;
635 }
636
637 @Override
638 public void setInputBufferSize(int inputBufferSize)
639 {
640 if (inputBufferSize < MIN_BUFFER_SIZE)
641 {
642 throw new IllegalArgumentException("Cannot have buffer size less than " + MIN_BUFFER_SIZE);
643 }
644 super.setInputBufferSize(inputBufferSize);
645 }
646
647 @Override
648 public void setMaxIdleTimeout(long ms)
649 {
650 getEndPoint().setIdleTimeout(ms);
651 }
652
653 @Override
654 public void setSession(WebSocketSession session)
655 {
656 this.session = session;
657 }
658
659 @Override
660 public SuspendToken suspend()
661 {
662 suspendToken.set(true);
663 return this;
664 }
665
666 @Override
667 public String toString()
668 {
669 return String.format("%s{g=%s,p=%s}",super.toString(),generator,parser);
670 }
671
672 private <C> void write(List<ByteBuffer> buffer)
673 {
674 EndPoint endpoint = getEndPoint();
675
676 try
677 {
678 int bufsize = buffer.size();
679 if (bufsize == 1)
680 {
681
682 endpoint.write(writeBytes,buffer.get(0));
683 }
684 else
685 {
686
687 ByteBuffer bbarr[] = buffer.toArray(new ByteBuffer[bufsize]);
688 endpoint.write(writeBytes,bbarr);
689 }
690 }
691 catch (Throwable t)
692 {
693 writeBytes.failed(t);
694 }
695 }
696 }