1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.io;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.nio.ByteBuffer;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.RejectedExecutionException;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import java.util.concurrent.atomic.AtomicLong;
30
31 import org.eclipse.jetty.io.AbstractConnection;
32 import org.eclipse.jetty.io.ByteBufferPool;
33 import org.eclipse.jetty.io.Connection;
34 import org.eclipse.jetty.io.EndPoint;
35 import org.eclipse.jetty.util.BufferUtil;
36 import org.eclipse.jetty.util.Callback;
37 import org.eclipse.jetty.util.ForkInvoker;
38 import org.eclipse.jetty.util.log.Log;
39 import org.eclipse.jetty.util.log.Logger;
40 import org.eclipse.jetty.util.thread.Scheduler;
41 import org.eclipse.jetty.websocket.api.CloseException;
42 import org.eclipse.jetty.websocket.api.StatusCode;
43 import org.eclipse.jetty.websocket.api.SuspendToken;
44 import org.eclipse.jetty.websocket.api.WebSocketException;
45 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
46 import org.eclipse.jetty.websocket.api.WriteCallback;
47 import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
48 import org.eclipse.jetty.websocket.api.extensions.Frame;
49 import org.eclipse.jetty.websocket.common.CloseInfo;
50 import org.eclipse.jetty.websocket.common.ConnectionState;
51 import org.eclipse.jetty.websocket.common.Generator;
52 import org.eclipse.jetty.websocket.common.LogicalConnection;
53 import org.eclipse.jetty.websocket.common.Parser;
54 import org.eclipse.jetty.websocket.common.WebSocketSession;
55
56
57
58
59 public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection
60 {
61 private class FlushCallback implements Callback
62 {
63 @Override
64 public void failed(Throwable x)
65 {
66 LOG.warn("Write flush failure",x);
67 }
68
69 @Override
70 public void succeeded()
71 {
72
73 AbstractWebSocketConnection.this.complete(writeBytes);
74 }
75 }
76
77 private class FlushInvoker extends ForkInvoker<Callback>
78 {
79 private FlushInvoker()
80 {
81 super(4);
82 }
83
84 @Override
85 public void call(Callback callback)
86 {
87 flush();
88 }
89
90 @Override
91 public void fork(final Callback callback)
92 {
93 execute(new Runnable()
94 {
95 @Override
96 public void run()
97 {
98 flush();
99 }
100 });
101 }
102
103 @Override
104 public String toString()
105 {
106 return String.format("%s@%x",FlushInvoker.class.getSimpleName(),hashCode());
107 }
108 }
109
110 private class OnCloseCallback implements WriteCallback
111 {
112 @Override
113 public void writeFailed(Throwable x)
114 {
115 disconnect();
116 }
117
118 @Override
119 public void writeSuccess()
120 {
121 onWriteWebSocketClose();
122 }
123 }
124
125 public static class Stats {
126 private AtomicLong countFillInterestedEvents = new AtomicLong(0);
127 private AtomicLong countOnFillableEvents = new AtomicLong(0);
128 private AtomicLong countFillableErrors = new AtomicLong(0);
129
130 public long getFillableErrorCount() {
131 return countFillableErrors.get();
132 }
133
134 public long getFillInterestedCount() {
135 return countFillInterestedEvents.get();
136 }
137
138 public long getOnFillableCount() {
139 return countOnFillableEvents.get();
140 }
141 }
142
143 private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
144
145
146
147
148 private static final int MIN_BUFFER_SIZE = Generator.OVERHEAD;
149
150 private final ForkInvoker<Callback> invoker = new FlushInvoker();
151 private final ByteBufferPool bufferPool;
152 private final Scheduler scheduler;
153 private final Generator generator;
154 private final Parser parser;
155 private final WebSocketPolicy policy;
156 private final WriteBytesProvider writeBytes;
157 private final AtomicBoolean suspendToken;
158 private WebSocketSession session;
159 private List<ExtensionConfig> extensions;
160 private boolean flushing;
161 private boolean isFilling;
162 private IOState ioState;
163 private Stats stats = new Stats();
164
165 public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
166 {
167 super(endp,executor,EXECUTE_ONFILLABLE);
168 this.policy = policy;
169 this.bufferPool = bufferPool;
170 this.generator = new Generator(policy,bufferPool);
171 this.parser = new Parser(policy,bufferPool);
172 this.scheduler = scheduler;
173 this.extensions = new ArrayList<>();
174 this.suspendToken = new AtomicBoolean(false);
175 this.ioState = new IOState();
176 this.ioState.setState(ConnectionState.CONNECTING);
177 this.writeBytes = new WriteBytesProvider(generator,new FlushCallback());
178 this.setInputBufferSize(policy.getInputBufferSize());
179 }
180
181 @Override
182 public void close()
183 {
184 close(StatusCode.NORMAL,null);
185 }
186
187 @Override
188 public void close(int statusCode, String reason)
189 {
190 enqueClose(statusCode,reason);
191 }
192
193 public void complete(final Callback callback)
194 {
195 LOG.debug("complete({})",callback);
196 synchronized (writeBytes)
197 {
198 flushing = false;
199 }
200
201 if (!ioState.isOpen() || (callback == null))
202 {
203 return;
204 }
205
206 invoker.invoke(callback);
207 }
208
209 @Override
210 public void disconnect()
211 {
212 disconnect(false);
213 }
214
215 public void disconnect(boolean onlyOutput)
216 {
217 ioState.setState(ConnectionState.CLOSED);
218 EndPoint endPoint = getEndPoint();
219
220
221 LOG.debug("Shutting down output {}",endPoint);
222 endPoint.shutdownOutput();
223 if (!onlyOutput)
224 {
225 LOG.debug("Closing {}",endPoint);
226 endPoint.close();
227 }
228 }
229
230
231
232
233
234
235
236
237
238
239 private void enqueClose(int statusCode, String reason)
240 {
241 synchronized (writeBytes)
242 {
243
244
245 if (writeBytes.isClosed())
246 {
247
248 return;
249 }
250 }
251 CloseInfo close = new CloseInfo(statusCode,reason);
252
253 outgoingFrame(close.asFrame(),new OnCloseCallback());
254 }
255
256 private void execute(Runnable task)
257 {
258 try
259 {
260 getExecutor().execute(task);
261 }
262 catch (RejectedExecutionException e)
263 {
264 LOG.debug("Job not dispatched: {}",task);
265 }
266 }
267
268 @Override
269 public void fillInterested()
270 {
271 stats.countFillInterestedEvents.incrementAndGet();
272 super.fillInterested();
273 }
274
275 public void flush()
276 {
277 ByteBuffer buffer = null;
278
279 synchronized (writeBytes)
280 {
281 if (writeBytes.isFailed())
282 {
283 LOG.debug(".flush() - queue is in failed state");
284 return;
285 }
286
287 if (flushing)
288 {
289 return;
290 }
291
292 if (LOG.isDebugEnabled())
293 {
294 LOG.debug(".flush() - flushing={} - writeBytes={}",flushing,writeBytes);
295 }
296
297 if (!isOpen())
298 {
299
300 writeBytes.failAll(new WebSocketException("Connection closed"));
301 return;
302 }
303
304 buffer = writeBytes.getByteBuffer();
305
306 if (buffer == null)
307 {
308 return;
309 }
310
311 flushing = true;
312
313 if (LOG.isDebugEnabled())
314 {
315 LOG.debug("Flushing {} - {}",BufferUtil.toDetailString(buffer),writeBytes);
316 }
317 }
318
319 write(buffer);
320 }
321
322 public ByteBufferPool getBufferPool()
323 {
324 return bufferPool;
325 }
326
327
328
329
330
331
332
333
334 public List<ExtensionConfig> getExtensions()
335 {
336 return extensions;
337 }
338
339 public Generator getGenerator()
340 {
341 return generator;
342 }
343
344 @Override
345 public IOState getIOState()
346 {
347 return ioState;
348 }
349
350 public Parser getParser()
351 {
352 return parser;
353 }
354
355 @Override
356 public WebSocketPolicy getPolicy()
357 {
358 return this.policy;
359 }
360
361 @Override
362 public InetSocketAddress getRemoteAddress()
363 {
364 return getEndPoint().getRemoteAddress();
365 }
366
367 public Scheduler getScheduler()
368 {
369 return scheduler;
370 }
371
372 @Override
373 public WebSocketSession getSession()
374 {
375 return session;
376 }
377
378 public Stats getStats()
379 {
380 return stats;
381 }
382
383 @Override
384 public boolean isOpen()
385 {
386 return getIOState().isOpen() && getEndPoint().isOpen();
387 }
388
389 @Override
390 public boolean isReading()
391 {
392 return isFilling;
393 }
394
395 @Override
396 public void onClose()
397 {
398 super.onClose();
399 this.getIOState().setState(ConnectionState.CLOSED);
400 }
401
402 @Override
403 public void onFillable()
404 {
405 LOG.debug("{} onFillable()",policy.getBehavior());
406 stats.countOnFillableEvents.incrementAndGet();
407 ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false);
408 BufferUtil.clear(buffer);
409 boolean readMore = false;
410 try
411 {
412 isFilling = true;
413 readMore = (read(buffer) != -1);
414 }
415 finally
416 {
417 bufferPool.release(buffer);
418 }
419
420 if (readMore && (suspendToken.get() == false))
421 {
422 fillInterested();
423 }
424 else
425 {
426 isFilling = false;
427 }
428 }
429
430 @Override
431 protected void onFillInterestedFailed(Throwable cause)
432 {
433 LOG.ignore(cause);
434 stats.countFillInterestedEvents.incrementAndGet();
435 super.onFillInterestedFailed(cause);
436 }
437
438 @Override
439 public void onOpen()
440 {
441 super.onOpen();
442 this.ioState.setState(ConnectionState.OPEN);
443 LOG.debug("fillInterested");
444 fillInterested();
445 }
446
447 @Override
448 protected boolean onReadTimeout()
449 {
450 LOG.warn("Read Timeout");
451
452 IOState state = getIOState();
453 if ((state.getState() == ConnectionState.CLOSING) || (state.getState() == ConnectionState.CLOSED))
454 {
455
456
457 return true;
458 }
459
460
461
462 session.close(StatusCode.NORMAL,"Idle Timeout");
463
464 return false;
465 }
466
467 public void onWriteWebSocketClose()
468 {
469 if (ioState.onCloseHandshake(false))
470 {
471 disconnect();
472 }
473 }
474
475
476
477
478 @Override
479 public void outgoingFrame(Frame frame, WriteCallback callback)
480 {
481 if (LOG.isDebugEnabled())
482 {
483 LOG.debug("outgoingFrame({}, {})",frame,callback);
484 }
485
486 if (!isOpen())
487 {
488 return;
489 }
490
491 synchronized (writeBytes)
492 {
493 writeBytes.enqueue(frame,WriteCallbackWrapper.wrap(callback));
494 }
495
496 flush();
497 }
498
499 private int read(ByteBuffer buffer)
500 {
501 EndPoint endPoint = getEndPoint();
502 try
503 {
504 while (true)
505 {
506 int filled = endPoint.fill(buffer);
507 if (filled == 0)
508 {
509 return 0;
510 }
511 else if (filled < 0)
512 {
513 LOG.debug("read - EOF Reached");
514 return -1;
515 }
516 else
517 {
518 if (LOG.isDebugEnabled())
519 {
520 LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
521 }
522 parser.parse(buffer);
523 }
524 }
525 }
526 catch (IOException e)
527 {
528 LOG.warn(e);
529 enqueClose(StatusCode.PROTOCOL,e.getMessage());
530 return -1;
531 }
532 catch (CloseException e)
533 {
534 LOG.warn(e);
535 enqueClose(e.getStatusCode(),e.getMessage());
536 return -1;
537 }
538 }
539
540 @Override
541 public void resume()
542 {
543 if (suspendToken.getAndSet(false))
544 {
545 fillInterested();
546 }
547 }
548
549
550
551
552
553
554
555
556
557 public void setExtensions(List<ExtensionConfig> extensions)
558 {
559 this.extensions = extensions;
560 }
561
562 @Override
563 public void setInputBufferSize(int inputBufferSize)
564 {
565 if(inputBufferSize < MIN_BUFFER_SIZE) {
566 throw new IllegalArgumentException("Cannot have buffer size less than " + MIN_BUFFER_SIZE);
567 }
568 super.setInputBufferSize(inputBufferSize);
569 }
570
571 @Override
572 public void setSession(WebSocketSession session)
573 {
574 this.session = session;
575 }
576
577 @Override
578 public SuspendToken suspend()
579 {
580 suspendToken.set(true);
581 return this;
582 }
583
584 @Override
585 public String toString()
586 {
587 return String.format("%s{g=%s,p=%s}",super.toString(),generator,parser);
588 }
589
590 private <C> void write(ByteBuffer buffer)
591 {
592 EndPoint endpoint = getEndPoint();
593
594 if (!isOpen())
595 {
596 writeBytes.failAll(new IOException("Connection closed"));
597 return;
598 }
599
600 try
601 {
602 endpoint.write(writeBytes,buffer);
603 }
604 catch (Throwable t)
605 {
606 writeBytes.failed(t);
607 }
608 }
609 }