1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.http2;
20
21 import java.io.IOException;
22 import java.nio.channels.ClosedChannelException;
23 import java.nio.charset.StandardCharsets;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentMap;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import org.eclipse.jetty.http2.api.Session;
36 import org.eclipse.jetty.http2.api.Stream;
37 import org.eclipse.jetty.http2.frames.DataFrame;
38 import org.eclipse.jetty.http2.frames.DisconnectFrame;
39 import org.eclipse.jetty.http2.frames.Frame;
40 import org.eclipse.jetty.http2.frames.FrameType;
41 import org.eclipse.jetty.http2.frames.GoAwayFrame;
42 import org.eclipse.jetty.http2.frames.HeadersFrame;
43 import org.eclipse.jetty.http2.frames.PingFrame;
44 import org.eclipse.jetty.http2.frames.PriorityFrame;
45 import org.eclipse.jetty.http2.frames.PushPromiseFrame;
46 import org.eclipse.jetty.http2.frames.ResetFrame;
47 import org.eclipse.jetty.http2.frames.SettingsFrame;
48 import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
49 import org.eclipse.jetty.http2.generator.Generator;
50 import org.eclipse.jetty.http2.parser.Parser;
51 import org.eclipse.jetty.io.ByteBufferPool;
52 import org.eclipse.jetty.io.EndPoint;
53 import org.eclipse.jetty.util.Atomics;
54 import org.eclipse.jetty.util.Callback;
55 import org.eclipse.jetty.util.CountingCallback;
56 import org.eclipse.jetty.util.Promise;
57 import org.eclipse.jetty.util.log.Log;
58 import org.eclipse.jetty.util.log.Logger;
59 import org.eclipse.jetty.util.thread.Scheduler;
60
61 public abstract class HTTP2Session implements ISession, Parser.Listener
62 {
63 private static final Logger LOG = Log.getLogger(HTTP2Session.class);
64
65 private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
66 private final AtomicInteger streamIds = new AtomicInteger();
67 private final AtomicInteger lastStreamId = new AtomicInteger();
68 private final AtomicInteger localStreamCount = new AtomicInteger();
69 private final AtomicInteger remoteStreamCount = new AtomicInteger();
70 private final AtomicInteger sendWindow = new AtomicInteger();
71 private final AtomicInteger recvWindow = new AtomicInteger();
72 private final AtomicReference<CloseState> closed = new AtomicReference<>(CloseState.NOT_CLOSED);
73 private final Scheduler scheduler;
74 private final EndPoint endPoint;
75 private final Generator generator;
76 private final Listener listener;
77 private final FlowControlStrategy flowControl;
78 private final HTTP2Flusher flusher;
79 private int maxLocalStreams;
80 private int maxRemoteStreams;
81 private long streamIdleTimeout;
82 private boolean pushEnabled;
83
84 public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Listener listener, FlowControlStrategy flowControl, int initialStreamId)
85 {
86 this.scheduler = scheduler;
87 this.endPoint = endPoint;
88 this.generator = generator;
89 this.listener = listener;
90 this.flowControl = flowControl;
91 this.flusher = new HTTP2Flusher(this);
92 this.maxLocalStreams = -1;
93 this.maxRemoteStreams = -1;
94 this.streamIds.set(initialStreamId);
95 this.streamIdleTimeout = endPoint.getIdleTimeout();
96 this.sendWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
97 this.recvWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
98 this.pushEnabled = true;
99 }
100
101 public FlowControlStrategy getFlowControlStrategy()
102 {
103 return flowControl;
104 }
105
106 public int getMaxLocalStreams()
107 {
108 return maxLocalStreams;
109 }
110
111 public void setMaxLocalStreams(int maxLocalStreams)
112 {
113 this.maxLocalStreams = maxLocalStreams;
114 }
115
116 public int getMaxRemoteStreams()
117 {
118 return maxRemoteStreams;
119 }
120
121 public void setMaxRemoteStreams(int maxRemoteStreams)
122 {
123 this.maxRemoteStreams = maxRemoteStreams;
124 }
125
126 public long getStreamIdleTimeout()
127 {
128 return streamIdleTimeout;
129 }
130
131 public void setStreamIdleTimeout(long streamIdleTimeout)
132 {
133 this.streamIdleTimeout = streamIdleTimeout;
134 }
135
136 public EndPoint getEndPoint()
137 {
138 return endPoint;
139 }
140
141 public Generator getGenerator()
142 {
143 return generator;
144 }
145
146 @Override
147 public void onData(final DataFrame frame)
148 {
149 if (LOG.isDebugEnabled())
150 LOG.debug("Received {}", frame);
151
152 int streamId = frame.getStreamId();
153 final IStream stream = getStream(streamId);
154
155
156
157 final int flowControlLength = frame.remaining() + frame.padding();
158 flowControl.onDataReceived(this, stream, flowControlLength);
159
160 if (stream != null)
161 {
162 if (getRecvWindow() < 0)
163 {
164 close(ErrorCode.FLOW_CONTROL_ERROR.code, "session_window_exceeded", Callback.NOOP);
165 }
166 else
167 {
168 stream.process(frame, new Callback()
169 {
170 @Override
171 public void succeeded()
172 {
173 flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
174 }
175
176 @Override
177 public void failed(Throwable x)
178 {
179
180
181 flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
182 }
183 });
184 }
185 }
186 else
187 {
188 if (LOG.isDebugEnabled())
189 LOG.debug("Ignoring {}, stream #{} not found", frame, streamId);
190
191
192 flowControl.onDataConsumed(this, null, flowControlLength);
193 }
194 }
195
196 @Override
197 public abstract void onHeaders(HeadersFrame frame);
198
199 @Override
200 public void onPriority(PriorityFrame frame)
201 {
202 if (LOG.isDebugEnabled())
203 LOG.debug("Received {}", frame);
204 }
205
206 @Override
207 public void onReset(ResetFrame frame)
208 {
209 if (LOG.isDebugEnabled())
210 LOG.debug("Received {}", frame);
211
212 IStream stream = getStream(frame.getStreamId());
213 if (stream != null)
214 stream.process(frame, Callback.NOOP);
215 else
216 notifyReset(this, frame);
217 }
218
219 @Override
220 public void onSettings(SettingsFrame frame)
221 {
222
223 onSettings(frame, true);
224 }
225
226 public void onSettings(SettingsFrame frame, boolean reply)
227 {
228 if (LOG.isDebugEnabled())
229 LOG.debug("Received {}", frame);
230
231 if (frame.isReply())
232 return;
233
234
235 for (Map.Entry<Integer, Integer> entry : frame.getSettings().entrySet())
236 {
237 int key = entry.getKey();
238 int value = entry.getValue();
239 switch (key)
240 {
241 case SettingsFrame.HEADER_TABLE_SIZE:
242 {
243 if (LOG.isDebugEnabled())
244 LOG.debug("Update HPACK header table size to {}", value);
245 generator.setHeaderTableSize(value);
246 break;
247 }
248 case SettingsFrame.ENABLE_PUSH:
249 {
250
251 if (value != 0 && value != 1)
252 {
253 onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_settings_enable_push");
254 return;
255 }
256 pushEnabled = value == 1;
257 break;
258 }
259 case SettingsFrame.MAX_CONCURRENT_STREAMS:
260 {
261 maxLocalStreams = value;
262 if (LOG.isDebugEnabled())
263 LOG.debug("Update max local concurrent streams to {}", maxLocalStreams);
264 break;
265 }
266 case SettingsFrame.INITIAL_WINDOW_SIZE:
267 {
268 if (LOG.isDebugEnabled())
269 LOG.debug("Update initial window size to {}", value);
270 flowControl.updateInitialStreamWindow(this, value, false);
271 break;
272 }
273 case SettingsFrame.MAX_FRAME_SIZE:
274 {
275 if (LOG.isDebugEnabled())
276 LOG.debug("Update max frame size to {}", value);
277
278 if (value < Frame.DEFAULT_MAX_LENGTH || value > Frame.MAX_MAX_LENGTH)
279 {
280 onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_settings_max_frame_size");
281 return;
282 }
283 generator.setMaxFrameSize(value);
284 break;
285 }
286 case SettingsFrame.MAX_HEADER_LIST_SIZE:
287 {
288
289 LOG.warn("NOT IMPLEMENTED max header list size to {}", value);
290 break;
291 }
292 default:
293 {
294 LOG.debug("Unknown setting {}:{}", key, value);
295 break;
296 }
297 }
298 }
299 notifySettings(this, frame);
300
301 if (reply)
302 {
303 SettingsFrame replyFrame = new SettingsFrame(Collections.<Integer, Integer>emptyMap(), true);
304 settings(replyFrame, Callback.NOOP);
305 }
306 }
307
308 @Override
309 public void onPing(PingFrame frame)
310 {
311 if (LOG.isDebugEnabled())
312 LOG.debug("Received {}", frame);
313
314 if (frame.isReply())
315 {
316 notifyPing(this, frame);
317 }
318 else
319 {
320 PingFrame reply = new PingFrame(frame.getPayload(), true);
321 control(null, Callback.NOOP, reply);
322 }
323 }
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342 @Override
343 public void onGoAway(final GoAwayFrame frame)
344 {
345 if (LOG.isDebugEnabled())
346 LOG.debug("Received {}", frame);
347
348 while (true)
349 {
350 CloseState current = closed.get();
351 switch (current)
352 {
353 case NOT_CLOSED:
354 {
355 if (closed.compareAndSet(current, CloseState.REMOTELY_CLOSED))
356 {
357
358
359 control(null, new Callback()
360 {
361 @Override
362 public void succeeded()
363 {
364 notifyClose(HTTP2Session.this, frame);
365 }
366
367 @Override
368 public void failed(Throwable x)
369 {
370 notifyClose(HTTP2Session.this, frame);
371 }
372 }, new DisconnectFrame());
373 return;
374 }
375 break;
376 }
377 default:
378 {
379 if (LOG.isDebugEnabled())
380 LOG.debug("Ignored {}, already closed", frame);
381 return;
382 }
383 }
384 }
385 }
386
387 @Override
388 public void onWindowUpdate(WindowUpdateFrame frame)
389 {
390 if (LOG.isDebugEnabled())
391 LOG.debug("Received {}", frame);
392
393 int streamId = frame.getStreamId();
394 if (streamId > 0)
395 {
396 IStream stream = getStream(streamId);
397 if (stream != null)
398 onWindowUpdate(stream, frame);
399 }
400 else
401 {
402 onWindowUpdate(null, frame);
403 }
404 }
405
406 @Override
407 public void onConnectionFailure(int error, String reason)
408 {
409 close(error, reason, Callback.NOOP);
410 notifyFailure(this, new IOException(String.format("%d/%s", error, reason)));
411 }
412
413 @Override
414 public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener)
415 {
416
417
418 boolean queued;
419 synchronized (this)
420 {
421 int streamId = frame.getStreamId();
422 if (streamId <= 0)
423 {
424 streamId = streamIds.getAndAdd(2);
425 PriorityFrame priority = frame.getPriority();
426 priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(),
427 priority.getWeight(), priority.isExclusive());
428 frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream());
429 }
430 final IStream stream = createLocalStream(streamId, promise);
431 if (stream == null)
432 return;
433 stream.setListener(listener);
434
435 ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback<>(promise, stream));
436 queued = flusher.append(entry);
437 }
438
439 if (queued)
440 flusher.iterate();
441 }
442
443 @Override
444 public int priority(PriorityFrame frame, Callback callback)
445 {
446 int streamId = frame.getStreamId();
447 IStream stream = streams.get(streamId);
448 if (stream == null)
449 {
450 streamId = streamIds.getAndAdd(2);
451 frame = new PriorityFrame(streamId, frame.getParentStreamId(),
452 frame.getWeight(), frame.isExclusive());
453 }
454 control(stream, callback, frame);
455 return streamId;
456 }
457
458 @Override
459 public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame, Stream.Listener listener)
460 {
461
462
463 boolean queued;
464 synchronized (this)
465 {
466 int streamId = streamIds.getAndAdd(2);
467 frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData());
468
469 final IStream pushStream = createLocalStream(streamId, promise);
470 if (pushStream == null)
471 return;
472 pushStream.setListener(listener);
473
474 ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback<>(promise, pushStream));
475 queued = flusher.append(entry);
476 }
477
478 if (queued)
479 flusher.iterate();
480 }
481
482
483 @Override
484 public void settings(SettingsFrame frame, Callback callback)
485 {
486 control(null, callback, frame);
487 }
488
489 @Override
490 public void ping(PingFrame frame, Callback callback)
491 {
492 if (frame.isReply())
493 callback.failed(new IllegalArgumentException());
494 else
495 control(null, callback, frame);
496 }
497
498 protected void reset(ResetFrame frame, Callback callback)
499 {
500 control(getStream(frame.getStreamId()), callback, frame);
501 }
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526 @Override
527 public boolean close(int error, String reason, Callback callback)
528 {
529 while (true)
530 {
531 CloseState current = closed.get();
532 switch (current)
533 {
534 case NOT_CLOSED:
535 {
536 if (closed.compareAndSet(current, CloseState.LOCALLY_CLOSED))
537 {
538 byte[] payload = reason == null ? null : reason.getBytes(StandardCharsets.UTF_8);
539 GoAwayFrame frame = new GoAwayFrame(lastStreamId.get(), error, payload);
540 if (LOG.isDebugEnabled())
541 LOG.debug("Sending {}", frame);
542 control(null, callback, frame);
543 return true;
544 }
545 break;
546 }
547 default:
548 {
549 if (LOG.isDebugEnabled())
550 LOG.debug("Ignoring close {}/{}, already closed", error, reason);
551 callback.succeeded();
552 return false;
553 }
554 }
555 }
556 }
557
558 @Override
559 public boolean isClosed()
560 {
561 return closed.get() != CloseState.NOT_CLOSED;
562 }
563
564 private void control(IStream stream, Callback callback, Frame frame)
565 {
566 frames(stream, callback, frame, Frame.EMPTY_ARRAY);
567 }
568
569 @Override
570 public void frames(IStream stream, Callback callback, Frame frame, Frame... frames)
571 {
572
573
574
575
576
577 int length = frames.length;
578 if (length == 0)
579 {
580 frame(new ControlEntry(frame, stream, callback), true);
581 }
582 else
583 {
584 callback = new CountingCallback(callback, 1 + length);
585 frame(new ControlEntry(frame, stream, callback), false);
586 for (int i = 1; i <= length; ++i)
587 frame(new ControlEntry(frames[i - 1], stream, callback), i == length);
588 }
589 }
590
591 @Override
592 public void data(IStream stream, Callback callback, DataFrame frame)
593 {
594
595 frame(new DataEntry(frame, stream, callback), true);
596 }
597
598 private void frame(HTTP2Flusher.Entry entry, boolean flush)
599 {
600 if (LOG.isDebugEnabled())
601 LOG.debug("Sending {}", entry.frame);
602
603 boolean queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry);
604 if (queued && flush)
605 flusher.iterate();
606 }
607
608 protected IStream createLocalStream(int streamId, Promise<Stream> promise)
609 {
610 while (true)
611 {
612 int localCount = localStreamCount.get();
613 int maxCount = maxLocalStreams;
614 if (maxCount >= 0 && localCount >= maxCount)
615 {
616 promise.failed(new IllegalStateException("Max local stream count " + maxCount + " exceeded"));
617 return null;
618 }
619 if (localStreamCount.compareAndSet(localCount, localCount + 1))
620 break;
621 }
622
623 IStream stream = newStream(streamId);
624 if (streams.putIfAbsent(streamId, stream) == null)
625 {
626 stream.setIdleTimeout(getStreamIdleTimeout());
627 flowControl.onStreamCreated(stream, true);
628 if (LOG.isDebugEnabled())
629 LOG.debug("Created local {}", stream);
630 return stream;
631 }
632 else
633 {
634 promise.failed(new IllegalStateException("Duplicate stream " + streamId));
635 return null;
636 }
637 }
638
639 protected IStream createRemoteStream(int streamId)
640 {
641
642 while (true)
643 {
644 int remoteCount = remoteStreamCount.get();
645 int maxCount = getMaxRemoteStreams();
646 if (maxCount >= 0 && remoteCount >= maxCount)
647 {
648 reset(new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.NOOP);
649 return null;
650 }
651 if (remoteStreamCount.compareAndSet(remoteCount, remoteCount + 1))
652 break;
653 }
654
655 IStream stream = newStream(streamId);
656
657
658 if (streams.putIfAbsent(streamId, stream) == null)
659 {
660 updateLastStreamId(streamId);
661 stream.setIdleTimeout(getStreamIdleTimeout());
662 flowControl.onStreamCreated(stream, false);
663 if (LOG.isDebugEnabled())
664 LOG.debug("Created remote {}", stream);
665 return stream;
666 }
667 else
668 {
669 close(ErrorCode.PROTOCOL_ERROR.code, "duplicate_stream", Callback.NOOP);
670 return null;
671 }
672 }
673
674 protected IStream newStream(int streamId)
675 {
676 return new HTTP2Stream(scheduler, this, streamId);
677 }
678
679 @Override
680 public void removeStream(IStream stream, boolean local)
681 {
682 IStream removed = streams.remove(stream.getId());
683 if (removed != null)
684 {
685 assert removed == stream;
686
687 if (local)
688 localStreamCount.decrementAndGet();
689 else
690 remoteStreamCount.decrementAndGet();
691
692 flowControl.onStreamDestroyed(stream, local);
693
694 if (LOG.isDebugEnabled())
695 LOG.debug("Removed {}", stream);
696 }
697 }
698
699 @Override
700 public Collection<Stream> getStreams()
701 {
702 List<Stream> result = new ArrayList<>();
703 result.addAll(streams.values());
704 return result;
705 }
706
707 @Override
708 public IStream getStream(int streamId)
709 {
710 return streams.get(streamId);
711 }
712
713 public int getSendWindow()
714 {
715 return sendWindow.get();
716 }
717
718 public int getRecvWindow()
719 {
720 return recvWindow.get();
721 }
722
723 @Override
724 public int updateSendWindow(int delta)
725 {
726 return sendWindow.getAndAdd(delta);
727 }
728
729 @Override
730 public int updateRecvWindow(int delta)
731 {
732 return recvWindow.getAndAdd(delta);
733 }
734
735 @Override
736 public void onWindowUpdate(IStream stream, WindowUpdateFrame frame)
737 {
738
739
740
741
742
743
744
745
746
747 flusher.window(stream, frame);
748 }
749
750 @Override
751 public boolean isPushEnabled()
752 {
753 return pushEnabled;
754 }
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778 @Override
779 public void onShutdown()
780 {
781 if (LOG.isDebugEnabled())
782 LOG.debug("Shutting down {}", this);
783
784 switch (closed.get())
785 {
786 case NOT_CLOSED:
787 {
788
789 if (LOG.isDebugEnabled())
790 LOG.debug("Abrupt close for {}", this);
791 abort(new ClosedChannelException());
792 break;
793 }
794 case LOCALLY_CLOSED:
795 {
796
797
798 control(null, Callback.NOOP, new DisconnectFrame());
799 break;
800 }
801 case REMOTELY_CLOSED:
802 {
803
804
805 break;
806 }
807 default:
808 {
809 break;
810 }
811 }
812 }
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834 @Override
835 public void onIdleTimeout()
836 {
837 switch (closed.get())
838 {
839 case NOT_CLOSED:
840 {
841
842 close(ErrorCode.NO_ERROR.code, "idle_timeout", Callback.NOOP);
843 break;
844 }
845 case LOCALLY_CLOSED:
846 case REMOTELY_CLOSED:
847 {
848 abort(new TimeoutException());
849 break;
850 }
851 default:
852 {
853 break;
854 }
855 }
856 }
857
858 @Override
859 public void onFrame(Frame frame)
860 {
861 onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "upgrade");
862 }
863
864 public void disconnect()
865 {
866 if (LOG.isDebugEnabled())
867 LOG.debug("Disconnecting {}", this);
868 endPoint.close();
869 }
870
871 private void terminate()
872 {
873 while (true)
874 {
875 CloseState current = closed.get();
876 switch (current)
877 {
878 case NOT_CLOSED:
879 case LOCALLY_CLOSED:
880 case REMOTELY_CLOSED:
881 {
882 if (closed.compareAndSet(current, CloseState.CLOSED))
883 {
884 flusher.close();
885 for (IStream stream : streams.values())
886 stream.close();
887 streams.clear();
888 disconnect();
889 return;
890 }
891 break;
892 }
893 default:
894 {
895 return;
896 }
897 }
898 }
899 }
900
901 protected void abort(Throwable failure)
902 {
903 terminate();
904 notifyFailure(this, failure);
905 }
906
907 public boolean isDisconnected()
908 {
909 return !endPoint.isOpen();
910 }
911
912 private void updateLastStreamId(int streamId)
913 {
914 Atomics.updateMax(lastStreamId, streamId);
915 }
916
917 protected Stream.Listener notifyNewStream(Stream stream, HeadersFrame frame)
918 {
919 try
920 {
921 return listener.onNewStream(stream, frame);
922 }
923 catch (Throwable x)
924 {
925 LOG.info("Failure while notifying listener " + listener, x);
926 return null;
927 }
928 }
929
930 protected void notifySettings(Session session, SettingsFrame frame)
931 {
932 try
933 {
934 listener.onSettings(session, frame);
935 }
936 catch (Throwable x)
937 {
938 LOG.info("Failure while notifying listener " + listener, x);
939 }
940 }
941
942 protected void notifyPing(Session session, PingFrame frame)
943 {
944 try
945 {
946 listener.onPing(session, frame);
947 }
948 catch (Throwable x)
949 {
950 LOG.info("Failure while notifying listener " + listener, x);
951 }
952 }
953
954 protected void notifyReset(Session session, ResetFrame frame)
955 {
956 try
957 {
958 listener.onReset(session, frame);
959 }
960 catch (Throwable x)
961 {
962 LOG.info("Failure while notifying listener " + listener, x);
963 }
964 }
965
966 protected void notifyClose(Session session, GoAwayFrame frame)
967 {
968 try
969 {
970 listener.onClose(session, frame);
971 }
972 catch (Throwable x)
973 {
974 LOG.info("Failure while notifying listener " + listener, x);
975 }
976 }
977
978 protected void notifyFailure(Session session, Throwable failure)
979 {
980 try
981 {
982 listener.onFailure(session, failure);
983 }
984 catch (Throwable x)
985 {
986 LOG.info("Failure while notifying listener " + listener, x);
987 }
988 }
989
990 @Override
991 public String toString()
992 {
993 return String.format("%s@%x{queueSize=%d,sendWindow=%s,recvWindow=%s,streams=%d,%s}", getClass().getSimpleName(),
994 hashCode(), flusher.getQueueSize(), sendWindow, recvWindow, streams.size(), closed);
995 }
996
997 private class ControlEntry extends HTTP2Flusher.Entry
998 {
999 private ControlEntry(Frame frame, IStream stream, Callback callback)
1000 {
1001 super(frame, stream, callback);
1002 }
1003
1004 public Throwable generate(ByteBufferPool.Lease lease)
1005 {
1006 try
1007 {
1008 generator.control(lease, frame);
1009 if (LOG.isDebugEnabled())
1010 LOG.debug("Generated {}", frame);
1011 prepare();
1012 return null;
1013 }
1014 catch (Throwable x)
1015 {
1016 if (LOG.isDebugEnabled())
1017 LOG.debug("Failure generating frame " + frame, x);
1018 return x;
1019 }
1020 }
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035 private void prepare()
1036 {
1037 switch (frame.getType())
1038 {
1039 case SETTINGS:
1040 {
1041 SettingsFrame settingsFrame = (SettingsFrame)frame;
1042 Integer initialWindow = settingsFrame.getSettings().get(SettingsFrame.INITIAL_WINDOW_SIZE);
1043 if (initialWindow != null)
1044 flowControl.updateInitialStreamWindow(HTTP2Session.this, initialWindow, true);
1045 break;
1046 }
1047 default:
1048 {
1049 break;
1050 }
1051 }
1052 }
1053
1054 @Override
1055 public void succeeded()
1056 {
1057 switch (frame.getType())
1058 {
1059 case HEADERS:
1060 {
1061 HeadersFrame headersFrame = (HeadersFrame)frame;
1062 if (stream.updateClose(headersFrame.isEndStream(), true))
1063 removeStream(stream, true);
1064 break;
1065 }
1066 case RST_STREAM:
1067 {
1068 if (stream != null)
1069 {
1070 stream.close();
1071 removeStream(stream, true);
1072 }
1073 break;
1074 }
1075 case PUSH_PROMISE:
1076 {
1077
1078
1079 stream.updateClose(true, false);
1080 break;
1081 }
1082 case GO_AWAY:
1083 {
1084
1085
1086 getEndPoint().shutdownOutput();
1087 break;
1088 }
1089 case WINDOW_UPDATE:
1090 {
1091 flowControl.windowUpdate(HTTP2Session.this, stream, (WindowUpdateFrame)frame);
1092 break;
1093 }
1094 case DISCONNECT:
1095 {
1096 terminate();
1097 break;
1098 }
1099 default:
1100 {
1101 break;
1102 }
1103 }
1104 callback.succeeded();
1105 }
1106 }
1107
1108 private class DataEntry extends HTTP2Flusher.Entry
1109 {
1110 private int length;
1111
1112 private DataEntry(DataFrame frame, IStream stream, Callback callback)
1113 {
1114 super(frame, stream, callback);
1115 }
1116
1117 @Override
1118 public int dataRemaining()
1119 {
1120
1121
1122
1123
1124
1125 return ((DataFrame)frame).remaining();
1126 }
1127
1128 public Throwable generate(ByteBufferPool.Lease lease)
1129 {
1130 try
1131 {
1132 int flowControlLength = dataRemaining();
1133
1134 int sessionSendWindow = getSendWindow();
1135 if (sessionSendWindow < 0)
1136 throw new IllegalStateException();
1137
1138 int streamSendWindow = stream.updateSendWindow(0);
1139 if (streamSendWindow < 0)
1140 throw new IllegalStateException();
1141
1142 int window = Math.min(streamSendWindow, sessionSendWindow);
1143
1144 int length = this.length = Math.min(flowControlLength, window);
1145 if (LOG.isDebugEnabled())
1146 LOG.debug("Generated {}, length/window={}/{}", frame, length, window);
1147
1148 generator.data(lease, (DataFrame)frame, length);
1149 flowControl.onDataSending(stream, length);
1150 return null;
1151 }
1152 catch (Throwable x)
1153 {
1154 if (LOG.isDebugEnabled())
1155 LOG.debug("Failure generating frame " + frame, x);
1156 return x;
1157 }
1158 }
1159
1160 @Override
1161 public void succeeded()
1162 {
1163 flowControl.onDataSent(stream, length);
1164
1165 DataFrame dataFrame = (DataFrame)frame;
1166 if (dataFrame.remaining() > 0)
1167 {
1168
1169
1170
1171 flusher.prepend(this);
1172 }
1173 else
1174 {
1175
1176
1177 if (stream.updateClose(dataFrame.isEndStream(), true))
1178 removeStream(stream, true);
1179 callback.succeeded();
1180 }
1181 }
1182 }
1183
1184 private static class PromiseCallback<C> implements Callback
1185 {
1186 private final Promise<C> promise;
1187 private final C value;
1188
1189 private PromiseCallback(Promise<C> promise, C value)
1190 {
1191 this.promise = promise;
1192 this.value = value;
1193 }
1194
1195 @Override
1196 public void succeeded()
1197 {
1198 promise.succeeded(value);
1199 }
1200
1201 @Override
1202 public void failed(Throwable x)
1203 {
1204 promise.failed(x);
1205 }
1206 }
1207 }