1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.http2;
20
21 import org.eclipse.jetty.http2.api.Stream;
22 import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
23 import org.eclipse.jetty.util.log.Log;
24 import org.eclipse.jetty.util.log.Logger;
25
26 public abstract class AbstractFlowControlStrategy implements FlowControlStrategy
27 {
28 protected static final Logger LOG = Log.getLogger(FlowControlStrategy.class);
29
30 private int initialStreamSendWindow;
31 private int initialStreamRecvWindow;
32
33 public AbstractFlowControlStrategy(int initialStreamSendWindow)
34 {
35 this.initialStreamSendWindow = initialStreamSendWindow;
36 this.initialStreamRecvWindow = DEFAULT_WINDOW_SIZE;
37 }
38
39 protected int getInitialStreamSendWindow()
40 {
41 return initialStreamSendWindow;
42 }
43
44 protected int getInitialStreamRecvWindow()
45 {
46 return initialStreamRecvWindow;
47 }
48
49 @Override
50 public void onStreamCreated(IStream stream, boolean local)
51 {
52 stream.updateSendWindow(initialStreamSendWindow);
53 stream.updateRecvWindow(initialStreamRecvWindow);
54 }
55
56 @Override
57 public void onStreamDestroyed(IStream stream, boolean local)
58 {
59 }
60
61 @Override
62 public void updateInitialStreamWindow(ISession session, int initialStreamWindow, boolean local)
63 {
64 int previousInitialStreamWindow;
65 if (local)
66 {
67 previousInitialStreamWindow = getInitialStreamRecvWindow();
68 this.initialStreamRecvWindow = initialStreamWindow;
69 }
70 else
71 {
72 previousInitialStreamWindow = getInitialStreamSendWindow();
73 this.initialStreamSendWindow = initialStreamWindow;
74 }
75 int delta = initialStreamWindow - previousInitialStreamWindow;
76
77
78 for (Stream stream : session.getStreams())
79 {
80 if (local)
81 {
82 ((IStream)stream).updateRecvWindow(delta);
83 if (LOG.isDebugEnabled())
84 LOG.debug("Updated initial stream recv window {} -> {} for {}", previousInitialStreamWindow, initialStreamWindow, stream);
85 }
86 else
87 {
88 session.onWindowUpdate((IStream)stream, new WindowUpdateFrame(stream.getId(), delta));
89 }
90 }
91 }
92
93 @Override
94 public void onWindowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
95 {
96 int delta = frame.getWindowDelta();
97 if (frame.getStreamId() > 0)
98 {
99
100 if (stream != null)
101 {
102 int oldSize = stream.updateSendWindow(delta);
103 if (LOG.isDebugEnabled())
104 LOG.debug("Updated stream send window {} -> {} for {}", oldSize, oldSize + delta, stream);
105 }
106 }
107 else
108 {
109 int oldSize = session.updateSendWindow(delta);
110 if (LOG.isDebugEnabled())
111 LOG.debug("Updated session send window {} -> {} for {}", oldSize, oldSize + delta, session);
112 }
113 }
114
115 @Override
116 public void onDataReceived(ISession session, IStream stream, int length)
117 {
118 int oldSize = session.updateRecvWindow(-length);
119 if (LOG.isDebugEnabled())
120 LOG.debug("Data received, updated session recv window {} -> {} for {}", oldSize, oldSize - length, session);
121
122 if (stream != null)
123 {
124 oldSize = stream.updateRecvWindow(-length);
125 if (LOG.isDebugEnabled())
126 LOG.debug("Data received, updated stream recv window {} -> {} for {}", oldSize, oldSize - length, stream);
127 }
128 }
129
130 @Override
131 public void windowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
132 {
133 }
134
135 @Override
136 public void onDataSending(IStream stream, int length)
137 {
138 if (length == 0)
139 return;
140
141 ISession session = stream.getSession();
142 int oldSize = session.updateSendWindow(-length);
143 if (LOG.isDebugEnabled())
144 LOG.debug("Updated session send window {} -> {} for {}", oldSize, oldSize - length, session);
145
146 oldSize = stream.updateSendWindow(-length);
147 if (LOG.isDebugEnabled())
148 LOG.debug("Updated stream send window {} -> {} for {}", oldSize, oldSize - length, stream);
149 }
150
151 @Override
152 public void onDataSent(IStream stream, int length)
153 {
154 }
155
156 @Override
157 public void onSessionStalled(ISession session)
158 {
159 if (LOG.isDebugEnabled())
160 LOG.debug("Session stalled {}", session);
161 }
162
163 @Override
164 public void onStreamStalled(IStream stream)
165 {
166 if (LOG.isDebugEnabled())
167 LOG.debug("Stream stalled {}", stream);
168 }
169 }