1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.io;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketTimeoutException;
25 import java.nio.ByteBuffer;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.RejectedExecutionException;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.eclipse.jetty.io.AbstractConnection;
34 import org.eclipse.jetty.io.ByteBufferPool;
35 import org.eclipse.jetty.io.Connection;
36 import org.eclipse.jetty.io.EndPoint;
37 import org.eclipse.jetty.util.BufferUtil;
38 import org.eclipse.jetty.util.StringUtil;
39 import org.eclipse.jetty.util.component.ContainerLifeCycle;
40 import org.eclipse.jetty.util.component.Dumpable;
41 import org.eclipse.jetty.util.log.Log;
42 import org.eclipse.jetty.util.log.Logger;
43 import org.eclipse.jetty.util.thread.Scheduler;
44 import org.eclipse.jetty.websocket.api.BatchMode;
45 import org.eclipse.jetty.websocket.api.CloseException;
46 import org.eclipse.jetty.websocket.api.CloseStatus;
47 import org.eclipse.jetty.websocket.api.StatusCode;
48 import org.eclipse.jetty.websocket.api.SuspendToken;
49 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
50 import org.eclipse.jetty.websocket.api.WriteCallback;
51 import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
52 import org.eclipse.jetty.websocket.api.extensions.Frame;
53 import org.eclipse.jetty.websocket.common.CloseInfo;
54 import org.eclipse.jetty.websocket.common.ConnectionState;
55 import org.eclipse.jetty.websocket.common.Generator;
56 import org.eclipse.jetty.websocket.common.LogicalConnection;
57 import org.eclipse.jetty.websocket.common.Parser;
58 import org.eclipse.jetty.websocket.common.WebSocketSession;
59 import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
60
61
62
63
64
65 public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, ConnectionStateListener, Dumpable
66 {
67 private class Flusher extends FrameFlusher
68 {
69 private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint)
70 {
71 super(bufferPool, generator, endpoint, getPolicy().getMaxBinaryMessageBufferSize(), 8);
72 }
73
74 @Override
75 protected void onFailure(Throwable x)
76 {
77 if (ioState.wasAbnormalClose())
78 {
79 LOG.ignore(x);
80 return;
81 }
82
83 LOG.debug("Write flush failure",x);
84
85
86
87 String reason = "Websocket write failure";
88
89 if (x instanceof EOFException)
90 {
91 reason = "EOF";
92 Throwable cause = x.getCause();
93 if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
94 {
95 reason = "EOF: " + cause.getMessage();
96 }
97 }
98 else
99 {
100 if (StringUtil.isNotBlank(x.getMessage()))
101 {
102 reason = x.getMessage();
103 }
104 }
105
106
107 reason = CloseStatus.trimMaxReasonLength(reason);
108 session.notifyError(x);
109 session.notifyClose(StatusCode.NO_CLOSE,reason);
110
111 disconnect();
112 }
113 }
114
115 public class OnDisconnectCallback implements WriteCallback
116 {
117 @Override
118 public void writeFailed(Throwable x)
119 {
120 disconnect();
121 }
122
123 @Override
124 public void writeSuccess()
125 {
126 disconnect();
127 }
128 }
129
130 public static class Stats
131 {
132 private AtomicLong countFillInterestedEvents = new AtomicLong(0);
133 private AtomicLong countOnFillableEvents = new AtomicLong(0);
134 private AtomicLong countFillableErrors = new AtomicLong(0);
135
136 public long getFillableErrorCount()
137 {
138 return countFillableErrors.get();
139 }
140
141 public long getFillInterestedCount()
142 {
143 return countFillInterestedEvents.get();
144 }
145
146 public long getOnFillableCount()
147 {
148 return countOnFillableEvents.get();
149 }
150 }
151
152 private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
153
154
155
156
157 private static final int MIN_BUFFER_SIZE = Generator.MAX_HEADER_LENGTH;
158
159 private final ByteBufferPool bufferPool;
160 private final Scheduler scheduler;
161 private final Generator generator;
162 private final Parser parser;
163 private final WebSocketPolicy policy;
164 private final AtomicBoolean suspendToken;
165 private final FrameFlusher flusher;
166 private WebSocketSession session;
167 private List<ExtensionConfig> extensions;
168 private boolean isFilling;
169 private IOState ioState;
170 private Stats stats = new Stats();
171
172 public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
173 {
174 super(endp,executor,EXECUTE_ONFILLABLE);
175 this.policy = policy;
176 this.bufferPool = bufferPool;
177 this.generator = new Generator(policy,bufferPool);
178 this.parser = new Parser(policy,bufferPool);
179 this.scheduler = scheduler;
180 this.extensions = new ArrayList<>();
181 this.suspendToken = new AtomicBoolean(false);
182 this.ioState = new IOState();
183 this.ioState.addListener(this);
184 this.flusher = new Flusher(bufferPool,generator,endp);
185 this.setInputBufferSize(policy.getInputBufferSize());
186 this.setMaxIdleTimeout(policy.getIdleTimeout());
187 }
188
189 @Override
190 public Executor getExecutor()
191 {
192 return super.getExecutor();
193 }
194
195 @Override
196 public void close()
197 {
198 close(StatusCode.NORMAL,null);
199 }
200
201
202
203
204
205
206
207
208
209
210
211
212 @Override
213 public void close(int statusCode, String reason)
214 {
215 CloseInfo close = new CloseInfo(statusCode,reason);
216 if (statusCode == StatusCode.ABNORMAL)
217 {
218 flusher.close();
219 ioState.onAbnormalClose(close);
220 }
221 else
222 {
223 ioState.onCloseLocal(close);
224 }
225 }
226
227
228 @Override
229 public void disconnect()
230 {
231 LOG.debug("{} disconnect()",policy.getBehavior());
232 flusher.close();
233 disconnect(false);
234 }
235
236 private void disconnect(boolean onlyOutput)
237 {
238 EndPoint endPoint = getEndPoint();
239
240
241 LOG.debug("Shutting down output {}",endPoint);
242 endPoint.shutdownOutput();
243 if (!onlyOutput)
244 {
245 LOG.debug("Closing {}",endPoint);
246 endPoint.close();
247 }
248 }
249
250 protected void execute(Runnable task)
251 {
252 try
253 {
254 getExecutor().execute(task);
255 }
256 catch (RejectedExecutionException e)
257 {
258 LOG.debug("Job not dispatched: {}",task);
259 }
260 }
261
262 @Override
263 public void fillInterested()
264 {
265 stats.countFillInterestedEvents.incrementAndGet();
266 super.fillInterested();
267 }
268
269 @Override
270 public ByteBufferPool getBufferPool()
271 {
272 return bufferPool;
273 }
274
275
276
277
278
279
280
281
282 public List<ExtensionConfig> getExtensions()
283 {
284 return extensions;
285 }
286
287 public Generator getGenerator()
288 {
289 return generator;
290 }
291
292 @Override
293 public long getIdleTimeout()
294 {
295 return getEndPoint().getIdleTimeout();
296 }
297
298 @Override
299 public IOState getIOState()
300 {
301 return ioState;
302 }
303
304 @Override
305 public long getMaxIdleTimeout()
306 {
307 return getEndPoint().getIdleTimeout();
308 }
309
310 public Parser getParser()
311 {
312 return parser;
313 }
314
315 @Override
316 public WebSocketPolicy getPolicy()
317 {
318 return this.policy;
319 }
320
321 @Override
322 public InetSocketAddress getRemoteAddress()
323 {
324 return getEndPoint().getRemoteAddress();
325 }
326
327 public Scheduler getScheduler()
328 {
329 return scheduler;
330 }
331
332 @Override
333 public WebSocketSession getSession()
334 {
335 return session;
336 }
337
338 public Stats getStats()
339 {
340 return stats;
341 }
342
343 @Override
344 public boolean isOpen()
345 {
346 return getIOState().isOpen() && getEndPoint().isOpen();
347 }
348
349 @Override
350 public boolean isReading()
351 {
352 return isFilling;
353 }
354
355
356
357
358
359
360 @Override
361 public void onClose()
362 {
363 super.onClose();
364 flusher.close();
365 }
366
367 @Override
368 public void onConnectionStateChange(ConnectionState state)
369 {
370 LOG.debug("{} Connection State Change: {}",policy.getBehavior(),state);
371 switch (state)
372 {
373 case OPEN:
374 LOG.debug("fillInterested");
375 fillInterested();
376 break;
377 case CLOSED:
378 if (ioState.wasAbnormalClose())
379 {
380
381 CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
382 outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(), BatchMode.OFF);
383 }
384 else
385 {
386
387 this.disconnect();
388 }
389 break;
390 case CLOSING:
391 CloseInfo close = ioState.getCloseInfo();
392
393 outgoingFrame(close.asFrame(),new OnDisconnectCallback(), BatchMode.OFF);
394 default:
395 break;
396 }
397 }
398
399 @Override
400 public void onFillable()
401 {
402 LOG.debug("{} onFillable()",policy.getBehavior());
403 stats.countOnFillableEvents.incrementAndGet();
404 ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),true);
405 boolean readMore = false;
406 try
407 {
408 isFilling = true;
409 readMore = (read(buffer) != -1);
410 }
411 finally
412 {
413 bufferPool.release(buffer);
414 }
415
416 if (readMore && (suspendToken.get() == false))
417 {
418 fillInterested();
419 }
420 else
421 {
422 isFilling = false;
423 }
424 }
425
426 @Override
427 protected void onFillInterestedFailed(Throwable cause)
428 {
429 LOG.ignore(cause);
430 stats.countFillInterestedEvents.incrementAndGet();
431 super.onFillInterestedFailed(cause);
432 }
433
434 @Override
435 public void onOpen()
436 {
437 super.onOpen();
438 this.ioState.onOpened();
439 }
440
441 @Override
442 protected boolean onReadTimeout()
443 {
444 LOG.debug("{} Read Timeout",policy.getBehavior());
445
446 IOState state = getIOState();
447 if ((state.getConnectionState() == ConnectionState.CLOSING) || (state.getConnectionState() == ConnectionState.CLOSED))
448 {
449
450
451 return true;
452 }
453
454
455 session.notifyError(new SocketTimeoutException("Timeout on Read"));
456
457 close(StatusCode.ABNORMAL,"Idle Timeout");
458
459 return false;
460 }
461
462
463
464
465 @Override
466 public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
467 {
468 if (LOG.isDebugEnabled())
469 {
470 LOG.debug("outgoingFrame({}, {})",frame,callback);
471 }
472
473 flusher.enqueue(frame,callback, batchMode);
474 }
475
476 private int read(ByteBuffer buffer)
477 {
478 EndPoint endPoint = getEndPoint();
479 try
480 {
481 while (true)
482 {
483 int filled = endPoint.fill(buffer);
484 if (filled == 0)
485 {
486 return 0;
487 }
488 else if (filled < 0)
489 {
490 LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
491 ioState.onReadEOF();
492 return -1;
493 }
494 else
495 {
496 if (LOG.isDebugEnabled())
497 {
498 LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
499 }
500 parser.parse(buffer);
501
502 }
503 }
504 }
505 catch (IOException e)
506 {
507 LOG.warn(e);
508 close(StatusCode.PROTOCOL,e.getMessage());
509 return -1;
510 }
511 catch (CloseException e)
512 {
513 LOG.warn(e);
514 close(e.getStatusCode(),e.getMessage());
515 return -1;
516 }
517 }
518
519 @Override
520 public void resume()
521 {
522 if (suspendToken.getAndSet(false))
523 {
524 fillInterested();
525 }
526 }
527
528
529
530
531
532
533
534
535
536 public void setExtensions(List<ExtensionConfig> extensions)
537 {
538 this.extensions = extensions;
539 }
540
541 @Override
542 public void setInputBufferSize(int inputBufferSize)
543 {
544 if (inputBufferSize < MIN_BUFFER_SIZE)
545 {
546 throw new IllegalArgumentException("Cannot have buffer size less than " + MIN_BUFFER_SIZE);
547 }
548 super.setInputBufferSize(inputBufferSize);
549 }
550
551 @Override
552 public void setMaxIdleTimeout(long ms)
553 {
554 getEndPoint().setIdleTimeout(ms);
555 }
556
557 @Override
558 public void setSession(WebSocketSession session)
559 {
560 this.session = session;
561 }
562
563 @Override
564 public SuspendToken suspend()
565 {
566 suspendToken.set(true);
567 return this;
568 }
569
570 @Override
571 public String dump()
572 {
573 return ContainerLifeCycle.dump(this);
574 }
575
576 @Override
577 public void dump(Appendable out, String indent) throws IOException
578 {
579 out.append(toString()).append(System.lineSeparator());
580 }
581
582 @Override
583 public String toString()
584 {
585 return String.format("%s{f=%s,g=%s,p=%s}",super.toString(),flusher,generator,parser);
586 }
587
588 }