1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.http2;
20
21 import java.nio.ByteBuffer;
22 import java.nio.channels.ClosedChannelException;
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Queue;
29
30 import org.eclipse.jetty.http2.frames.Frame;
31 import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
32 import org.eclipse.jetty.io.ByteBufferPool;
33 import org.eclipse.jetty.io.EofException;
34 import org.eclipse.jetty.util.ArrayQueue;
35 import org.eclipse.jetty.util.Callback;
36 import org.eclipse.jetty.util.IteratingCallback;
37 import org.eclipse.jetty.util.log.Log;
38 import org.eclipse.jetty.util.log.Logger;
39
40 public class HTTP2Flusher extends IteratingCallback
41 {
42 private static final Logger LOG = Log.getLogger(HTTP2Flusher.class);
43
44 private final Queue<WindowEntry> windows = new ArrayDeque<>();
45 private final ArrayQueue<Entry> frames = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH, this);
46 private final Map<IStream, Integer> streams = new HashMap<>();
47 private final List<Entry> resets = new ArrayList<>();
48 private final List<Entry> actives = new ArrayList<>();
49 private final Queue<Entry> completes = new ArrayDeque<>();
50 private final HTTP2Session session;
51 private final ByteBufferPool.Lease lease;
52
53 public HTTP2Flusher(HTTP2Session session)
54 {
55 this.session = session;
56 this.lease = new ByteBufferPool.Lease(session.getGenerator().getByteBufferPool());
57 }
58
59 public void window(IStream stream, WindowUpdateFrame frame)
60 {
61 boolean added = false;
62 synchronized (this)
63 {
64 if (!isClosed())
65 added = windows.offer(new WindowEntry(stream, frame));
66 }
67
68 if (added)
69 iterate();
70 }
71
72 public boolean prepend(Entry entry)
73 {
74 boolean fail = false;
75 synchronized (this)
76 {
77 if (isClosed())
78 {
79 fail = true;
80 }
81 else
82 {
83 frames.add(0, entry);
84 if (LOG.isDebugEnabled())
85 LOG.debug("Prepended {}, frames={}", entry, frames.size());
86 }
87 }
88 if (fail)
89 closed(entry, new ClosedChannelException());
90 return !fail;
91 }
92
93 public boolean append(Entry entry)
94 {
95 boolean fail = false;
96 synchronized (this)
97 {
98 if (isClosed())
99 {
100 fail = true;
101 }
102 else
103 {
104 frames.offer(entry);
105 if (LOG.isDebugEnabled())
106 LOG.debug("Appended {}, frames={}", entry, frames.size());
107 }
108 }
109 if (fail)
110 closed(entry, new ClosedChannelException());
111 return !fail;
112 }
113
114 private Entry remove(int index)
115 {
116 synchronized (this)
117 {
118 if (index == 0)
119 return frames.pollUnsafe();
120 else
121 return frames.remove(index);
122 }
123 }
124
125 public int getQueueSize()
126 {
127 synchronized (this)
128 {
129 return frames.size();
130 }
131 }
132
133 @Override
134 protected Action process() throws Exception
135 {
136 if (LOG.isDebugEnabled())
137 LOG.debug("Flushing {}", session);
138
139 synchronized (this)
140 {
141
142
143 while (!windows.isEmpty())
144 {
145 WindowEntry entry = windows.poll();
146 entry.perform();
147 }
148
149
150
151
152 int sessionWindow = session.getSendWindow();
153
154 int index = 0;
155 int size = frames.size();
156 while (index < size)
157 {
158 Entry entry = frames.get(index);
159 IStream stream = entry.stream;
160
161
162 if (stream != null && stream.isReset() && !entry.isProtocol())
163 {
164 remove(index);
165 --size;
166 resets.add(entry);
167 if (LOG.isDebugEnabled())
168 LOG.debug("Gathered for reset {}", entry);
169 continue;
170 }
171
172
173 int remaining = entry.dataRemaining();
174 if (remaining > 0)
175 {
176 FlowControlStrategy flowControl = session.getFlowControlStrategy();
177 if (sessionWindow <= 0)
178 {
179 flowControl.onSessionStalled(session);
180 ++index;
181
182 continue;
183 }
184
185 if (stream != null)
186 {
187
188 Integer streamWindow = streams.get(stream);
189 if (streamWindow == null)
190 {
191 streamWindow = stream.updateSendWindow(0);
192 streams.put(stream, streamWindow);
193 }
194
195
196 if (streamWindow <= 0)
197 {
198 flowControl.onStreamStalled(stream);
199 ++index;
200
201 continue;
202 }
203 }
204
205
206 sessionWindow -= remaining;
207 if (stream != null)
208 streams.put(stream, streams.get(stream) - remaining);
209 }
210
211
212 remove(index);
213 --size;
214 actives.add(entry);
215
216 if (LOG.isDebugEnabled())
217 LOG.debug("Gathered for write {}", entry);
218 }
219 streams.clear();
220 }
221
222
223 for (int i = 0; i < resets.size(); ++i)
224 {
225 Entry entry = resets.get(i);
226 entry.reset();
227 }
228 resets.clear();
229
230 if (actives.isEmpty())
231 {
232 if (isClosed())
233 abort(new ClosedChannelException());
234
235 if (LOG.isDebugEnabled())
236 LOG.debug("Flushed {}", session);
237
238 return Action.IDLE;
239 }
240
241 for (int i = 0; i < actives.size(); ++i)
242 {
243 Entry entry = actives.get(i);
244 Throwable failure = entry.generate(lease);
245 if (failure != null)
246 {
247
248 failed(failure);
249 return Action.SUCCEEDED;
250 }
251 }
252
253 List<ByteBuffer> byteBuffers = lease.getByteBuffers();
254 if (LOG.isDebugEnabled())
255 LOG.debug("Writing {} buffers ({} bytes) for {} frames {}", byteBuffers.size(), lease.getTotalLength(), actives.size(), actives);
256 session.getEndPoint().write(this, byteBuffers.toArray(new ByteBuffer[byteBuffers.size()]));
257 return Action.SCHEDULED;
258 }
259
260 @Override
261 public void succeeded()
262 {
263 lease.recycle();
264
265
266 for (int i = 0; i < actives.size(); ++i)
267 completes.add(actives.get(i));
268 actives.clear();
269
270 if (LOG.isDebugEnabled())
271 LOG.debug("Written {} frames for {}", completes.size(), completes);
272
273
274 while (!completes.isEmpty())
275 {
276 Entry entry = completes.poll();
277 entry.succeeded();
278 }
279
280 super.succeeded();
281 }
282
283 @Override
284 protected void onCompleteSuccess()
285 {
286 throw new IllegalStateException();
287 }
288
289 @Override
290 protected void onCompleteFailure(Throwable x)
291 {
292 if (LOG.isDebugEnabled())
293 LOG.debug("Failed", x);
294
295 lease.recycle();
296
297
298 for (int i = 0; i < actives.size(); ++i)
299 completes.add(actives.get(i));
300 actives.clear();
301
302
303 while (!completes.isEmpty())
304 {
305 Entry entry = completes.poll();
306 entry.failed(x);
307 }
308
309 abort(x);
310 }
311
312 private void abort(Throwable x)
313 {
314 Queue<Entry> queued;
315 synchronized (this)
316 {
317 queued = new ArrayDeque<>(frames);
318 frames.clear();
319 }
320
321 if (LOG.isDebugEnabled())
322 LOG.debug("Aborting, queued={}", queued.size());
323
324 for (Entry entry : queued)
325 closed(entry, x);
326
327 session.abort(x);
328 }
329
330 private void closed(Entry entry, Throwable failure)
331 {
332 entry.failed(failure);
333 }
334
335 public static abstract class Entry implements Callback
336 {
337 protected final Frame frame;
338 protected final IStream stream;
339 protected final Callback callback;
340
341 protected Entry(Frame frame, IStream stream, Callback callback)
342 {
343 this.frame = frame;
344 this.stream = stream;
345 this.callback = callback;
346 }
347
348 public int dataRemaining()
349 {
350 return 0;
351 }
352
353 public Throwable generate(ByteBufferPool.Lease lease)
354 {
355 return null;
356 }
357
358 public void reset()
359 {
360 failed(new EofException("reset"));
361 }
362
363 @Override
364 public void failed(Throwable x)
365 {
366 if (stream != null)
367 {
368 stream.close();
369 stream.getSession().removeStream(stream, true);
370 }
371 callback.failed(x);
372 }
373
374 public boolean isProtocol()
375 {
376 switch (frame.getType())
377 {
378 case PRIORITY:
379 case RST_STREAM:
380 case GO_AWAY:
381 case WINDOW_UPDATE:
382 case DISCONNECT:
383 return true;
384 default:
385 return false;
386 }
387 }
388
389 @Override
390 public String toString()
391 {
392 return frame.toString();
393 }
394 }
395
396 private class WindowEntry
397 {
398 private final IStream stream;
399 private final WindowUpdateFrame frame;
400
401 public WindowEntry(IStream stream, WindowUpdateFrame frame)
402 {
403 this.stream = stream;
404 this.frame = frame;
405 }
406
407 public void perform()
408 {
409 FlowControlStrategy flowControl = session.getFlowControlStrategy();
410 flowControl.onWindowUpdate(session, stream, frame);
411 }
412 }
413 }