1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.http2;
20
21 import java.util.Map;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.atomic.AtomicInteger;
24
25 import org.eclipse.jetty.http2.frames.Frame;
26 import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
27 import org.eclipse.jetty.util.Atomics;
28 import org.eclipse.jetty.util.Callback;
29 import org.eclipse.jetty.util.annotation.ManagedAttribute;
30 import org.eclipse.jetty.util.annotation.ManagedObject;
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 @ManagedObject
55 public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
56 {
57 private final AtomicInteger maxSessionRecvWindow = new AtomicInteger(DEFAULT_WINDOW_SIZE);
58 private final AtomicInteger sessionLevel = new AtomicInteger();
59 private final Map<IStream, AtomicInteger> streamLevels = new ConcurrentHashMap<>();
60 private float bufferRatio;
61
62 public BufferingFlowControlStrategy(float bufferRatio)
63 {
64 this(DEFAULT_WINDOW_SIZE, bufferRatio);
65 }
66
67 public BufferingFlowControlStrategy(int initialStreamSendWindow, float bufferRatio)
68 {
69 super(initialStreamSendWindow);
70 this.bufferRatio = bufferRatio;
71 }
72
73 @ManagedAttribute("The ratio between the receive buffer and the consume buffer")
74 public float getBufferRatio()
75 {
76 return bufferRatio;
77 }
78
79 public void setBufferRatio(float bufferRatio)
80 {
81 this.bufferRatio = bufferRatio;
82 }
83
84 @Override
85 public void onStreamCreated(IStream stream)
86 {
87 super.onStreamCreated(stream);
88 streamLevels.put(stream, new AtomicInteger());
89 }
90
91 @Override
92 public void onStreamDestroyed(IStream stream)
93 {
94 streamLevels.remove(stream);
95 super.onStreamDestroyed(stream);
96 }
97
98 @Override
99 public void onDataConsumed(ISession session, IStream stream, int length)
100 {
101 if (length <= 0)
102 return;
103
104 float ratio = bufferRatio;
105
106 WindowUpdateFrame windowFrame = null;
107 int level = sessionLevel.addAndGet(length);
108 int maxLevel = (int)(maxSessionRecvWindow.get() * ratio);
109 if (level > maxLevel)
110 {
111 level = sessionLevel.getAndSet(0);
112 session.updateRecvWindow(level);
113 if (LOG.isDebugEnabled())
114 LOG.debug("Data consumed, updated session recv window by {}/{} for {}", level, maxLevel, session);
115 windowFrame = new WindowUpdateFrame(0, level);
116 }
117 else
118 {
119 if (LOG.isDebugEnabled())
120 LOG.debug("Data consumed, session recv window level {}/{} for {}", level, maxLevel, session);
121 }
122
123 Frame[] windowFrames = Frame.EMPTY_ARRAY;
124 if (stream != null)
125 {
126 if (stream.isClosed())
127 {
128 if (LOG.isDebugEnabled())
129 LOG.debug("Data consumed, ignoring update stream recv window by {} for closed {}", length, stream);
130 }
131 else
132 {
133 AtomicInteger streamLevel = streamLevels.get(stream);
134 if (streamLevel != null)
135 {
136 level = streamLevel.addAndGet(length);
137 maxLevel = (int)(getInitialStreamRecvWindow() * ratio);
138 if (level > maxLevel)
139 {
140 level = streamLevel.getAndSet(0);
141 stream.updateRecvWindow(level);
142 if (LOG.isDebugEnabled())
143 LOG.debug("Data consumed, updated stream recv window by {}/{} for {}", level, maxLevel, stream);
144 WindowUpdateFrame frame = new WindowUpdateFrame(stream.getId(), level);
145 if (windowFrame == null)
146 windowFrame = frame;
147 else
148 windowFrames = new Frame[]{frame};
149 }
150 else
151 {
152 if (LOG.isDebugEnabled())
153 LOG.debug("Data consumed, stream recv window level {}/{} for {}", level, maxLevel, session);
154 }
155 }
156 }
157 }
158
159 if (windowFrame != null)
160 session.frames(stream, Callback.NOOP, windowFrame, windowFrames);
161 }
162
163 @Override
164 public void windowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
165 {
166 super.windowUpdate(session, stream, frame);
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195 if (frame.getStreamId() == 0)
196 {
197 int sessionWindow = session.updateRecvWindow(0);
198 Atomics.updateMax(maxSessionRecvWindow, sessionWindow);
199 }
200 }
201
202 @Override
203 public String toString()
204 {
205 return String.format("%s@%x[ratio=%.2f,sessionStallTime=%dms,streamsStallTime=%dms]",
206 getClass().getSimpleName(),
207 hashCode(),
208 bufferRatio,
209 getSessionStallTime(),
210 getStreamsStallTime());
211 }
212 }