1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.io;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.nio.ByteBuffer;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.concurrent.atomic.AtomicLong;
31
32 import org.eclipse.jetty.io.AbstractConnection;
33 import org.eclipse.jetty.io.ByteBufferPool;
34 import org.eclipse.jetty.io.Connection;
35 import org.eclipse.jetty.io.EndPoint;
36 import org.eclipse.jetty.util.BufferUtil;
37 import org.eclipse.jetty.util.Callback;
38 import org.eclipse.jetty.util.ForkInvoker;
39 import org.eclipse.jetty.util.StringUtil;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42 import org.eclipse.jetty.util.thread.Scheduler;
43 import org.eclipse.jetty.websocket.api.CloseException;
44 import org.eclipse.jetty.websocket.api.CloseStatus;
45 import org.eclipse.jetty.websocket.api.StatusCode;
46 import org.eclipse.jetty.websocket.api.SuspendToken;
47 import org.eclipse.jetty.websocket.api.WebSocketException;
48 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
49 import org.eclipse.jetty.websocket.api.WebSocketTimeoutException;
50 import org.eclipse.jetty.websocket.api.WriteCallback;
51 import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
52 import org.eclipse.jetty.websocket.api.extensions.Frame;
53 import org.eclipse.jetty.websocket.common.CloseInfo;
54 import org.eclipse.jetty.websocket.common.ConnectionState;
55 import org.eclipse.jetty.websocket.common.Generator;
56 import org.eclipse.jetty.websocket.common.LogicalConnection;
57 import org.eclipse.jetty.websocket.common.Parser;
58 import org.eclipse.jetty.websocket.common.WebSocketSession;
59 import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
60
61
62
63
64 public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, ConnectionStateListener
65 {
66 private class FlushCallback implements Callback
67 {
68
69
70
71 @Override
72 public void failed(Throwable x)
73 {
74 LOG.debug("Write flush failure",x);
75
76
77
78 String reason = "Websocket write failure";
79
80 if (x instanceof EOFException)
81 {
82 reason = "EOF";
83 Throwable cause = x.getCause();
84 if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
85 {
86 reason = "EOF: " + cause.getMessage();
87 }
88 }
89 else
90 {
91 if (StringUtil.isNotBlank(x.getMessage()))
92 {
93 reason = x.getMessage();
94 }
95 }
96
97
98 reason = CloseStatus.trimMaxReasonLength(reason);
99 session.incomingError(new WebSocketException(x));
100 session.notifyClose(StatusCode.NO_CLOSE,reason);
101
102 disconnect();
103 }
104
105 @Override
106 public void succeeded()
107 {
108 AbstractWebSocketConnection.this.complete(writeBytes);
109 }
110 }
111
112 private class FlushInvoker extends ForkInvoker<Callback>
113 {
114 private FlushInvoker()
115 {
116 super(4);
117 }
118
119 @Override
120 public void call(Callback callback)
121 {
122 flush();
123 }
124
125 @Override
126 public void fork(final Callback callback)
127 {
128 execute(new Runnable()
129 {
130 @Override
131 public void run()
132 {
133 flush();
134 }
135 });
136 }
137
138 @Override
139 public String toString()
140 {
141 return String.format("%s@%x",FlushInvoker.class.getSimpleName(),hashCode());
142 }
143 }
144
145 public class OnDisconnectCallback implements WriteCallback
146 {
147 @Override
148 public void writeFailed(Throwable x)
149 {
150 disconnect();
151 }
152
153 @Override
154 public void writeSuccess()
155 {
156 disconnect();
157 }
158 }
159
160 public static class Stats
161 {
162 private AtomicLong countFillInterestedEvents = new AtomicLong(0);
163 private AtomicLong countOnFillableEvents = new AtomicLong(0);
164 private AtomicLong countFillableErrors = new AtomicLong(0);
165
166 public long getFillableErrorCount()
167 {
168 return countFillableErrors.get();
169 }
170
171 public long getFillInterestedCount()
172 {
173 return countFillInterestedEvents.get();
174 }
175
176 public long getOnFillableCount()
177 {
178 return countOnFillableEvents.get();
179 }
180 }
181
182 private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
183
184
185
186
187 private static final int MIN_BUFFER_SIZE = Generator.OVERHEAD;
188
189 private final ForkInvoker<Callback> invoker = new FlushInvoker();
190 private final ByteBufferPool bufferPool;
191 private final Scheduler scheduler;
192 private final Generator generator;
193 private final Parser parser;
194 private final WebSocketPolicy policy;
195 private final WriteBytesProvider writeBytes;
196 private final AtomicBoolean suspendToken;
197 private WebSocketSession session;
198 private List<ExtensionConfig> extensions;
199 private boolean flushing;
200 private boolean isFilling;
201 private IOState ioState;
202 private Stats stats = new Stats();
203
204 public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
205 {
206 super(endp,executor,EXECUTE_ONFILLABLE);
207 this.policy = policy;
208 this.bufferPool = bufferPool;
209 this.generator = new Generator(policy,bufferPool);
210 this.parser = new Parser(policy,bufferPool);
211 this.scheduler = scheduler;
212 this.extensions = new ArrayList<>();
213 this.suspendToken = new AtomicBoolean(false);
214 this.ioState = new IOState();
215 this.ioState.addListener(this);
216 this.writeBytes = new WriteBytesProvider(generator,new FlushCallback());
217 this.setInputBufferSize(policy.getInputBufferSize());
218 }
219
220 @Override
221 public void close()
222 {
223 close(StatusCode.NORMAL,null);
224 }
225
226 @Override
227 public void close(int statusCode, String reason)
228 {
229 enqueClose(statusCode,reason);
230 }
231
232 public void complete(final Callback callback)
233 {
234 LOG.debug("complete({})",callback);
235 synchronized (writeBytes)
236 {
237 flushing = false;
238 }
239
240 if (!ioState.isOpen() || (callback == null))
241 {
242 return;
243 }
244
245 invoker.invoke(callback);
246 }
247
248 @Override
249 public void disconnect()
250 {
251 LOG.debug("{} disconnect()", policy.getBehavior());
252 synchronized (writeBytes)
253 {
254 if (!writeBytes.isClosed())
255 {
256 writeBytes.close();
257 }
258 }
259 disconnect(false);
260 }
261
262 public void disconnect(boolean onlyOutput)
263 {
264 EndPoint endPoint = getEndPoint();
265
266
267 LOG.debug("Shutting down output {}",endPoint);
268 endPoint.shutdownOutput();
269 if (!onlyOutput)
270 {
271 LOG.debug("Closing {}",endPoint);
272 endPoint.close();
273 }
274 }
275
276
277
278
279
280
281
282
283
284
285 private void enqueClose(int statusCode, String reason)
286 {
287 CloseInfo close = new CloseInfo(statusCode,reason);
288 ioState.onCloseLocal(close);
289 }
290
291 protected void execute(Runnable task)
292 {
293 try
294 {
295 getExecutor().execute(task);
296 }
297 catch (RejectedExecutionException e)
298 {
299 LOG.debug("Job not dispatched: {}",task);
300 }
301 }
302
303 @Override
304 public void fillInterested()
305 {
306 stats.countFillInterestedEvents.incrementAndGet();
307 super.fillInterested();
308 }
309
310 public void flush()
311 {
312 ByteBuffer buffer = null;
313
314 synchronized (writeBytes)
315 {
316 if (flushing)
317 {
318 return;
319 }
320
321 if (LOG.isDebugEnabled())
322 {
323 LOG.debug(".flush() - flushing={} - writeBytes={}",flushing,writeBytes);
324 }
325
326 if (!isOpen())
327 {
328
329 writeBytes.failAll(new WebSocketException("Connection closed"));
330 return;
331 }
332
333 buffer = writeBytes.getByteBuffer();
334
335 if (buffer == null)
336 {
337 return;
338 }
339
340 flushing = true;
341
342 if (LOG.isDebugEnabled())
343 {
344 LOG.debug("Flushing {} - {}",BufferUtil.toDetailString(buffer),writeBytes);
345 }
346 }
347
348 write(buffer);
349 }
350
351 public ByteBufferPool getBufferPool()
352 {
353 return bufferPool;
354 }
355
356
357
358
359
360
361
362
363 public List<ExtensionConfig> getExtensions()
364 {
365 return extensions;
366 }
367
368 public Generator getGenerator()
369 {
370 return generator;
371 }
372
373 @Override
374 public IOState getIOState()
375 {
376 return ioState;
377 }
378
379 @Override
380 public long getMaxIdleTimeout()
381 {
382 return getEndPoint().getIdleTimeout();
383 }
384
385 public Parser getParser()
386 {
387 return parser;
388 }
389
390 @Override
391 public WebSocketPolicy getPolicy()
392 {
393 return this.policy;
394 }
395
396 @Override
397 public InetSocketAddress getRemoteAddress()
398 {
399 return getEndPoint().getRemoteAddress();
400 }
401
402 public Scheduler getScheduler()
403 {
404 return scheduler;
405 }
406
407 @Override
408 public WebSocketSession getSession()
409 {
410 return session;
411 }
412
413 public Stats getStats()
414 {
415 return stats;
416 }
417
418 @Override
419 public boolean isOpen()
420 {
421 return getIOState().isOpen() && getEndPoint().isOpen();
422 }
423
424 @Override
425 public boolean isReading()
426 {
427 return isFilling;
428 }
429
430
431
432
433
434
435 @Override
436 public void onClose()
437 {
438 super.onClose();
439 writeBytes.close();
440 }
441
442 @Override
443 public void onConnectionStateChange(ConnectionState state)
444 {
445 LOG.debug("{} Connection State Change: {}",policy.getBehavior(),state);
446 switch (state)
447 {
448 case OPEN:
449 LOG.debug("fillInterested");
450 fillInterested();
451 break;
452 case CLOSED:
453 this.disconnect();
454 break;
455 case CLOSING:
456 CloseInfo close = ioState.getCloseInfo();
457
458 outgoingFrame(close.asFrame(),new OnDisconnectCallback());
459 default:
460 break;
461 }
462 }
463
464 @Override
465 public void onFillable()
466 {
467 LOG.debug("{} onFillable()",policy.getBehavior());
468 stats.countOnFillableEvents.incrementAndGet();
469 ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false);
470 BufferUtil.clear(buffer);
471 boolean readMore = false;
472 try
473 {
474 isFilling = true;
475 readMore = (read(buffer) != -1);
476 }
477 finally
478 {
479 bufferPool.release(buffer);
480 }
481
482 if (readMore && (suspendToken.get() == false))
483 {
484 fillInterested();
485 }
486 else
487 {
488 isFilling = false;
489 }
490 }
491
492 @Override
493 protected void onFillInterestedFailed(Throwable cause)
494 {
495 LOG.ignore(cause);
496 stats.countFillInterestedEvents.incrementAndGet();
497 super.onFillInterestedFailed(cause);
498 }
499
500 @Override
501 public void onOpen()
502 {
503 super.onOpen();
504 this.ioState.onOpened();
505 }
506
507 @Override
508 protected boolean onReadTimeout()
509 {
510 LOG.debug("Read Timeout");
511
512 IOState state = getIOState();
513 if ((state.getConnectionState() == ConnectionState.CLOSING) || (state.getConnectionState() == ConnectionState.CLOSED))
514 {
515
516
517 return true;
518 }
519
520
521 session.incomingError(new WebSocketTimeoutException("Timeout on Read"));
522 close(StatusCode.SHUTDOWN,"Idle Timeout");
523
524 return false;
525 }
526
527
528
529
530 @Override
531 public void outgoingFrame(Frame frame, WriteCallback callback)
532 {
533 if (LOG.isDebugEnabled())
534 {
535 LOG.debug("outgoingFrame({}, {})",frame,callback);
536 }
537
538 writeBytes.enqueue(frame,WriteCallbackWrapper.wrap(callback));
539
540 flush();
541 }
542
543 private int read(ByteBuffer buffer)
544 {
545 EndPoint endPoint = getEndPoint();
546 try
547 {
548 while (true)
549 {
550 int filled = endPoint.fill(buffer);
551 if (filled == 0)
552 {
553 return 0;
554 }
555 else if (filled < 0)
556 {
557 LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
558 ioState.onReadEOF();
559 return -1;
560 }
561 else
562 {
563 if (LOG.isDebugEnabled())
564 {
565 LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
566 }
567 parser.parse(buffer);
568 }
569 }
570 }
571 catch (IOException e)
572 {
573 LOG.warn(e);
574 enqueClose(StatusCode.PROTOCOL,e.getMessage());
575 return -1;
576 }
577 catch (CloseException e)
578 {
579 LOG.warn(e);
580 enqueClose(e.getStatusCode(),e.getMessage());
581 return -1;
582 }
583 }
584
585 @Override
586 public void resume()
587 {
588 if (suspendToken.getAndSet(false))
589 {
590 fillInterested();
591 }
592 }
593
594
595
596
597
598
599
600
601
602 public void setExtensions(List<ExtensionConfig> extensions)
603 {
604 this.extensions = extensions;
605 }
606
607 @Override
608 public void setInputBufferSize(int inputBufferSize)
609 {
610 if (inputBufferSize < MIN_BUFFER_SIZE)
611 {
612 throw new IllegalArgumentException("Cannot have buffer size less than " + MIN_BUFFER_SIZE);
613 }
614 super.setInputBufferSize(inputBufferSize);
615 }
616
617 @Override
618 public void setMaxIdleTimeout(long ms)
619 {
620 getEndPoint().setIdleTimeout(ms);
621 }
622
623 @Override
624 public void setSession(WebSocketSession session)
625 {
626 this.session = session;
627 }
628
629 @Override
630 public SuspendToken suspend()
631 {
632 suspendToken.set(true);
633 return this;
634 }
635
636 @Override
637 public String toString()
638 {
639 return String.format("%s{g=%s,p=%s}",super.toString(),generator,parser);
640 }
641
642 private <C> void write(ByteBuffer buffer)
643 {
644 EndPoint endpoint = getEndPoint();
645
646 if (!isOpen())
647 {
648 writeBytes.failAll(new IOException("Connection closed"));
649 return;
650 }
651
652 try
653 {
654 endpoint.write(writeBytes,buffer);
655 }
656 catch (Throwable t)
657 {
658 writeBytes.failed(t);
659 }
660 }
661 }