1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.http2;
20
21 import java.nio.ByteBuffer;
22 import java.nio.channels.ClosedChannelException;
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Queue;
29
30 import org.eclipse.jetty.http2.frames.Frame;
31 import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
32 import org.eclipse.jetty.io.ByteBufferPool;
33 import org.eclipse.jetty.io.EofException;
34 import org.eclipse.jetty.util.ArrayQueue;
35 import org.eclipse.jetty.util.Callback;
36 import org.eclipse.jetty.util.IteratingCallback;
37 import org.eclipse.jetty.util.log.Log;
38 import org.eclipse.jetty.util.log.Logger;
39
40 public class HTTP2Flusher extends IteratingCallback
41 {
42 private static final Logger LOG = Log.getLogger(HTTP2Flusher.class);
43
44 private final Queue<WindowEntry> windows = new ArrayDeque<>();
45 private final ArrayQueue<Entry> frames = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH, this);
46 private final Map<IStream, Integer> streams = new HashMap<>();
47 private final List<Entry> resets = new ArrayList<>();
48 private final List<Entry> actives = new ArrayList<>();
49 private final Queue<Entry> completes = new ArrayDeque<>();
50 private final HTTP2Session session;
51 private final ByteBufferPool.Lease lease;
52
53 public HTTP2Flusher(HTTP2Session session)
54 {
55 this.session = session;
56 this.lease = new ByteBufferPool.Lease(session.getGenerator().getByteBufferPool());
57 }
58
59 public void window(IStream stream, WindowUpdateFrame frame)
60 {
61 boolean added = false;
62 synchronized (this)
63 {
64 if (!isClosed())
65 added = windows.offer(new WindowEntry(stream, frame));
66 }
67
68 if (added)
69 iterate();
70 }
71
72 public boolean prepend(Entry entry)
73 {
74 boolean fail = false;
75 synchronized (this)
76 {
77 if (isClosed())
78 {
79 fail = true;
80 }
81 else
82 {
83 frames.add(0, entry);
84 if (LOG.isDebugEnabled())
85 LOG.debug("Prepended {}, frames={}", entry, frames.size());
86 }
87 }
88 if (fail)
89 closed(entry, new ClosedChannelException());
90 return !fail;
91 }
92
93 public boolean append(Entry entry)
94 {
95 boolean fail = false;
96 synchronized (this)
97 {
98 if (isClosed())
99 {
100 fail = true;
101 }
102 else
103 {
104 frames.offer(entry);
105 if (LOG.isDebugEnabled())
106 LOG.debug("Appended {}, frames={}", entry, frames.size());
107 }
108 }
109 if (fail)
110 closed(entry, new ClosedChannelException());
111 return !fail;
112 }
113
114 private Entry remove(int index)
115 {
116 synchronized (this)
117 {
118 if (index == 0)
119 return frames.pollUnsafe();
120 else
121 return frames.remove(index);
122 }
123 }
124
125 public int getQueueSize()
126 {
127 synchronized (this)
128 {
129 return frames.size();
130 }
131 }
132
133 @Override
134 protected Action process() throws Exception
135 {
136 if (LOG.isDebugEnabled())
137 LOG.debug("Flushing {}", session);
138
139 synchronized (this)
140 {
141
142
143 while (!windows.isEmpty())
144 {
145 WindowEntry entry = windows.poll();
146 entry.perform();
147 }
148
149
150
151
152 int sessionWindow = session.getSendWindow();
153
154 int index = 0;
155 int size = frames.size();
156 while (index < size)
157 {
158 Entry entry = frames.get(index);
159 IStream stream = entry.stream;
160
161
162 if (stream != null && stream.isReset() && !entry.isProtocol())
163 {
164 remove(index);
165 --size;
166 resets.add(entry);
167 if (LOG.isDebugEnabled())
168 LOG.debug("Gathered for reset {}", entry);
169 continue;
170 }
171
172
173 int remaining = entry.dataRemaining();
174 if (remaining > 0)
175 {
176 if (sessionWindow <= 0)
177 {
178 ++index;
179
180 continue;
181 }
182
183 if (stream != null)
184 {
185
186 Integer streamWindow = streams.get(stream);
187 if (streamWindow == null)
188 {
189 streamWindow = stream.updateSendWindow(0);
190 streams.put(stream, streamWindow);
191 }
192
193
194 if (streamWindow <= 0)
195 {
196 ++index;
197
198 continue;
199 }
200 }
201
202
203 sessionWindow -= remaining;
204 if (stream != null)
205 streams.put(stream, streams.get(stream) - remaining);
206 }
207
208
209 remove(index);
210 --size;
211 actives.add(entry);
212
213 if (LOG.isDebugEnabled())
214 LOG.debug("Gathered for write {}", entry);
215 }
216 streams.clear();
217 }
218
219
220 for (int i = 0; i < resets.size(); ++i)
221 {
222 Entry entry = resets.get(i);
223 entry.reset();
224 }
225 resets.clear();
226
227 if (actives.isEmpty())
228 {
229 if (isClosed())
230 fail(new ClosedChannelException(), true);
231
232 if (LOG.isDebugEnabled())
233 LOG.debug("Flushed {}", session);
234
235 return Action.IDLE;
236 }
237
238 for (int i = 0; i < actives.size(); ++i)
239 {
240 Entry entry = actives.get(i);
241 Throwable failure = entry.generate(lease);
242 if (failure != null)
243 {
244
245 failed(failure);
246 return Action.SUCCEEDED;
247 }
248 }
249
250 List<ByteBuffer> byteBuffers = lease.getByteBuffers();
251 if (LOG.isDebugEnabled())
252 LOG.debug("Writing {} buffers ({} bytes) for {} frames {}", byteBuffers.size(), lease.getTotalLength(), actives.size(), actives);
253 session.getEndPoint().write(this, byteBuffers.toArray(new ByteBuffer[byteBuffers.size()]));
254 return Action.SCHEDULED;
255 }
256
257 @Override
258 public void succeeded()
259 {
260 lease.recycle();
261
262
263 for (int i = 0; i < actives.size(); ++i)
264 completes.add(actives.get(i));
265 actives.clear();
266
267 if (LOG.isDebugEnabled())
268 LOG.debug("Written {} frames for {}", completes.size(), completes);
269
270
271 while (!completes.isEmpty())
272 {
273 Entry entry = completes.poll();
274 entry.succeeded();
275 }
276
277 super.succeeded();
278 }
279
280 @Override
281 protected void onCompleteSuccess()
282 {
283 throw new IllegalStateException();
284 }
285
286 @Override
287 protected void onCompleteFailure(Throwable x)
288 {
289 lease.recycle();
290
291
292 for (int i = 0; i < actives.size(); ++i)
293 completes.add(actives.get(i));
294 actives.clear();
295
296
297 while (!completes.isEmpty())
298 {
299 Entry entry = completes.poll();
300 entry.failed(x);
301 }
302
303 fail(x, isClosed());
304 }
305
306 private void fail(Throwable x, boolean closed)
307 {
308 Queue<Entry> queued;
309 synchronized (this)
310 {
311 queued = new ArrayDeque<>(frames);
312 frames.clear();
313 }
314
315 if (LOG.isDebugEnabled())
316 LOG.debug("{}, queued={}", closed ? "Closing" : "Failing", queued.size());
317
318 for (Entry entry : queued)
319 entry.failed(x);
320
321 if (!closed)
322 session.abort(x);
323 }
324
325 private void closed(Entry entry, Throwable failure)
326 {
327 entry.failed(failure);
328 }
329
330 public static abstract class Entry implements Callback
331 {
332 protected final Frame frame;
333 protected final IStream stream;
334 protected final Callback callback;
335
336 protected Entry(Frame frame, IStream stream, Callback callback)
337 {
338 this.frame = frame;
339 this.stream = stream;
340 this.callback = callback;
341 }
342
343 public int dataRemaining()
344 {
345 return 0;
346 }
347
348 public Throwable generate(ByteBufferPool.Lease lease)
349 {
350 return null;
351 }
352
353 public void reset()
354 {
355 failed(new EofException("reset"));
356 }
357
358 @Override
359 public void failed(Throwable x)
360 {
361 if (stream != null)
362 {
363 stream.close();
364 stream.getSession().removeStream(stream);
365 }
366 callback.failed(x);
367 }
368
369 public boolean isProtocol()
370 {
371 switch (frame.getType())
372 {
373 case PRIORITY:
374 case RST_STREAM:
375 case GO_AWAY:
376 case WINDOW_UPDATE:
377 case DISCONNECT:
378 return true;
379 default:
380 return false;
381 }
382 }
383
384 @Override
385 public String toString()
386 {
387 return frame.toString();
388 }
389 }
390
391 private class WindowEntry
392 {
393 private final IStream stream;
394 private final WindowUpdateFrame frame;
395
396 public WindowEntry(IStream stream, WindowUpdateFrame frame)
397 {
398 this.stream = stream;
399 this.frame = frame;
400 }
401
402 public void perform()
403 {
404 FlowControlStrategy flowControl = session.getFlowControlStrategy();
405 flowControl.onWindowUpdate(session, stream, frame);
406 }
407 }
408 }