1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.http2;
20
21 import java.util.Map;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.atomic.AtomicInteger;
24
25 import org.eclipse.jetty.http2.frames.Frame;
26 import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
27 import org.eclipse.jetty.util.Atomics;
28 import org.eclipse.jetty.util.Callback;
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
53 {
54 private final AtomicInteger maxSessionRecvWindow = new AtomicInteger(DEFAULT_WINDOW_SIZE);
55 private final AtomicInteger sessionLevel = new AtomicInteger();
56 private final Map<IStream, AtomicInteger> streamLevels = new ConcurrentHashMap<>();
57 private final float bufferRatio;
58
59 public BufferingFlowControlStrategy(float bufferRatio)
60 {
61 this(DEFAULT_WINDOW_SIZE, bufferRatio);
62 }
63
64 public BufferingFlowControlStrategy(int initialStreamSendWindow, float bufferRatio)
65 {
66 super(initialStreamSendWindow);
67 this.bufferRatio = bufferRatio;
68 }
69
70 @Override
71 public void onStreamCreated(IStream stream, boolean local)
72 {
73 super.onStreamCreated(stream, local);
74 streamLevels.put(stream, new AtomicInteger());
75 }
76
77 @Override
78 public void onStreamDestroyed(IStream stream, boolean local)
79 {
80 streamLevels.remove(stream);
81 super.onStreamDestroyed(stream, local);
82 }
83
84 @Override
85 public void onDataConsumed(ISession session, IStream stream, int length)
86 {
87 if (length <= 0)
88 return;
89
90 WindowUpdateFrame windowFrame = null;
91 int level = sessionLevel.addAndGet(length);
92 int maxLevel = (int)(maxSessionRecvWindow.get() * bufferRatio);
93 if (level > maxLevel)
94 {
95 level = sessionLevel.getAndSet(0);
96 session.updateRecvWindow(level);
97 if (LOG.isDebugEnabled())
98 LOG.debug("Data consumed, updated session recv window by {}/{} for {}", level, maxLevel, session);
99 windowFrame = new WindowUpdateFrame(0, level);
100 }
101 else
102 {
103 if (LOG.isDebugEnabled())
104 LOG.debug("Data consumed, session recv window level {}/{} for {}", level, maxLevel, session);
105 }
106
107 Frame[] windowFrames = Frame.EMPTY_ARRAY;
108 if (stream != null)
109 {
110 if (stream.isClosed())
111 {
112 if (LOG.isDebugEnabled())
113 LOG.debug("Data consumed, ignoring update stream recv window by {} for closed {}", length, stream);
114 }
115 else
116 {
117 AtomicInteger streamLevel = streamLevels.get(stream);
118 if (streamLevel != null)
119 {
120 level = streamLevel.addAndGet(length);
121 maxLevel = (int)(getInitialStreamRecvWindow() * bufferRatio);
122 if (level > maxLevel)
123 {
124 level = streamLevel.getAndSet(0);
125 stream.updateRecvWindow(level);
126 if (LOG.isDebugEnabled())
127 LOG.debug("Data consumed, updated stream recv window by {}/{} for {}", level, maxLevel, stream);
128 WindowUpdateFrame frame = new WindowUpdateFrame(stream.getId(), level);
129 if (windowFrame == null)
130 windowFrame = frame;
131 else
132 windowFrames = new Frame[]{frame};
133 }
134 else
135 {
136 if (LOG.isDebugEnabled())
137 LOG.debug("Data consumed, stream recv window level {}/{} for {}", level, maxLevel, session);
138 }
139 }
140 }
141 }
142
143 if (windowFrame != null)
144 session.frames(stream, Callback.NOOP, windowFrame, windowFrames);
145 }
146
147 @Override
148 public void windowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
149 {
150 super.windowUpdate(session, stream, frame);
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179 if (frame.getStreamId() == 0)
180 {
181 int sessionWindow = session.updateRecvWindow(0);
182 Atomics.updateMax(maxSessionRecvWindow, sessionWindow);
183 }
184 }
185 }