1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.InterruptedByTimeoutException;
25 import java.util.ArrayList;
26 import java.util.Collections;
27 import java.util.HashSet;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Objects;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentMap;
35 import java.util.concurrent.CopyOnWriteArrayList;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.TimeoutException;
40 import java.util.concurrent.atomic.AtomicBoolean;
41 import java.util.concurrent.atomic.AtomicInteger;
42
43 import org.eclipse.jetty.io.ByteBufferPool;
44 import org.eclipse.jetty.io.EndPoint;
45 import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
46 import org.eclipse.jetty.spdy.api.DataInfo;
47 import org.eclipse.jetty.spdy.api.GoAwayInfo;
48 import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
49 import org.eclipse.jetty.spdy.api.PingInfo;
50 import org.eclipse.jetty.spdy.api.PingResultInfo;
51 import org.eclipse.jetty.spdy.api.PushInfo;
52 import org.eclipse.jetty.spdy.api.RstInfo;
53 import org.eclipse.jetty.spdy.api.SPDYException;
54 import org.eclipse.jetty.spdy.api.Session;
55 import org.eclipse.jetty.spdy.api.SessionFrameListener;
56 import org.eclipse.jetty.spdy.api.SessionStatus;
57 import org.eclipse.jetty.spdy.api.Settings;
58 import org.eclipse.jetty.spdy.api.SettingsInfo;
59 import org.eclipse.jetty.spdy.api.Stream;
60 import org.eclipse.jetty.spdy.api.StreamFrameListener;
61 import org.eclipse.jetty.spdy.api.StreamStatus;
62 import org.eclipse.jetty.spdy.api.SynInfo;
63 import org.eclipse.jetty.spdy.frames.ControlFrame;
64 import org.eclipse.jetty.spdy.frames.ControlFrameType;
65 import org.eclipse.jetty.spdy.frames.CredentialFrame;
66 import org.eclipse.jetty.spdy.frames.DataFrame;
67 import org.eclipse.jetty.spdy.frames.GoAwayFrame;
68 import org.eclipse.jetty.spdy.frames.HeadersFrame;
69 import org.eclipse.jetty.spdy.frames.PingFrame;
70 import org.eclipse.jetty.spdy.frames.RstStreamFrame;
71 import org.eclipse.jetty.spdy.frames.SettingsFrame;
72 import org.eclipse.jetty.spdy.frames.SynReplyFrame;
73 import org.eclipse.jetty.spdy.frames.SynStreamFrame;
74 import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;
75 import org.eclipse.jetty.spdy.generator.Generator;
76 import org.eclipse.jetty.spdy.parser.Parser;
77 import org.eclipse.jetty.util.Atomics;
78 import org.eclipse.jetty.util.BufferUtil;
79 import org.eclipse.jetty.util.Callback;
80 import org.eclipse.jetty.util.ForkInvoker;
81 import org.eclipse.jetty.util.FutureCallback;
82 import org.eclipse.jetty.util.FuturePromise;
83 import org.eclipse.jetty.util.Promise;
84 import org.eclipse.jetty.util.component.ContainerLifeCycle;
85 import org.eclipse.jetty.util.component.Dumpable;
86 import org.eclipse.jetty.util.log.Log;
87 import org.eclipse.jetty.util.log.Logger;
88 import org.eclipse.jetty.util.thread.Scheduler;
89
90 public class StandardSession implements ISession, Parser.Listener, Dumpable
91 {
92 private static final Logger LOG = Log.getLogger(Session.class);
93
94 private final ForkInvoker<Callback> invoker = new SessionInvoker();
95 private final Map<String, Object> attributes = new ConcurrentHashMap<>();
96 private final List<Listener> listeners = new CopyOnWriteArrayList<>();
97 private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
98 private final LinkedList<FrameBytes> queue = new LinkedList<>();
99 private final ByteBufferPool bufferPool;
100 private final Executor threadPool;
101 private final Scheduler scheduler;
102 private final short version;
103 private final Controller controller;
104 private final EndPoint endPoint;
105 private final IdleListener idleListener;
106 private final AtomicInteger streamIds;
107 private final AtomicInteger pingIds;
108 private final SessionFrameListener listener;
109 private final Generator generator;
110 private final AtomicBoolean goAwaySent = new AtomicBoolean();
111 private final AtomicBoolean goAwayReceived = new AtomicBoolean();
112 private final AtomicInteger lastStreamId = new AtomicInteger();
113 private final FlowControlStrategy flowControlStrategy;
114 private boolean flushing;
115 private Throwable failure;
116
117 public StandardSession(short version, ByteBufferPool bufferPool, Executor threadPool, Scheduler scheduler,
118 Controller controller, EndPoint endPoint, IdleListener idleListener, int initialStreamId,
119 SessionFrameListener listener, Generator generator, FlowControlStrategy flowControlStrategy)
120 {
121
122 this.version = version;
123 this.bufferPool = bufferPool;
124 this.threadPool = threadPool;
125 this.scheduler = scheduler;
126 this.controller = controller;
127 this.endPoint = endPoint;
128 this.idleListener = idleListener;
129 this.streamIds = new AtomicInteger(initialStreamId);
130 this.pingIds = new AtomicInteger(initialStreamId);
131 this.listener = listener;
132 this.generator = generator;
133 this.flowControlStrategy = flowControlStrategy;
134 }
135
136 @Override
137 public short getVersion()
138 {
139 return version;
140 }
141
142 @Override
143 public void addListener(Listener listener)
144 {
145 listeners.add(listener);
146 }
147
148 @Override
149 public void removeListener(Listener listener)
150 {
151 listeners.remove(listener);
152 }
153
154 @Override
155 public Stream syn(SynInfo synInfo, StreamFrameListener listener) throws ExecutionException, InterruptedException, TimeoutException
156 {
157 FuturePromise<Stream> result = new FuturePromise<>();
158 syn(synInfo, listener, result);
159 if (synInfo.getTimeout() > 0)
160 return result.get(synInfo.getTimeout(), synInfo.getUnit());
161 else
162 return result.get();
163 }
164
165 @Override
166 public void syn(SynInfo synInfo, StreamFrameListener listener, Promise<Stream> promise)
167 {
168
169
170
171
172
173
174 int associatedStreamId = 0;
175 if (synInfo instanceof PushSynInfo)
176 associatedStreamId = ((PushSynInfo)synInfo).getAssociatedStreamId();
177
178 synchronized (this)
179 {
180 int streamId = streamIds.getAndAdd(2);
181
182 SynStreamFrame synStream = new SynStreamFrame(version, synInfo.getFlags(), streamId, associatedStreamId, synInfo.getPriority(), (short)0, synInfo.getHeaders());
183 IStream stream = createStream(synStream, listener, true, promise);
184 generateAndEnqueueControlFrame(stream, synStream, synInfo.getTimeout(), synInfo.getUnit(), stream);
185 }
186 flush();
187 }
188
189 @Override
190 public void rst(RstInfo rstInfo) throws InterruptedException, ExecutionException, TimeoutException
191 {
192 FutureCallback result = new FutureCallback();
193 rst(rstInfo, result);
194 if (rstInfo.getTimeout() > 0)
195 result.get(rstInfo.getTimeout(), rstInfo.getUnit());
196 else
197 result.get();
198 }
199
200 @Override
201 public void rst(RstInfo rstInfo, Callback callback)
202 {
203
204 if (goAwaySent.get())
205 {
206 complete(callback);
207 }
208 else
209 {
210 int streamId = rstInfo.getStreamId();
211 IStream stream = streams.get(streamId);
212 RstStreamFrame frame = new RstStreamFrame(version, streamId, rstInfo.getStreamStatus().getCode(version));
213 control(stream, frame, rstInfo.getTimeout(), rstInfo.getUnit(), callback);
214 if (stream != null)
215 {
216 stream.process(frame);
217 removeStream(stream);
218 }
219 }
220 }
221
222 @Override
223 public void settings(SettingsInfo settingsInfo) throws ExecutionException, InterruptedException, TimeoutException
224 {
225 FutureCallback result = new FutureCallback();
226 settings(settingsInfo, result);
227 if (settingsInfo.getTimeout() > 0)
228 result.get(settingsInfo.getTimeout(), settingsInfo.getUnit());
229 else
230 result.get();
231 }
232
233 @Override
234 public void settings(SettingsInfo settingsInfo, Callback callback)
235 {
236 SettingsFrame frame = new SettingsFrame(version, settingsInfo.getFlags(), settingsInfo.getSettings());
237 control(null, frame, settingsInfo.getTimeout(), settingsInfo.getUnit(), callback);
238 }
239
240 @Override
241 public PingResultInfo ping(PingInfo pingInfo) throws ExecutionException, InterruptedException, TimeoutException
242 {
243
244 FuturePromise<PingResultInfo> result = new FuturePromise<>();
245 ping(pingInfo, result);
246 if (pingInfo.getTimeout() > 0)
247 return result.get(pingInfo.getTimeout(), pingInfo.getUnit());
248 else
249 return result.get();
250 }
251
252 @Override
253 public void ping(PingInfo pingInfo, Promise<PingResultInfo> promise)
254 {
255 int pingId = pingIds.getAndAdd(2);
256 PingInfoCallback pingInfoCallback = new PingInfoCallback(pingId, promise);
257 PingFrame frame = new PingFrame(version, pingId);
258 control(null, frame, pingInfo.getTimeout(), pingInfo.getUnit(), pingInfoCallback);
259 }
260
261 @Override
262 public void goAway(GoAwayInfo goAwayInfo) throws ExecutionException, InterruptedException, TimeoutException
263 {
264 goAway(goAwayInfo, SessionStatus.OK);
265 }
266
267 private void goAway(GoAwayInfo goAwayInfo, SessionStatus sessionStatus) throws ExecutionException, InterruptedException, TimeoutException
268 {
269 FutureCallback result = new FutureCallback();
270 goAway(sessionStatus, goAwayInfo.getTimeout(), goAwayInfo.getUnit(), result);
271 if (goAwayInfo.getTimeout() > 0)
272 result.get(goAwayInfo.getTimeout(), goAwayInfo.getUnit());
273 else
274 result.get();
275 }
276
277 @Override
278 public void goAway(GoAwayInfo goAwayInfo, Callback callback)
279 {
280 goAway(SessionStatus.OK, goAwayInfo.getTimeout(), goAwayInfo.getUnit(), callback);
281 }
282
283 private void goAway(SessionStatus sessionStatus, long timeout, TimeUnit unit, Callback callback)
284 {
285 if (goAwaySent.compareAndSet(false, true))
286 {
287 if (!goAwayReceived.get())
288 {
289 GoAwayFrame frame = new GoAwayFrame(version, lastStreamId.get(), sessionStatus.getCode());
290 control(null, frame, timeout, unit, callback);
291 return;
292 }
293 }
294 complete(callback);
295 }
296
297 @Override
298 public Set<Stream> getStreams()
299 {
300 Set<Stream> result = new HashSet<>();
301 result.addAll(streams.values());
302 return result;
303 }
304
305 @Override
306 public IStream getStream(int streamId)
307 {
308 return streams.get(streamId);
309 }
310
311 @Override
312 public Object getAttribute(String key)
313 {
314 return attributes.get(key);
315 }
316
317 @Override
318 public void setAttribute(String key, Object value)
319 {
320 attributes.put(key, value);
321 }
322
323 @Override
324 public Object removeAttribute(String key)
325 {
326 return attributes.remove(key);
327 }
328
329 @Override
330 public InetSocketAddress getLocalAddress()
331 {
332 return endPoint.getLocalAddress();
333 }
334
335 @Override
336 public InetSocketAddress getRemoteAddress()
337 {
338 return endPoint.getRemoteAddress();
339 }
340
341 @Override
342 public void onControlFrame(ControlFrame frame)
343 {
344 notifyIdle(idleListener, false);
345 try
346 {
347 LOG.debug("Processing {}", frame);
348
349 if (goAwaySent.get())
350 {
351 LOG.debug("Skipped processing of {}", frame);
352 return;
353 }
354
355 switch (frame.getType())
356 {
357 case SYN_STREAM:
358 {
359 onSyn((SynStreamFrame)frame);
360 break;
361 }
362 case SYN_REPLY:
363 {
364 onReply((SynReplyFrame)frame);
365 break;
366 }
367 case RST_STREAM:
368 {
369 onRst((RstStreamFrame)frame);
370 break;
371 }
372 case SETTINGS:
373 {
374 onSettings((SettingsFrame)frame);
375 break;
376 }
377 case NOOP:
378 {
379
380 break;
381 }
382 case PING:
383 {
384 onPing((PingFrame)frame);
385 break;
386 }
387 case GO_AWAY:
388 {
389 onGoAway((GoAwayFrame)frame);
390 break;
391 }
392 case HEADERS:
393 {
394 onHeaders((HeadersFrame)frame);
395 break;
396 }
397 case WINDOW_UPDATE:
398 {
399 onWindowUpdate((WindowUpdateFrame)frame);
400 break;
401 }
402 case CREDENTIAL:
403 {
404 onCredential((CredentialFrame)frame);
405 break;
406 }
407 default:
408 {
409 throw new IllegalStateException();
410 }
411 }
412 }
413 finally
414 {
415 notifyIdle(idleListener, true);
416 }
417 }
418
419 @Override
420 public void onDataFrame(DataFrame frame, ByteBuffer data)
421 {
422 notifyIdle(idleListener, false);
423 try
424 {
425 LOG.debug("Processing {}, {} data bytes", frame, data.remaining());
426
427 if (goAwaySent.get())
428 {
429 LOG.debug("Skipped processing of {}", frame);
430 return;
431 }
432
433 int streamId = frame.getStreamId();
434 IStream stream = streams.get(streamId);
435 if (stream == null)
436 {
437 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
438 LOG.debug("Unknown stream {}", rstInfo);
439 rst(rstInfo, new Callback.Adapter());
440 }
441 else
442 {
443 processData(stream, frame, data);
444 }
445 }
446 finally
447 {
448 notifyIdle(idleListener, true);
449 }
450 }
451
452 private void notifyIdle(IdleListener listener, boolean idle)
453 {
454 if (listener != null)
455 listener.onIdle(idle);
456 }
457
458 private void processData(final IStream stream, DataFrame frame, ByteBuffer data)
459 {
460 ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(data, frame.isClose())
461 {
462 @Override
463 public void consume(int delta)
464 {
465 super.consume(delta);
466 flowControlStrategy.onDataConsumed(StandardSession.this, stream, this, delta);
467 }
468 };
469 flowControlStrategy.onDataReceived(this, stream, dataInfo);
470 stream.process(dataInfo);
471 if (stream.isClosed())
472 removeStream(stream);
473 }
474
475 @Override
476 public void onStreamException(StreamException x)
477 {
478 notifyOnException(listener, x);
479 rst(new RstInfo(x.getStreamId(), x.getStreamStatus()), new Callback.Adapter());
480 }
481
482 @Override
483 public void onSessionException(SessionException x)
484 {
485 Throwable cause = x.getCause();
486 notifyOnException(listener, cause == null ? x : cause);
487 goAway(x.getSessionStatus(), 0, TimeUnit.SECONDS, new Callback.Adapter());
488 }
489
490 private void onSyn(SynStreamFrame frame)
491 {
492 IStream stream = createStream(frame, null, false, null);
493 if (stream != null)
494 processSyn(listener, stream, frame);
495 }
496
497 private void processSyn(SessionFrameListener listener, IStream stream, SynStreamFrame frame)
498 {
499 stream.process(frame);
500
501 updateLastStreamId(stream);
502 StreamFrameListener streamListener;
503 if (stream.isUnidirectional())
504 {
505 PushInfo pushInfo = new PushInfo(frame.getHeaders(), frame.isClose());
506 streamListener = notifyOnPush(stream.getAssociatedStream().getStreamFrameListener(), stream, pushInfo);
507 }
508 else
509 {
510 SynInfo synInfo = new SynInfo(frame.getHeaders(), frame.isClose(), frame.getPriority());
511 streamListener = notifyOnSyn(listener, stream, synInfo);
512 }
513 stream.setStreamFrameListener(streamListener);
514 flush();
515
516 if (stream.isClosed())
517 removeStream(stream);
518 }
519
520 private IStream createStream(SynStreamFrame frame, StreamFrameListener listener, boolean local, Promise<Stream> promise)
521 {
522 IStream associatedStream = streams.get(frame.getAssociatedStreamId());
523 IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream, promise);
524 flowControlStrategy.onNewStream(this, stream);
525
526 stream.updateCloseState(frame.isClose(), local);
527 stream.setStreamFrameListener(listener);
528
529 if (stream.isUnidirectional())
530 {
531
532 stream.updateCloseState(true, !local);
533 if (!stream.isClosed())
534 stream.getAssociatedStream().associate(stream);
535 }
536
537 int streamId = stream.getId();
538 if (streams.putIfAbsent(streamId, stream) != null)
539 {
540 if (local)
541 throw new IllegalStateException("Duplicate stream id " + streamId);
542 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.PROTOCOL_ERROR);
543 LOG.debug("Duplicate stream, {}", rstInfo);
544 try
545 {
546 rst(rstInfo);
547 }
548 catch (InterruptedException | ExecutionException | TimeoutException e)
549 {
550 e.printStackTrace();
551 }
552 return null;
553 }
554 else
555 {
556 LOG.debug("Created {}", stream);
557 if (local)
558 notifyStreamCreated(stream);
559 return stream;
560 }
561 }
562
563 private void notifyStreamCreated(IStream stream)
564 {
565 for (Listener listener : listeners)
566 {
567 if (listener instanceof StreamListener)
568 {
569 try
570 {
571 ((StreamListener)listener).onStreamCreated(stream);
572 }
573 catch (Exception x)
574 {
575 LOG.info("Exception while notifying listener " + listener, x);
576 }
577 catch (Error x)
578 {
579 LOG.info("Exception while notifying listener " + listener, x);
580 throw x;
581 }
582 }
583 }
584 }
585
586 private void removeStream(IStream stream)
587 {
588 if (stream.isUnidirectional())
589 stream.getAssociatedStream().disassociate(stream);
590
591 IStream removed = streams.remove(stream.getId());
592 if (removed != null)
593 assert removed == stream;
594
595 LOG.debug("Removed {}", stream);
596 notifyStreamClosed(stream);
597 }
598
599 private void notifyStreamClosed(IStream stream)
600 {
601 for (Listener listener : listeners)
602 {
603 if (listener instanceof StreamListener)
604 {
605 try
606 {
607 ((StreamListener)listener).onStreamClosed(stream);
608 }
609 catch (Exception x)
610 {
611 LOG.info("Exception while notifying listener " + listener, x);
612 }
613 catch (Error x)
614 {
615 LOG.info("Exception while notifying listener " + listener, x);
616 throw x;
617 }
618 }
619 }
620 }
621
622 private void onReply(SynReplyFrame frame)
623 {
624 int streamId = frame.getStreamId();
625 IStream stream = streams.get(streamId);
626 if (stream == null)
627 {
628 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
629 LOG.debug("Unknown stream {}", rstInfo);
630 rst(rstInfo, new Callback.Adapter());
631 }
632 else
633 {
634 processReply(stream, frame);
635 }
636 }
637
638 private void processReply(IStream stream, SynReplyFrame frame)
639 {
640 stream.process(frame);
641 if (stream.isClosed())
642 removeStream(stream);
643 }
644
645 private void onRst(RstStreamFrame frame)
646 {
647 IStream stream = streams.get(frame.getStreamId());
648
649 if (stream != null)
650 stream.process(frame);
651
652 RstInfo rstInfo = new RstInfo(frame.getStreamId(), StreamStatus.from(frame.getVersion(), frame.getStatusCode()));
653 notifyOnRst(listener, rstInfo);
654 flush();
655
656 if (stream != null)
657 removeStream(stream);
658 }
659
660 private void onSettings(SettingsFrame frame)
661 {
662 Settings.Setting windowSizeSetting = frame.getSettings().get(Settings.ID.INITIAL_WINDOW_SIZE);
663 if (windowSizeSetting != null)
664 {
665 int windowSize = windowSizeSetting.value();
666 setWindowSize(windowSize);
667 LOG.debug("Updated session window size to {}", windowSize);
668 }
669 SettingsInfo settingsInfo = new SettingsInfo(frame.getSettings(), frame.isClearPersisted());
670 notifyOnSettings(listener, settingsInfo);
671 flush();
672 }
673
674 private void onPing(PingFrame frame)
675 {
676 int pingId = frame.getPingId();
677 if (pingId % 2 == pingIds.get() % 2)
678 {
679 PingResultInfo pingResultInfo = new PingResultInfo(frame.getPingId());
680 notifyOnPing(listener, pingResultInfo);
681 flush();
682 }
683 else
684 {
685 control(null, frame, 0, TimeUnit.MILLISECONDS, new Callback.Adapter());
686 }
687 }
688
689 private void onGoAway(GoAwayFrame frame)
690 {
691 if (goAwayReceived.compareAndSet(false, true))
692 {
693
694 GoAwayResultInfo goAwayResultInfo = new GoAwayResultInfo(frame.getLastStreamId(), SessionStatus.from(frame.getStatusCode()));
695 notifyOnGoAway(listener, goAwayResultInfo);
696 flush();
697
698
699
700 }
701 }
702
703 private void onHeaders(HeadersFrame frame)
704 {
705 int streamId = frame.getStreamId();
706 IStream stream = streams.get(streamId);
707 if (stream == null)
708 {
709 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
710 LOG.debug("Unknown stream, {}", rstInfo);
711 rst(rstInfo, new Callback.Adapter());
712 }
713 else
714 {
715 processHeaders(stream, frame);
716 }
717 }
718
719 private void processHeaders(IStream stream, HeadersFrame frame)
720 {
721 stream.process(frame);
722 if (stream.isClosed())
723 removeStream(stream);
724 }
725
726 private void onWindowUpdate(WindowUpdateFrame frame)
727 {
728 int streamId = frame.getStreamId();
729 IStream stream = streams.get(streamId);
730 flowControlStrategy.onWindowUpdate(this, stream, frame.getWindowDelta());
731 flush();
732 }
733
734 private void onCredential(CredentialFrame frame)
735 {
736 LOG.warn("{} frame not yet supported", frame.getType());
737 flush();
738 }
739
740 protected void close()
741 {
742
743 if (controller != null)
744 controller.close(false);
745 }
746
747 private void notifyOnException(SessionFrameListener listener, Throwable x)
748 {
749 try
750 {
751 if (listener != null)
752 {
753 LOG.debug("Invoking callback with {} on listener {}", x, listener);
754 listener.onException(x);
755 }
756 }
757 catch (Exception xx)
758 {
759 LOG.info("Exception while notifying listener " + listener, xx);
760 }
761 catch (Error xx)
762 {
763 LOG.info("Exception while notifying listener " + listener, xx);
764 throw xx;
765 }
766 }
767
768 private StreamFrameListener notifyOnPush(StreamFrameListener listener, Stream stream, PushInfo pushInfo)
769 {
770 try
771 {
772 if (listener == null)
773 return null;
774 LOG.debug("Invoking callback with {} on listener {}", pushInfo, listener);
775 return listener.onPush(stream, pushInfo);
776 }
777 catch (Exception x)
778 {
779 LOG.info("Exception while notifying listener " + listener, x);
780 return null;
781 }
782 catch (Error x)
783 {
784 LOG.info("Exception while notifying listener " + listener, x);
785 throw x;
786 }
787 }
788
789 private StreamFrameListener notifyOnSyn(SessionFrameListener listener, Stream stream, SynInfo synInfo)
790 {
791 try
792 {
793 if (listener == null)
794 return null;
795 LOG.debug("Invoking callback with {} on listener {}", synInfo, listener);
796 return listener.onSyn(stream, synInfo);
797 }
798 catch (Exception x)
799 {
800 LOG.info("Exception while notifying listener " + listener, x);
801 return null;
802 }
803 catch (Error x)
804 {
805 LOG.info("Exception while notifying listener " + listener, x);
806 throw x;
807 }
808 }
809
810 private void notifyOnRst(SessionFrameListener listener, RstInfo rstInfo)
811 {
812 try
813 {
814 if (listener != null)
815 {
816 LOG.debug("Invoking callback with {} on listener {}", rstInfo, listener);
817 listener.onRst(this, rstInfo);
818 }
819 }
820 catch (Exception x)
821 {
822 LOG.info("Exception while notifying listener " + listener, x);
823 }
824 catch (Error x)
825 {
826 LOG.info("Exception while notifying listener " + listener, x);
827 throw x;
828 }
829 }
830
831 private void notifyOnSettings(SessionFrameListener listener, SettingsInfo settingsInfo)
832 {
833 try
834 {
835 if (listener != null)
836 {
837 LOG.debug("Invoking callback with {} on listener {}", settingsInfo, listener);
838 listener.onSettings(this, settingsInfo);
839 }
840 }
841 catch (Exception x)
842 {
843 LOG.info("Exception while notifying listener " + listener, x);
844 }
845 catch (Error x)
846 {
847 LOG.info("Exception while notifying listener " + listener, x);
848 throw x;
849 }
850 }
851
852 private void notifyOnPing(SessionFrameListener listener, PingResultInfo pingResultInfo)
853 {
854 try
855 {
856 if (listener != null)
857 {
858 LOG.debug("Invoking callback with {} on listener {}", pingResultInfo, listener);
859 listener.onPing(this, pingResultInfo);
860 }
861 }
862 catch (Exception x)
863 {
864 LOG.info("Exception while notifying listener " + listener, x);
865 }
866 catch (Error x)
867 {
868 LOG.info("Exception while notifying listener " + listener, x);
869 throw x;
870 }
871 }
872
873 private void notifyOnGoAway(SessionFrameListener listener, GoAwayResultInfo goAwayResultInfo)
874 {
875 try
876 {
877 if (listener != null)
878 {
879 LOG.debug("Invoking callback with {} on listener {}", goAwayResultInfo, listener);
880 listener.onGoAway(this, goAwayResultInfo);
881 }
882 }
883 catch (Exception x)
884 {
885 LOG.info("Exception while notifying listener " + listener, x);
886 }
887 catch (Error x)
888 {
889 LOG.info("Exception while notifying listener " + listener, x);
890 throw x;
891 }
892 }
893
894
895 @Override
896 public void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback callback)
897 {
898 generateAndEnqueueControlFrame(stream, frame, timeout, unit, callback);
899 flush();
900 }
901
902 private void generateAndEnqueueControlFrame(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback callback)
903 {
904 try
905 {
906
907
908
909 synchronized (this)
910 {
911 ByteBuffer buffer = generator.control(frame);
912 LOG.debug("Queuing {} on {}", frame, stream);
913 ControlFrameBytes frameBytes = new ControlFrameBytes(stream, callback, frame, buffer);
914 if (timeout > 0)
915 frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
916
917
918 if (ControlFrameType.PING == frame.getType())
919 prepend(frameBytes);
920 else
921 append(frameBytes);
922 }
923 }
924 catch (Exception x)
925 {
926 notifyCallbackFailed(callback, x);
927 }
928 }
929
930 private void updateLastStreamId(IStream stream)
931 {
932 int streamId = stream.getId();
933 if (streamId % 2 != streamIds.get() % 2)
934 Atomics.updateMax(lastStreamId, streamId);
935 }
936
937 @Override
938 public void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Callback callback)
939 {
940 LOG.debug("Queuing {} on {}", dataInfo, stream);
941 DataFrameBytes frameBytes = new DataFrameBytes(stream, callback, dataInfo);
942 if (timeout > 0)
943 frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
944 append(frameBytes);
945 flush();
946 }
947
948 @Override
949 public void shutdown()
950 {
951 FrameBytes frameBytes = new CloseFrameBytes();
952 append(frameBytes);
953 flush();
954 }
955
956 private void execute(Runnable task)
957 {
958 threadPool.execute(task);
959 }
960
961 @Override
962 public void flush()
963 {
964 FrameBytes frameBytes = null;
965 ByteBuffer buffer = null;
966 synchronized (queue)
967 {
968 if (flushing || queue.isEmpty())
969 return;
970
971 Set<IStream> stalledStreams = null;
972 for (int i = 0; i < queue.size(); ++i)
973 {
974 frameBytes = queue.get(i);
975
976 IStream stream = frameBytes.getStream();
977 if (stream != null && stalledStreams != null && stalledStreams.contains(stream))
978 continue;
979
980 buffer = frameBytes.getByteBuffer();
981 if (buffer != null)
982 {
983 queue.remove(i);
984 if (stream != null && stream.isReset() && !(frameBytes instanceof ControlFrameBytes))
985 {
986 frameBytes.fail(new StreamException(stream.getId(), StreamStatus.INVALID_STREAM,
987 "Stream: " + stream + " is reset!"));
988 return;
989 }
990 break;
991 }
992
993 if (stalledStreams == null)
994 stalledStreams = new HashSet<>();
995 if (stream != null)
996 stalledStreams.add(stream);
997
998 LOG.debug("Flush stalled for {}, {} frame(s) in queue", frameBytes, queue.size());
999 }
1000
1001 if (buffer == null)
1002 return;
1003
1004 flushing = true;
1005 LOG.debug("Flushing {}, {} frame(s) in queue", frameBytes, queue.size());
1006 }
1007 write(buffer, frameBytes);
1008 }
1009
1010 private void append(FrameBytes frameBytes)
1011 {
1012 Throwable failure;
1013 synchronized (queue)
1014 {
1015 failure = this.failure;
1016 if (failure == null)
1017 {
1018
1019
1020 if (frameBytes instanceof ControlFrameBytes)
1021 queue.addLast(frameBytes);
1022 else
1023 {
1024 int index = queue.size();
1025 while (index > 0)
1026 {
1027 FrameBytes element = queue.get(index - 1);
1028 if (element.compareTo(frameBytes) >= 0)
1029 break;
1030 --index;
1031 }
1032 queue.add(index, frameBytes);
1033 }
1034 }
1035 }
1036
1037 if (failure != null)
1038 frameBytes.fail(new SPDYException(failure));
1039 }
1040
1041 private void prepend(FrameBytes frameBytes)
1042 {
1043 Throwable failure;
1044 synchronized (queue)
1045 {
1046 failure = this.failure;
1047 if (failure == null)
1048 {
1049 int index = 0;
1050 while (index < queue.size())
1051 {
1052 FrameBytes element = queue.get(index);
1053 if (element.compareTo(frameBytes) <= 0)
1054 break;
1055 ++index;
1056 }
1057 queue.add(index, frameBytes);
1058 }
1059 }
1060
1061 if (failure != null)
1062 frameBytes.fail(new SPDYException(failure));
1063 }
1064
1065 protected void write(ByteBuffer buffer, Callback callback)
1066 {
1067 if (controller != null)
1068 {
1069 LOG.debug("Writing {} frame bytes of {}", buffer.remaining(), buffer.limit());
1070 controller.write(buffer, callback);
1071 }
1072 }
1073
1074 private void complete(final Callback callback)
1075 {
1076
1077
1078
1079
1080 invoker.invoke(callback);
1081 }
1082
1083 private void notifyCallbackFailed(Callback callback, Throwable x)
1084 {
1085 try
1086 {
1087 if (callback != null)
1088 callback.failed(x);
1089 }
1090 catch (Exception xx)
1091 {
1092 LOG.info("Exception while notifying callback " + callback, xx);
1093 }
1094 catch (Error xx)
1095 {
1096 LOG.info("Exception while notifying callback " + callback, xx);
1097 throw xx;
1098 }
1099 }
1100
1101 public int getWindowSize()
1102 {
1103 return flowControlStrategy.getWindowSize(this);
1104 }
1105
1106 public void setWindowSize(int initialWindowSize)
1107 {
1108 flowControlStrategy.setWindowSize(this, initialWindowSize);
1109 }
1110
1111 @Override
1112 public String toString()
1113 {
1114 return String.format("%s@%x{v%d,queueSize=%d,windowSize=%d,streams=%d}", getClass().getSimpleName(),
1115 hashCode(), version, queue.size(), getWindowSize(), streams.size());
1116 }
1117
1118 @Override
1119 public String dump()
1120 {
1121 return ContainerLifeCycle.dump(this);
1122 }
1123
1124 @Override
1125 public void dump(Appendable out, String indent) throws IOException
1126 {
1127 ContainerLifeCycle.dumpObject(out, this);
1128 ContainerLifeCycle.dump(out, indent, Collections.singletonList(controller), streams.values());
1129 }
1130
1131 private class SessionInvoker extends ForkInvoker<Callback>
1132 {
1133 private SessionInvoker()
1134 {
1135 super(4);
1136 }
1137
1138 @Override
1139 public void fork(final Callback callback)
1140 {
1141 execute(new Runnable()
1142 {
1143 @Override
1144 public void run()
1145 {
1146 callback.succeeded();
1147 flush();
1148 }
1149 });
1150 }
1151
1152 @Override
1153 public void call(Callback callback)
1154 {
1155 callback.succeeded();
1156 flush();
1157 }
1158 }
1159
1160 public interface FrameBytes extends Comparable<FrameBytes>, Callback
1161 {
1162 public IStream getStream();
1163
1164 public abstract ByteBuffer getByteBuffer();
1165
1166 public abstract void complete();
1167
1168 public abstract void fail(Throwable throwable);
1169 }
1170
1171 private abstract class AbstractFrameBytes implements FrameBytes, Runnable
1172 {
1173 private final IStream stream;
1174 private final Callback callback;
1175 protected volatile Scheduler.Task task;
1176
1177 protected AbstractFrameBytes(IStream stream, Callback callback)
1178 {
1179 this.stream = stream;
1180 this.callback = Objects.requireNonNull(callback);
1181 }
1182
1183 @Override
1184 public IStream getStream()
1185 {
1186 return stream;
1187 }
1188
1189 @Override
1190 public int compareTo(FrameBytes that)
1191 {
1192
1193
1194 IStream thisStream = getStream();
1195 IStream thatStream = that.getStream();
1196 if (thisStream == null)
1197 return thatStream == null ? 0 : -1;
1198 if (thatStream == null)
1199 return 1;
1200
1201 return thatStream.getPriority() - thisStream.getPriority();
1202 }
1203
1204 @Override
1205 public void complete()
1206 {
1207 cancelTask();
1208 StandardSession.this.complete(callback);
1209 }
1210
1211 @Override
1212 public void fail(Throwable x)
1213 {
1214 cancelTask();
1215 notifyCallbackFailed(callback, x);
1216 StandardSession.this.flush();
1217 }
1218
1219 private void cancelTask()
1220 {
1221 Scheduler.Task task = this.task;
1222 if (task != null)
1223 task.cancel();
1224 }
1225
1226 @Override
1227 public void run()
1228 {
1229 close();
1230 fail(new InterruptedByTimeoutException());
1231 }
1232
1233 @Override
1234 public void succeeded()
1235 {
1236 synchronized (queue)
1237 {
1238 if (LOG.isDebugEnabled())
1239 LOG.debug("Completed write of {}, {} frame(s) in queue", this, queue.size());
1240 flushing = false;
1241 }
1242 complete();
1243 }
1244
1245 @Override
1246 public void failed(Throwable x)
1247 {
1248 List<FrameBytes> frameBytesToFail = new ArrayList<>();
1249 frameBytesToFail.add(this);
1250
1251 synchronized (queue)
1252 {
1253 failure = x;
1254 if (LOG.isDebugEnabled())
1255 {
1256 String logMessage = String.format("Failed write of %s, failing all %d frame(s) in queue", this, queue.size());
1257 LOG.debug(logMessage, x);
1258 }
1259 frameBytesToFail.addAll(queue);
1260 queue.clear();
1261 flushing = false;
1262 }
1263
1264 for (FrameBytes fb : frameBytesToFail)
1265 fb.fail(x);
1266 }
1267 }
1268
1269 private class ControlFrameBytes extends AbstractFrameBytes
1270 {
1271 private final ControlFrame frame;
1272 private final ByteBuffer buffer;
1273
1274 private ControlFrameBytes(IStream stream, Callback callback, ControlFrame frame, ByteBuffer buffer)
1275 {
1276 super(stream, callback);
1277 this.frame = frame;
1278 this.buffer = buffer;
1279 }
1280
1281 @Override
1282 public ByteBuffer getByteBuffer()
1283 {
1284 return buffer;
1285 }
1286
1287 @Override
1288 public void complete()
1289 {
1290 bufferPool.release(buffer);
1291
1292 super.complete();
1293
1294 if (frame.getType() == ControlFrameType.GO_AWAY)
1295 {
1296
1297
1298 close();
1299 }
1300 IStream stream = getStream();
1301 if (stream != null && stream.isClosed())
1302 removeStream(stream);
1303 }
1304
1305 @Override
1306 public String toString()
1307 {
1308 return frame.toString();
1309 }
1310 }
1311
1312 private class DataFrameBytes extends AbstractFrameBytes
1313 {
1314 private final DataInfo dataInfo;
1315 private int size;
1316 private volatile ByteBuffer buffer;
1317
1318 private DataFrameBytes(IStream stream, Callback handler, DataInfo dataInfo)
1319 {
1320 super(stream, handler);
1321 this.dataInfo = dataInfo;
1322 }
1323
1324 @Override
1325 public ByteBuffer getByteBuffer()
1326 {
1327 try
1328 {
1329 IStream stream = getStream();
1330 int windowSize = stream.getWindowSize();
1331 if (windowSize <= 0)
1332 return null;
1333
1334 size = dataInfo.available();
1335 if (size > windowSize)
1336 size = windowSize;
1337
1338 buffer = generator.data(stream.getId(), size, dataInfo);
1339 return buffer;
1340 }
1341 catch (Throwable x)
1342 {
1343 fail(x);
1344 return null;
1345 }
1346 }
1347
1348 @Override
1349 public void complete()
1350 {
1351 bufferPool.release(buffer);
1352 IStream stream = getStream();
1353 flowControlStrategy.updateWindow(StandardSession.this, stream, -size);
1354 if (dataInfo.available() > 0)
1355 {
1356
1357
1358
1359 prepend(this);
1360 flush();
1361 }
1362 else
1363 {
1364 super.complete();
1365 stream.updateCloseState(dataInfo.isClose(), true);
1366 if (stream.isClosed())
1367 removeStream(stream);
1368 }
1369 }
1370
1371 @Override
1372 public String toString()
1373 {
1374 return String.format("DATA bytes @%x available=%d consumed=%d on %s", dataInfo.hashCode(), dataInfo.available(), dataInfo.consumed(), getStream());
1375 }
1376 }
1377
1378 private class CloseFrameBytes extends AbstractFrameBytes
1379 {
1380 private CloseFrameBytes()
1381 {
1382 super(null, new Callback.Adapter());
1383 }
1384
1385 @Override
1386 public ByteBuffer getByteBuffer()
1387 {
1388 return BufferUtil.EMPTY_BUFFER;
1389 }
1390
1391 @Override
1392 public void complete()
1393 {
1394 super.complete();
1395 close();
1396 }
1397 }
1398
1399 private static class PingInfoCallback extends PingResultInfo implements Callback
1400 {
1401 private final Promise<PingResultInfo> promise;
1402
1403 public PingInfoCallback(int pingId, Promise<PingResultInfo> promise)
1404 {
1405 super(pingId);
1406 this.promise = promise;
1407 }
1408
1409 @Override
1410 public void succeeded()
1411 {
1412 if (promise != null)
1413 promise.succeeded(this);
1414 }
1415
1416 @Override
1417 public void failed(Throwable x)
1418 {
1419 if (promise != null)
1420 promise.failed(x);
1421 }
1422 }
1423 }