1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.InterruptedByTimeoutException;
25 import java.util.Collections;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Objects;
30 import java.util.Set;
31 import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.ConcurrentMap;
33 import java.util.concurrent.CopyOnWriteArrayList;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.TimeoutException;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicInteger;
39
40 import org.eclipse.jetty.io.ByteBufferPool;
41 import org.eclipse.jetty.io.EndPoint;
42 import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
43 import org.eclipse.jetty.spdy.api.DataInfo;
44 import org.eclipse.jetty.spdy.api.GoAwayInfo;
45 import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
46 import org.eclipse.jetty.spdy.api.PingInfo;
47 import org.eclipse.jetty.spdy.api.PingResultInfo;
48 import org.eclipse.jetty.spdy.api.PushInfo;
49 import org.eclipse.jetty.spdy.api.RstInfo;
50 import org.eclipse.jetty.spdy.api.SPDYException;
51 import org.eclipse.jetty.spdy.api.Session;
52 import org.eclipse.jetty.spdy.api.SessionFrameListener;
53 import org.eclipse.jetty.spdy.api.SessionStatus;
54 import org.eclipse.jetty.spdy.api.Settings;
55 import org.eclipse.jetty.spdy.api.SettingsInfo;
56 import org.eclipse.jetty.spdy.api.Stream;
57 import org.eclipse.jetty.spdy.api.StreamFrameListener;
58 import org.eclipse.jetty.spdy.api.StreamStatus;
59 import org.eclipse.jetty.spdy.api.SynInfo;
60 import org.eclipse.jetty.spdy.frames.ControlFrame;
61 import org.eclipse.jetty.spdy.frames.ControlFrameType;
62 import org.eclipse.jetty.spdy.frames.CredentialFrame;
63 import org.eclipse.jetty.spdy.frames.DataFrame;
64 import org.eclipse.jetty.spdy.frames.GoAwayFrame;
65 import org.eclipse.jetty.spdy.frames.HeadersFrame;
66 import org.eclipse.jetty.spdy.frames.PingFrame;
67 import org.eclipse.jetty.spdy.frames.RstStreamFrame;
68 import org.eclipse.jetty.spdy.frames.SettingsFrame;
69 import org.eclipse.jetty.spdy.frames.SynReplyFrame;
70 import org.eclipse.jetty.spdy.frames.SynStreamFrame;
71 import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;
72 import org.eclipse.jetty.spdy.generator.Generator;
73 import org.eclipse.jetty.spdy.parser.Parser;
74 import org.eclipse.jetty.util.Atomics;
75 import org.eclipse.jetty.util.BufferUtil;
76 import org.eclipse.jetty.util.Callback;
77 import org.eclipse.jetty.util.FutureCallback;
78 import org.eclipse.jetty.util.FuturePromise;
79 import org.eclipse.jetty.util.Promise;
80 import org.eclipse.jetty.util.component.ContainerLifeCycle;
81 import org.eclipse.jetty.util.component.Dumpable;
82 import org.eclipse.jetty.util.log.Log;
83 import org.eclipse.jetty.util.log.Logger;
84 import org.eclipse.jetty.util.thread.Scheduler;
85
86 public class StandardSession implements ISession, Parser.Listener, Dumpable
87 {
88 private static final Logger LOG = Log.getLogger(Session.class);
89
90 private final Flusher flusher;
91 private final Map<String, Object> attributes = new ConcurrentHashMap<>();
92 private final List<Listener> listeners = new CopyOnWriteArrayList<>();
93 private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
94 private final ByteBufferPool bufferPool;
95 private final Scheduler scheduler;
96 private final short version;
97 private final Controller controller;
98 private final EndPoint endPoint;
99 private final IdleListener idleListener;
100 private final AtomicInteger streamIds;
101 private final AtomicInteger pingIds;
102 private final SessionFrameListener listener;
103 private final Generator generator;
104 private final AtomicBoolean goAwaySent = new AtomicBoolean();
105 private final AtomicBoolean goAwayReceived = new AtomicBoolean();
106 private final AtomicInteger lastStreamId = new AtomicInteger();
107 private final AtomicInteger localStreamCount = new AtomicInteger(0);
108 private final FlowControlStrategy flowControlStrategy;
109 private volatile int maxConcurrentLocalStreams = -1;
110
/**
 * Creates a session around the given collaborators.
 * <p>
 * The stream id counter and the ping id counter are both seeded with
 * {@code initialStreamId}. A {@link Flusher} is created on top of the given controller.
 *
 * @param version the protocol version reported by {@link #getVersion()}
 * @param bufferPool the pool that generated frame buffers are released to
 * @param scheduler the scheduler used to arm per-frame timeouts
 * @param controller the controller handed to the flusher and closed by {@link #close()}
 * @param endPoint the endpoint providing the addresses and the idle timeout
 * @param idleListener the listener notified around frame processing, may be null
 * @param initialStreamId the first value of both the stream id and the ping id counters
 * @param listener the session level frame listener, may be null
 * @param generator the generator that produces frame bytes
 * @param flowControlStrategy the flow control strategy this session delegates to
 */
public StandardSession(short version, ByteBufferPool bufferPool, Scheduler scheduler,
        Controller controller, EndPoint endPoint, IdleListener idleListener, int initialStreamId,
        SessionFrameListener listener, Generator generator, FlowControlStrategy flowControlStrategy)
{
    // Id counters: both start from the same initial value.
    this.streamIds = new AtomicInteger(initialStreamId);
    this.pingIds = new AtomicInteger(initialStreamId);

    // Protocol level collaborators.
    this.version = version;
    this.generator = generator;
    this.flowControlStrategy = flowControlStrategy;
    this.listener = listener;
    this.idleListener = idleListener;

    // I/O level collaborators.
    this.bufferPool = bufferPool;
    this.scheduler = scheduler;
    this.endPoint = endPoint;
    this.controller = controller;

    // The flusher is created last, as in the original assignment order.
    this.flusher = new Flusher(controller);
}
129
130 @Override
131 public short getVersion()
132 {
133 return version;
134 }
135
136 @Override
137 public void addListener(Listener listener)
138 {
139 listeners.add(listener);
140 }
141
142 @Override
143 public void removeListener(Listener listener)
144 {
145 listeners.remove(listener);
146 }
147
148 @Override
149 public Stream syn(SynInfo synInfo, StreamFrameListener listener) throws ExecutionException, InterruptedException, TimeoutException
150 {
151 FuturePromise<Stream> result = new FuturePromise<>();
152 syn(synInfo, listener, result);
153 if (synInfo.getTimeout() > 0)
154 return result.get(synInfo.getTimeout(), synInfo.getUnit());
155 else
156 return result.get();
157 }
158
159 @Override
160 public void syn(SynInfo synInfo, StreamFrameListener listener, Promise<Stream> promise)
161 {
162
163
164
165
166
167
168 int associatedStreamId = 0;
169 if (synInfo instanceof PushSynInfo)
170 associatedStreamId = ((PushSynInfo)synInfo).getAssociatedStreamId();
171
172 synchronized (this)
173 {
174 int streamId = streamIds.getAndAdd(2);
175
176 SynStreamFrame synStream = new SynStreamFrame(version, synInfo.getFlags(), streamId, associatedStreamId, synInfo.getPriority(), (short)0, synInfo.getHeaders());
177 IStream stream = createStream(synStream, listener, true, promise);
178 if (stream == null)
179 return;
180 generateAndEnqueueControlFrame(stream, synStream, synInfo.getTimeout(), synInfo.getUnit(), stream);
181 }
182 }
183
184 @Override
185 public void rst(RstInfo rstInfo) throws InterruptedException, ExecutionException, TimeoutException
186 {
187 FutureCallback result = new FutureCallback();
188 rst(rstInfo, result);
189 if (rstInfo.getTimeout() > 0)
190 result.get(rstInfo.getTimeout(), rstInfo.getUnit());
191 else
192 result.get();
193 }
194
195 @Override
196 public void rst(RstInfo rstInfo, Callback callback)
197 {
198
199 if (goAwaySent.get())
200 {
201 complete(callback);
202 }
203 else
204 {
205 int streamId = rstInfo.getStreamId();
206 IStream stream = streams.get(streamId);
207 RstStreamFrame frame = new RstStreamFrame(version, streamId, rstInfo.getStreamStatus().getCode(version));
208 control(stream, frame, rstInfo.getTimeout(), rstInfo.getUnit(), callback);
209 if (stream != null)
210 {
211 stream.process(frame);
212 flusher.removeFrameBytesFromQueue(stream);
213 removeStream(stream);
214 }
215 }
216 }
217
218 @Override
219 public void settings(SettingsInfo settingsInfo) throws ExecutionException, InterruptedException, TimeoutException
220 {
221 FutureCallback result = new FutureCallback();
222 settings(settingsInfo, result);
223 if (settingsInfo.getTimeout() > 0)
224 result.get(settingsInfo.getTimeout(), settingsInfo.getUnit());
225 else
226 result.get();
227 }
228
229 @Override
230 public void settings(SettingsInfo settingsInfo, Callback callback)
231 {
232 SettingsFrame frame = new SettingsFrame(version, settingsInfo.getFlags(), settingsInfo.getSettings());
233 control(null, frame, settingsInfo.getTimeout(), settingsInfo.getUnit(), callback);
234 }
235
236 @Override
237 public PingResultInfo ping(PingInfo pingInfo) throws ExecutionException, InterruptedException, TimeoutException
238 {
239 FuturePromise<PingResultInfo> result = new FuturePromise<>();
240 ping(pingInfo, result);
241 if (pingInfo.getTimeout() > 0)
242 return result.get(pingInfo.getTimeout(), pingInfo.getUnit());
243 else
244 return result.get();
245 }
246
247 @Override
248 public void ping(PingInfo pingInfo, Promise<PingResultInfo> promise)
249 {
250 int pingId = pingIds.getAndAdd(2);
251 PingInfoCallback pingInfoCallback = new PingInfoCallback(pingId, promise);
252 PingFrame frame = new PingFrame(version, pingId);
253 control(null, frame, pingInfo.getTimeout(), pingInfo.getUnit(), pingInfoCallback);
254 }
255
256 @Override
257 public void goAway(GoAwayInfo goAwayInfo) throws ExecutionException, InterruptedException, TimeoutException
258 {
259 goAway(goAwayInfo, SessionStatus.OK);
260 }
261
262 private void goAway(GoAwayInfo goAwayInfo, SessionStatus sessionStatus) throws ExecutionException, InterruptedException, TimeoutException
263 {
264 FutureCallback result = new FutureCallback();
265 goAway(sessionStatus, goAwayInfo.getTimeout(), goAwayInfo.getUnit(), result);
266 if (goAwayInfo.getTimeout() > 0)
267 result.get(goAwayInfo.getTimeout(), goAwayInfo.getUnit());
268 else
269 result.get();
270 }
271
272 @Override
273 public void goAway(GoAwayInfo goAwayInfo, Callback callback)
274 {
275 goAway(SessionStatus.OK, goAwayInfo.getTimeout(), goAwayInfo.getUnit(), callback);
276 }
277
278 private void goAway(SessionStatus sessionStatus, long timeout, TimeUnit unit, Callback callback)
279 {
280 if (goAwaySent.compareAndSet(false, true))
281 {
282 if (!goAwayReceived.get())
283 {
284 GoAwayFrame frame = new GoAwayFrame(version, lastStreamId.get(), sessionStatus.getCode());
285 control(null, frame, timeout, unit, callback);
286 return;
287 }
288 }
289 complete(callback);
290 }
291
292 @Override
293 public Set<Stream> getStreams()
294 {
295 Set<Stream> result = new HashSet<>();
296 result.addAll(streams.values());
297 return result;
298 }
299
300 @Override
301 public IStream getStream(int streamId)
302 {
303 return streams.get(streamId);
304 }
305
306 @Override
307 public Object getAttribute(String key)
308 {
309 return attributes.get(key);
310 }
311
312 @Override
313 public void setAttribute(String key, Object value)
314 {
315 attributes.put(key, value);
316 }
317
318 @Override
319 public Object removeAttribute(String key)
320 {
321 return attributes.remove(key);
322 }
323
324 @Override
325 public InetSocketAddress getLocalAddress()
326 {
327 return endPoint.getLocalAddress();
328 }
329
330 @Override
331 public InetSocketAddress getRemoteAddress()
332 {
333 return endPoint.getRemoteAddress();
334 }
335
336 @Override
337 public void onControlFrame(ControlFrame frame)
338 {
339 notifyIdle(idleListener, false);
340 try
341 {
342 if (LOG.isDebugEnabled())
343 LOG.debug("Processing {}", frame);
344
345 if (goAwaySent.get())
346 {
347 if (LOG.isDebugEnabled())
348 LOG.debug("Skipped processing of {}", frame);
349 return;
350 }
351
352 switch (frame.getType())
353 {
354 case SYN_STREAM:
355 {
356 onSyn((SynStreamFrame)frame);
357 break;
358 }
359 case SYN_REPLY:
360 {
361 onReply((SynReplyFrame)frame);
362 break;
363 }
364 case RST_STREAM:
365 {
366 onRst((RstStreamFrame)frame);
367 break;
368 }
369 case SETTINGS:
370 {
371 onSettings((SettingsFrame)frame);
372 break;
373 }
374 case NOOP:
375 {
376
377 break;
378 }
379 case PING:
380 {
381 onPing((PingFrame)frame);
382 break;
383 }
384 case GO_AWAY:
385 {
386 onGoAway((GoAwayFrame)frame);
387 break;
388 }
389 case HEADERS:
390 {
391 onHeaders((HeadersFrame)frame);
392 break;
393 }
394 case WINDOW_UPDATE:
395 {
396 onWindowUpdate((WindowUpdateFrame)frame);
397 break;
398 }
399 case CREDENTIAL:
400 {
401 onCredential((CredentialFrame)frame);
402 break;
403 }
404 default:
405 {
406 throw new IllegalStateException();
407 }
408 }
409 }
410 finally
411 {
412 notifyIdle(idleListener, true);
413 }
414 }
415
416 @Override
417 public void onDataFrame(DataFrame frame, ByteBuffer data)
418 {
419 notifyIdle(idleListener, false);
420 try
421 {
422 if (LOG.isDebugEnabled())
423 LOG.debug("Processing {}, {} data bytes", frame, data.remaining());
424
425 if (goAwaySent.get())
426 {
427 if (LOG.isDebugEnabled())
428 LOG.debug("Skipped processing of {}", frame);
429 return;
430 }
431
432 int streamId = frame.getStreamId();
433 IStream stream = streams.get(streamId);
434 if (stream == null)
435 {
436 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
437 if (LOG.isDebugEnabled())
438 LOG.debug("Unknown stream {}", rstInfo);
439 rst(rstInfo, Callback.Adapter.INSTANCE);
440 }
441 else
442 {
443 processData(stream, frame, data);
444 }
445 }
446 finally
447 {
448 notifyIdle(idleListener, true);
449 }
450 }
451
452 private void notifyIdle(IdleListener listener, boolean idle)
453 {
454 if (listener != null)
455 listener.onIdle(idle);
456 }
457
458 private void processData(final IStream stream, DataFrame frame, ByteBuffer data)
459 {
460 ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(data, frame.isClose())
461 {
462 @Override
463 public void consume(int delta)
464 {
465 super.consume(delta);
466 flowControlStrategy.onDataConsumed(StandardSession.this, stream, this, delta);
467 }
468 };
469 flowControlStrategy.onDataReceived(this, stream, dataInfo);
470 stream.process(dataInfo);
471 if (stream.isClosed())
472 removeStream(stream);
473 }
474
475 @Override
476 public void onStreamException(StreamException x)
477 {
478 notifyOnFailure(listener, x);
479 rst(new RstInfo(x.getStreamId(), x.getStreamStatus()), Callback.Adapter.INSTANCE);
480 }
481
482 @Override
483 public void onSessionException(SessionException x)
484 {
485 Throwable cause = x.getCause();
486 notifyOnFailure(listener, cause == null ? x : cause);
487 goAway(x.getSessionStatus(), 0, TimeUnit.SECONDS, Callback.Adapter.INSTANCE);
488 }
489
490 private void onSyn(final SynStreamFrame frame)
491 {
492 IStream stream = createStream(frame, null, false, new Promise.Adapter<Stream>()
493 {
494 @Override
495 public void failed(Throwable x)
496 {
497 LOG.debug("Received: {} but creating new Stream failed: {}", frame, x.getMessage());
498 }
499 });
500 if (stream != null)
501 processSyn(listener, stream, frame);
502 }
503
504 private void processSyn(SessionFrameListener listener, IStream stream, SynStreamFrame frame)
505 {
506 stream.process(frame);
507
508 updateLastStreamId(stream);
509 StreamFrameListener streamListener;
510 if (stream.isUnidirectional())
511 {
512 PushInfo pushInfo = new PushInfo(frame.getHeaders(), frame.isClose());
513 streamListener = notifyOnPush(stream.getAssociatedStream().getStreamFrameListener(), stream, pushInfo);
514 }
515 else
516 {
517 SynInfo synInfo = new SynInfo(frame.getHeaders(), frame.isClose(), frame.getPriority());
518 streamListener = notifyOnSyn(listener, stream, synInfo);
519 }
520 stream.setStreamFrameListener(streamListener);
521
522 if (stream.isClosed())
523 removeStream(stream);
524 }
525
526 private IStream createStream(SynStreamFrame frame, StreamFrameListener listener, boolean local, Promise<Stream> promise)
527 {
528 IStream associatedStream = streams.get(frame.getAssociatedStreamId());
529 IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream,
530 scheduler, promise);
531 stream.setIdleTimeout(endPoint.getIdleTimeout());
532 flowControlStrategy.onNewStream(this, stream);
533
534 stream.updateCloseState(frame.isClose(), local);
535 stream.setStreamFrameListener(listener);
536
537 if (stream.isUnidirectional())
538 {
539
540 stream.updateCloseState(true, !local);
541 if (!stream.isClosed())
542 stream.getAssociatedStream().associate(stream);
543 }
544
545 int streamId = stream.getId();
546
547 if (local)
548 {
549 while (true)
550 {
551 int oldStreamCountValue = localStreamCount.get();
552 int maxConcurrentStreams = maxConcurrentLocalStreams;
553 if (maxConcurrentStreams > -1 && oldStreamCountValue >= maxConcurrentStreams)
554 {
555 String message = String.format("Max concurrent local streams (%d) exceeded.",
556 maxConcurrentStreams);
557 LOG.debug(message);
558 promise.failed(new SPDYException(message));
559 return null;
560 }
561 if (localStreamCount.compareAndSet(oldStreamCountValue, oldStreamCountValue + 1))
562 break;
563 }
564 }
565
566 if (streams.putIfAbsent(streamId, stream) != null)
567 {
568 String message = "Duplicate stream id " + streamId;
569 IllegalStateException duplicateIdException = new IllegalStateException(message);
570 promise.failed(duplicateIdException);
571 if (local)
572 {
573 localStreamCount.decrementAndGet();
574 throw duplicateIdException;
575 }
576 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.PROTOCOL_ERROR);
577 if (LOG.isDebugEnabled())
578 LOG.debug("Duplicate stream, {}", rstInfo);
579 rst(rstInfo, Callback.Adapter.INSTANCE);
580 return null;
581 }
582 else
583 {
584 if (LOG.isDebugEnabled())
585 LOG.debug("Created {}", stream);
586 notifyStreamCreated(stream);
587 return stream;
588 }
589 }
590
591 private void notifyStreamCreated(IStream stream)
592 {
593 for (Listener listener : listeners)
594 {
595 if (listener instanceof StreamListener)
596 {
597 try
598 {
599 ((StreamListener)listener).onStreamCreated(stream);
600 }
601 catch (Exception x)
602 {
603 LOG.info("Exception while notifying listener " + listener, x);
604 }
605 catch (Error x)
606 {
607 LOG.info("Exception while notifying listener " + listener, x);
608 throw x;
609 }
610 }
611 }
612 }
613
614 private void removeStream(IStream stream)
615 {
616 if (stream.isUnidirectional())
617 stream.getAssociatedStream().disassociate(stream);
618
619 IStream removed = streams.remove(stream.getId());
620 if (removed != null)
621 {
622 assert removed == stream;
623
624 if (streamIds.get() % 2 == stream.getId() % 2)
625 localStreamCount.decrementAndGet();
626
627 if (LOG.isDebugEnabled())
628 LOG.debug("Removed {}", stream);
629 notifyStreamClosed(stream);
630 }
631 }
632
633 private void notifyStreamClosed(IStream stream)
634 {
635 for (Listener listener : listeners)
636 {
637 if (listener instanceof StreamListener)
638 {
639 try
640 {
641 ((StreamListener)listener).onStreamClosed(stream);
642 }
643 catch (Exception x)
644 {
645 LOG.info("Exception while notifying listener " + listener, x);
646 }
647 catch (Error x)
648 {
649 LOG.info("Exception while notifying listener " + listener, x);
650 throw x;
651 }
652 }
653 }
654 }
655
656 private void onReply(SynReplyFrame frame)
657 {
658 int streamId = frame.getStreamId();
659 IStream stream = streams.get(streamId);
660 if (stream == null)
661 {
662 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
663 if (LOG.isDebugEnabled())
664 LOG.debug("Unknown stream {}", rstInfo);
665 rst(rstInfo, Callback.Adapter.INSTANCE);
666 }
667 else
668 {
669 processReply(stream, frame);
670 }
671 }
672
673 private void processReply(IStream stream, SynReplyFrame frame)
674 {
675 stream.process(frame);
676 if (stream.isClosed())
677 removeStream(stream);
678 }
679
680 private void onRst(RstStreamFrame frame)
681 {
682 IStream stream = streams.get(frame.getStreamId());
683
684 if (stream != null)
685 stream.process(frame);
686
687 RstInfo rstInfo = new RstInfo(frame.getStreamId(), StreamStatus.from(frame.getVersion(), frame.getStatusCode()));
688 notifyOnRst(listener, rstInfo);
689
690 if (stream != null)
691 removeStream(stream);
692 }
693
694 private void onSettings(SettingsFrame frame)
695 {
696 Settings.Setting windowSizeSetting = frame.getSettings().get(Settings.ID.INITIAL_WINDOW_SIZE);
697 if (windowSizeSetting != null)
698 {
699 int windowSize = windowSizeSetting.value();
700 setWindowSize(windowSize);
701 if (LOG.isDebugEnabled())
702 LOG.debug("Updated session window size to {}", windowSize);
703 }
704 Settings.Setting maxConcurrentStreamsSetting = frame.getSettings().get(Settings.ID.MAX_CONCURRENT_STREAMS);
705 if (maxConcurrentStreamsSetting != null)
706 {
707 int maxConcurrentStreamsValue = maxConcurrentStreamsSetting.value();
708 maxConcurrentLocalStreams = maxConcurrentStreamsValue;
709 if (LOG.isDebugEnabled())
710 LOG.debug("Updated session maxConcurrentLocalStreams to {}", maxConcurrentStreamsValue);
711 }
712 SettingsInfo settingsInfo = new SettingsInfo(frame.getSettings(), frame.isClearPersisted());
713 notifyOnSettings(listener, settingsInfo);
714 }
715
716 private void onPing(PingFrame frame)
717 {
718 int pingId = frame.getPingId();
719 if (pingId % 2 == pingIds.get() % 2)
720 {
721 PingResultInfo pingResultInfo = new PingResultInfo(frame.getPingId());
722 notifyOnPing(listener, pingResultInfo);
723 }
724 else
725 {
726 control(null, frame, 0, TimeUnit.MILLISECONDS, Callback.Adapter.INSTANCE);
727 }
728 }
729
730 private void onGoAway(GoAwayFrame frame)
731 {
732 if (goAwayReceived.compareAndSet(false, true))
733 {
734 GoAwayResultInfo goAwayResultInfo = new GoAwayResultInfo(frame.getLastStreamId(), SessionStatus.from(frame.getStatusCode()));
735 notifyOnGoAway(listener, goAwayResultInfo);
736
737
738
739 }
740 }
741
742 private void onHeaders(HeadersFrame frame)
743 {
744 int streamId = frame.getStreamId();
745 IStream stream = streams.get(streamId);
746 if (stream == null)
747 {
748 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
749 if (LOG.isDebugEnabled())
750 LOG.debug("Unknown stream, {}", rstInfo);
751 rst(rstInfo, Callback.Adapter.INSTANCE);
752 }
753 else
754 {
755 processHeaders(stream, frame);
756 }
757 }
758
759 private void processHeaders(IStream stream, HeadersFrame frame)
760 {
761 stream.process(frame);
762 if (stream.isClosed())
763 removeStream(stream);
764 }
765
766 private void onWindowUpdate(WindowUpdateFrame frame)
767 {
768 int streamId = frame.getStreamId();
769 IStream stream = streams.get(streamId);
770 flowControlStrategy.onWindowUpdate(this, stream, frame.getWindowDelta());
771 flusher.flush();
772 }
773
774 private void onCredential(CredentialFrame frame)
775 {
776 LOG.warn("{} frame not yet supported", frame.getType());
777 }
778
779 protected void close()
780 {
781
782 if (controller != null)
783 controller.close(false);
784 }
785
786 private void notifyOnFailure(SessionFrameListener listener, Throwable x)
787 {
788 try
789 {
790 if (listener != null)
791 {
792 if (LOG.isDebugEnabled())
793 LOG.debug("Invoking callback with {} on listener {}", x, listener);
794 listener.onFailure(this, x);
795 }
796 }
797 catch (Exception xx)
798 {
799 LOG.info("Exception while notifying listener " + listener, xx);
800 }
801 catch (Error xx)
802 {
803 LOG.info("Exception while notifying listener " + listener, xx);
804 throw xx;
805 }
806 }
807
808 private StreamFrameListener notifyOnPush(StreamFrameListener listener, Stream stream, PushInfo pushInfo)
809 {
810 try
811 {
812 if (listener == null)
813 return null;
814 if (LOG.isDebugEnabled())
815 LOG.debug("Invoking callback with {} on listener {}", pushInfo, listener);
816 return listener.onPush(stream, pushInfo);
817 }
818 catch (Exception x)
819 {
820 LOG.info("Exception while notifying listener " + listener, x);
821 return null;
822 }
823 catch (Error x)
824 {
825 LOG.info("Exception while notifying listener " + listener, x);
826 throw x;
827 }
828 }
829
830 private StreamFrameListener notifyOnSyn(SessionFrameListener listener, Stream stream, SynInfo synInfo)
831 {
832 try
833 {
834 if (listener == null)
835 return null;
836 if (LOG.isDebugEnabled())
837 LOG.debug("Invoking callback with {} on listener {}", synInfo, listener);
838 return listener.onSyn(stream, synInfo);
839 }
840 catch (Exception x)
841 {
842 LOG.info("Exception while notifying listener " + listener, x);
843 return null;
844 }
845 catch (Error x)
846 {
847 LOG.info("Exception while notifying listener " + listener, x);
848 throw x;
849 }
850 }
851
852 private void notifyOnRst(SessionFrameListener listener, RstInfo rstInfo)
853 {
854 try
855 {
856 if (listener != null)
857 {
858 if (LOG.isDebugEnabled())
859 LOG.debug("Invoking callback with {} on listener {}", rstInfo, listener);
860 listener.onRst(this, rstInfo);
861 }
862 }
863 catch (Exception x)
864 {
865 LOG.info("Exception while notifying listener " + listener, x);
866 }
867 catch (Error x)
868 {
869 LOG.info("Exception while notifying listener " + listener, x);
870 throw x;
871 }
872 }
873
874 private void notifyOnSettings(SessionFrameListener listener, SettingsInfo settingsInfo)
875 {
876 try
877 {
878 if (listener != null)
879 {
880 if (LOG.isDebugEnabled())
881 LOG.debug("Invoking callback with {} on listener {}", settingsInfo, listener);
882 listener.onSettings(this, settingsInfo);
883 }
884 }
885 catch (Exception x)
886 {
887 LOG.info("Exception while notifying listener " + listener, x);
888 }
889 catch (Error x)
890 {
891 LOG.info("Exception while notifying listener " + listener, x);
892 throw x;
893 }
894 }
895
896 private void notifyOnPing(SessionFrameListener listener, PingResultInfo pingResultInfo)
897 {
898 try
899 {
900 if (listener != null)
901 {
902 if (LOG.isDebugEnabled())
903 LOG.debug("Invoking callback with {} on listener {}", pingResultInfo, listener);
904 listener.onPing(this, pingResultInfo);
905 }
906 }
907 catch (Exception x)
908 {
909 LOG.info("Exception while notifying listener " + listener, x);
910 }
911 catch (Error x)
912 {
913 LOG.info("Exception while notifying listener " + listener, x);
914 throw x;
915 }
916 }
917
918 private void notifyOnGoAway(SessionFrameListener listener, GoAwayResultInfo goAwayResultInfo)
919 {
920 try
921 {
922 if (listener != null)
923 {
924 if (LOG.isDebugEnabled())
925 LOG.debug("Invoking callback with {} on listener {}", goAwayResultInfo, listener);
926 listener.onGoAway(this, goAwayResultInfo);
927 }
928 }
929 catch (Exception x)
930 {
931 LOG.info("Exception while notifying listener " + listener, x);
932 }
933 catch (Error x)
934 {
935 LOG.info("Exception while notifying listener " + listener, x);
936 throw x;
937 }
938 }
939
940 @Override
941 public void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback callback)
942 {
943 generateAndEnqueueControlFrame(stream, frame, timeout, unit, callback);
944 }
945
946 private void generateAndEnqueueControlFrame(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback callback)
947 {
948 try
949 {
950
951
952
953 ControlFrameBytes frameBytes;
954 Throwable throwable;
955 synchronized (this)
956 {
957 ByteBuffer buffer = generator.control(frame);
958 if (LOG.isDebugEnabled())
959 LOG.debug("Queuing {} on {}", frame, stream);
960 frameBytes = new ControlFrameBytes(stream, callback, frame, buffer);
961 if (timeout > 0)
962 frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
963
964
965 if (ControlFrameType.PING == frame.getType())
966 throwable = flusher.prepend(frameBytes);
967 else
968 throwable = flusher.append(frameBytes);
969 }
970
971 flush(frameBytes, throwable);
972 }
973 catch (Exception x)
974 {
975 notifyCallbackFailed(callback, x);
976 }
977 }
978
979 private void updateLastStreamId(IStream stream)
980 {
981 int streamId = stream.getId();
982 if (streamId % 2 != streamIds.get() % 2)
983 Atomics.updateMax(lastStreamId, streamId);
984 }
985
986 @Override
987 public void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Callback callback)
988 {
989 if (LOG.isDebugEnabled())
990 LOG.debug("Queuing {} on {}", dataInfo, stream);
991 DataFrameBytes frameBytes = new DataFrameBytes(stream, callback, dataInfo);
992 if (timeout > 0)
993 frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
994 flush(frameBytes, flusher.append(frameBytes));
995 }
996
997 @Override
998 public void shutdown()
999 {
1000 CloseFrameBytes frameBytes = new CloseFrameBytes();
1001 flush(frameBytes, flusher.append(frameBytes));
1002 }
1003
1004 private void flush(FrameBytes frameBytes, Throwable throwable)
1005 {
1006 if (throwable != null)
1007 frameBytes.failed(throwable);
1008 else
1009 flusher.flush();
1010 }
1011
1012 private void complete(final Callback callback)
1013 {
1014 try
1015 {
1016 if (callback != null)
1017 callback.succeeded();
1018 }
1019 catch (Throwable x)
1020 {
1021 LOG.info("Exception while notifying callback " + callback, x);
1022 }
1023 }
1024
1025 private void notifyCallbackFailed(Callback callback, Throwable failure)
1026 {
1027 try
1028 {
1029 if (callback != null)
1030 callback.failed(failure);
1031 }
1032 catch (Throwable x)
1033 {
1034 LOG.info("Exception while notifying callback " + callback, x);
1035 }
1036 }
1037
1038 public int getWindowSize()
1039 {
1040 return flowControlStrategy.getWindowSize(this);
1041 }
1042
1043 public void setWindowSize(int initialWindowSize)
1044 {
1045 flowControlStrategy.setWindowSize(this, initialWindowSize);
1046 }
1047
1048 @Override
1049 public String toString()
1050 {
1051 return String.format("%s@%x{v%d,queueSize=%d,windowSize=%d,streams=%d}", getClass().getSimpleName(),
1052 hashCode(), version, flusher.getQueueSize(), getWindowSize(), streams.size());
1053 }
1054
1055 @Override
1056 public String dump()
1057 {
1058 return ContainerLifeCycle.dump(this);
1059 }
1060
1061 @Override
1062 public void dump(Appendable out, String indent) throws IOException
1063 {
1064 ContainerLifeCycle.dumpObject(out, this);
1065 ContainerLifeCycle.dump(out, indent, Collections.singletonList(controller), streams.values());
1066 }
1067
1068 public interface FrameBytes extends Comparable<FrameBytes>, Callback
1069 {
1070 public IStream getStream();
1071
1072 public abstract ByteBuffer getByteBuffer();
1073 }
1074
1075 abstract class AbstractFrameBytes implements FrameBytes, Runnable
1076 {
1077 private final IStream stream;
1078 private final Callback callback;
1079 protected volatile Scheduler.Task task;
1080
1081 protected AbstractFrameBytes(IStream stream, Callback callback)
1082 {
1083 this.stream = stream;
1084 this.callback = Objects.requireNonNull(callback);
1085 }
1086
1087 @Override
1088 public IStream getStream()
1089 {
1090 return stream;
1091 }
1092
1093 @Override
1094 public int compareTo(FrameBytes that)
1095 {
1096
1097
1098 IStream thisStream = getStream();
1099 IStream thatStream = that.getStream();
1100 if (thisStream == null)
1101 return thatStream == null ? 0 : -1;
1102 if (thatStream == null)
1103 return 1;
1104
1105 return thatStream.getPriority() - thisStream.getPriority();
1106 }
1107
1108 private void cancelTask()
1109 {
1110 Scheduler.Task task = this.task;
1111 if (task != null)
1112 task.cancel();
1113 }
1114
1115 @Override
1116 public void run()
1117 {
1118 close();
1119 failed(new InterruptedByTimeoutException());
1120 }
1121
1122 @Override
1123 public void succeeded()
1124 {
1125 cancelTask();
1126 StandardSession.this.complete(callback);
1127 }
1128
1129 @Override
1130 public void failed(Throwable x)
1131 {
1132 cancelTask();
1133 notifyCallbackFailed(callback, x);
1134 }
1135 }
1136
1137 protected class ControlFrameBytes extends AbstractFrameBytes
1138 {
1139 private final ControlFrame frame;
1140 private final ByteBuffer buffer;
1141
1142 private ControlFrameBytes(IStream stream, Callback callback, ControlFrame frame, ByteBuffer buffer)
1143 {
1144 super(stream, callback);
1145 this.frame = frame;
1146 this.buffer = buffer;
1147 }
1148
1149 @Override
1150 public ByteBuffer getByteBuffer()
1151 {
1152 return buffer;
1153 }
1154
1155 @Override
1156 public void succeeded()
1157 {
1158 bufferPool.release(buffer);
1159
1160 super.succeeded();
1161
1162 if (frame.getType() == ControlFrameType.GO_AWAY)
1163 {
1164
1165
1166 close();
1167 }
1168 IStream stream = getStream();
1169 if (stream != null && stream.isClosed())
1170 removeStream(stream);
1171 }
1172
1173 @Override
1174 public String toString()
1175 {
1176 return frame.toString();
1177 }
1178 }
1179
1180 protected class DataFrameBytes extends AbstractFrameBytes
1181 {
1182 private final DataInfo dataInfo;
1183 private int size;
1184 private volatile ByteBuffer buffer;
1185
1186 private DataFrameBytes(IStream stream, Callback handler, DataInfo dataInfo)
1187 {
1188 super(stream, handler);
1189 this.dataInfo = dataInfo;
1190 }
1191
1192 @Override
1193 public ByteBuffer getByteBuffer()
1194 {
1195 try
1196 {
1197 IStream stream = getStream();
1198 int windowSize = stream.getWindowSize();
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214 size = dataInfo.available();
1215 if (size > windowSize)
1216 size = windowSize;
1217
1218 buffer = generator.data(stream.getId(), size, dataInfo);
1219 return buffer;
1220 }
1221 catch (Throwable x)
1222 {
1223 failed(x);
1224 return null;
1225 }
1226 }
1227
1228 @Override
1229 public void succeeded()
1230 {
1231 bufferPool.release(buffer);
1232 IStream stream = getStream();
1233 dataInfo.consume(size);
1234 flowControlStrategy.updateWindow(StandardSession.this, stream, -size);
1235 if (dataInfo.available() > 0)
1236 {
1237
1238
1239
1240 flush(this, flusher.prepend(this));
1241 }
1242 else
1243 {
1244 super.succeeded();
1245 stream.updateCloseState(dataInfo.isClose(), true);
1246 if (stream.isClosed())
1247 removeStream(stream);
1248 }
1249 }
1250
1251 @Override
1252 public String toString()
1253 {
1254 return String.format("DATA bytes @%x available=%d consumed=%d on %s", dataInfo.hashCode(), dataInfo.available(), dataInfo.consumed(), getStream());
1255 }
1256 }
1257
1258 protected class CloseFrameBytes extends AbstractFrameBytes
1259 {
1260 private CloseFrameBytes()
1261 {
1262 super(null, Callback.Adapter.INSTANCE);
1263 }
1264
1265 @Override
1266 public ByteBuffer getByteBuffer()
1267 {
1268 return BufferUtil.EMPTY_BUFFER;
1269 }
1270
1271 @Override
1272 public void succeeded()
1273 {
1274 super.succeeded();
1275 close();
1276 }
1277 }
1278
1279 private static class PingInfoCallback extends PingResultInfo implements Callback
1280 {
1281 private final Promise<PingResultInfo> promise;
1282
1283 public PingInfoCallback(int pingId, Promise<PingResultInfo> promise)
1284 {
1285 super(pingId);
1286 this.promise = promise;
1287 }
1288
1289 @Override
1290 public void succeeded()
1291 {
1292 if (promise != null)
1293 promise.succeeded(this);
1294 }
1295
1296 @Override
1297 public void failed(Throwable x)
1298 {
1299 if (promise != null)
1300 promise.failed(x);
1301 }
1302 }
1303 }