1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.http2;
20
21 import java.io.IOException;
22 import java.util.Map;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.eclipse.jetty.http2.api.Stream;
28 import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
29 import org.eclipse.jetty.util.annotation.ManagedAttribute;
30 import org.eclipse.jetty.util.annotation.ManagedObject;
31 import org.eclipse.jetty.util.annotation.ManagedOperation;
32 import org.eclipse.jetty.util.component.ContainerLifeCycle;
33 import org.eclipse.jetty.util.component.Dumpable;
34 import org.eclipse.jetty.util.log.Log;
35 import org.eclipse.jetty.util.log.Logger;
36
37 @ManagedObject
38 public abstract class AbstractFlowControlStrategy implements FlowControlStrategy, Dumpable
39 {
40 protected static final Logger LOG = Log.getLogger(FlowControlStrategy.class);
41
42 private final AtomicLong sessionStall = new AtomicLong();
43 private final AtomicLong sessionStallTime = new AtomicLong();
44 private final Map<IStream, Long> streamsStalls = new ConcurrentHashMap<>();
45 private final AtomicLong streamsStallTime = new AtomicLong();
46 private int initialStreamSendWindow;
47 private int initialStreamRecvWindow;
48
49 public AbstractFlowControlStrategy(int initialStreamSendWindow)
50 {
51 this.initialStreamSendWindow = initialStreamSendWindow;
52 this.initialStreamRecvWindow = DEFAULT_WINDOW_SIZE;
53 }
54
55 @ManagedAttribute(value = "The initial size of stream's flow control send window", readonly = true)
56 public int getInitialStreamSendWindow()
57 {
58 return initialStreamSendWindow;
59 }
60
61 @ManagedAttribute(value = "The initial size of stream's flow control receive window", readonly = true)
62 public int getInitialStreamRecvWindow()
63 {
64 return initialStreamRecvWindow;
65 }
66
67 @Override
68 public void onStreamCreated(IStream stream)
69 {
70 stream.updateSendWindow(initialStreamSendWindow);
71 stream.updateRecvWindow(initialStreamRecvWindow);
72 }
73
74 @Override
75 public void onStreamDestroyed(IStream stream)
76 {
77 }
78
79 @Override
80 public void updateInitialStreamWindow(ISession session, int initialStreamWindow, boolean local)
81 {
82 int previousInitialStreamWindow;
83 if (local)
84 {
85 previousInitialStreamWindow = getInitialStreamRecvWindow();
86 this.initialStreamRecvWindow = initialStreamWindow;
87 }
88 else
89 {
90 previousInitialStreamWindow = getInitialStreamSendWindow();
91 this.initialStreamSendWindow = initialStreamWindow;
92 }
93 int delta = initialStreamWindow - previousInitialStreamWindow;
94
95
96 for (Stream stream : session.getStreams())
97 {
98 if (local)
99 {
100 ((IStream)stream).updateRecvWindow(delta);
101 if (LOG.isDebugEnabled())
102 LOG.debug("Updated initial stream recv window {} -> {} for {}", previousInitialStreamWindow, initialStreamWindow, stream);
103 }
104 else
105 {
106 session.onWindowUpdate((IStream)stream, new WindowUpdateFrame(stream.getId(), delta));
107 }
108 }
109 }
110
111 @Override
112 public void onWindowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
113 {
114 int delta = frame.getWindowDelta();
115 if (frame.getStreamId() > 0)
116 {
117
118 if (stream != null)
119 {
120 int oldSize = stream.updateSendWindow(delta);
121 if (LOG.isDebugEnabled())
122 LOG.debug("Updated stream send window {} -> {} for {}", oldSize, oldSize + delta, stream);
123 if (oldSize <= 0)
124 onStreamUnstalled(stream);
125 }
126 }
127 else
128 {
129 int oldSize = session.updateSendWindow(delta);
130 if (LOG.isDebugEnabled())
131 LOG.debug("Updated session send window {} -> {} for {}", oldSize, oldSize + delta, session);
132 if (oldSize <= 0)
133 onSessionUnstalled(session);
134 }
135 }
136
137 @Override
138 public void onDataReceived(ISession session, IStream stream, int length)
139 {
140 int oldSize = session.updateRecvWindow(-length);
141 if (LOG.isDebugEnabled())
142 LOG.debug("Data received, updated session recv window {} -> {} for {}", oldSize, oldSize - length, session);
143
144 if (stream != null)
145 {
146 oldSize = stream.updateRecvWindow(-length);
147 if (LOG.isDebugEnabled())
148 LOG.debug("Data received, updated stream recv window {} -> {} for {}", oldSize, oldSize - length, stream);
149 }
150 }
151
152 @Override
153 public void windowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
154 {
155 }
156
157 @Override
158 public void onDataSending(IStream stream, int length)
159 {
160 if (length == 0)
161 return;
162
163 ISession session = stream.getSession();
164 int oldSessionWindow = session.updateSendWindow(-length);
165 int newSessionWindow = oldSessionWindow - length;
166 if (LOG.isDebugEnabled())
167 LOG.debug("Sending, session send window {} -> {} for {}", oldSessionWindow, newSessionWindow, session);
168 if (newSessionWindow <= 0)
169 onSessionStalled(session);
170
171 int oldStreamWindow = stream.updateSendWindow(-length);
172 int newStreamWindow = oldStreamWindow - length;
173 if (LOG.isDebugEnabled())
174 LOG.debug("Sending, stream send window {} -> {} for {}", oldStreamWindow, newStreamWindow, stream);
175 if (newStreamWindow <= 0)
176 onStreamStalled(stream);
177 }
178
179 @Override
180 public void onDataSent(IStream stream, int length)
181 {
182 }
183
184 protected void onSessionStalled(ISession session)
185 {
186 sessionStall.set(System.nanoTime());
187 if (LOG.isDebugEnabled())
188 LOG.debug("Session stalled {}", session);
189 }
190
191 protected void onStreamStalled(IStream stream)
192 {
193 streamsStalls.put(stream, System.nanoTime());
194 if (LOG.isDebugEnabled())
195 LOG.debug("Stream stalled {}", stream);
196 }
197
198 protected void onSessionUnstalled(ISession session)
199 {
200 sessionStallTime.addAndGet(System.nanoTime() - sessionStall.getAndSet(0));
201 if (LOG.isDebugEnabled())
202 LOG.debug("Session unstalled {}", session);
203 }
204
205 protected void onStreamUnstalled(IStream stream)
206 {
207 Long time = streamsStalls.remove(stream);
208 if (time != null)
209 streamsStallTime.addAndGet(System.nanoTime() - time);
210 if (LOG.isDebugEnabled())
211 LOG.debug("Stream unstalled {}", stream);
212 }
213
214 @ManagedAttribute(value = "The time, in milliseconds, that the session flow control has stalled", readonly = true)
215 public long getSessionStallTime()
216 {
217 return TimeUnit.NANOSECONDS.toMillis(sessionStallTime.get());
218 }
219
220 @ManagedAttribute(value = "The time, in milliseconds, that the streams flow control has stalled", readonly = true)
221 public long getStreamsStallTime()
222 {
223 return TimeUnit.NANOSECONDS.toMillis(streamsStallTime.get());
224 }
225
226 @ManagedOperation(value = "Resets the statistics", impact = "ACTION")
227 public void reset()
228 {
229 sessionStallTime.set(0);
230 streamsStallTime.set(0);
231 }
232
233 @Override
234 public String dump()
235 {
236 return ContainerLifeCycle.dump(this);
237 }
238
239 @Override
240 public void dump(Appendable out, String indent) throws IOException
241 {
242 out.append(toString()).append(System.lineSeparator());
243 }
244 }