View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.http2;
20  
21  import java.nio.ByteBuffer;
22  import java.nio.channels.ClosedChannelException;
23  import java.util.ArrayDeque;
24  import java.util.ArrayList;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Queue;
29  
30  import org.eclipse.jetty.http2.frames.Frame;
31  import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
32  import org.eclipse.jetty.io.ByteBufferPool;
33  import org.eclipse.jetty.io.EofException;
34  import org.eclipse.jetty.util.ArrayQueue;
35  import org.eclipse.jetty.util.Callback;
36  import org.eclipse.jetty.util.IteratingCallback;
37  import org.eclipse.jetty.util.log.Log;
38  import org.eclipse.jetty.util.log.Logger;
39  
40  public class HTTP2Flusher extends IteratingCallback
41  {
42      private static final Logger LOG = Log.getLogger(HTTP2Flusher.class);
43  
44      private final Queue<WindowEntry> windows = new ArrayDeque<>();
45      private final ArrayQueue<Entry> frames = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH, this);
46      private final Map<IStream, Integer> streams = new HashMap<>();
47      private final List<Entry> resets = new ArrayList<>();
48      private final List<Entry> actives = new ArrayList<>();
49      private final Queue<Entry> completes = new ArrayDeque<>();
50      private final HTTP2Session session;
51      private final ByteBufferPool.Lease lease;
52  
53      public HTTP2Flusher(HTTP2Session session)
54      {
55          this.session = session;
56          this.lease = new ByteBufferPool.Lease(session.getGenerator().getByteBufferPool());
57      }
58  
59      public void window(IStream stream, WindowUpdateFrame frame)
60      {
61          boolean added = false;
62          synchronized (this)
63          {
64              if (!isClosed())
65                  added = windows.offer(new WindowEntry(stream, frame));
66          }
67          // Flush stalled data.
68          if (added)
69              iterate();
70      }
71  
72      public boolean prepend(Entry entry)
73      {
74          boolean fail = false;
75          synchronized (this)
76          {
77              if (isClosed())
78              {
79                  fail = true;
80              }
81              else
82              {
83                  frames.add(0, entry);
84                  if (LOG.isDebugEnabled())
85                      LOG.debug("Prepended {}, frames={}", entry, frames.size());
86              }
87          }
88          if (fail)
89              closed(entry, new ClosedChannelException());
90          return !fail;
91      }
92  
93      public boolean append(Entry entry)
94      {
95          boolean fail = false;
96          synchronized (this)
97          {
98              if (isClosed())
99              {
100                 fail = true;
101             }
102             else
103             {
104                 frames.offer(entry);
105                 if (LOG.isDebugEnabled())
106                     LOG.debug("Appended {}, frames={}", entry, frames.size());
107             }
108         }
109         if (fail)
110             closed(entry, new ClosedChannelException());
111         return !fail;
112     }
113 
114     private Entry remove(int index)
115     {
116         synchronized (this)
117         {
118             if (index == 0)
119                 return frames.pollUnsafe();
120             else
121                 return frames.remove(index);
122         }
123     }
124 
125     public int getQueueSize()
126     {
127         synchronized (this)
128         {
129             return frames.size();
130         }
131     }
132 
133     @Override
134     protected Action process() throws Exception
135     {
136         if (LOG.isDebugEnabled())
137             LOG.debug("Flushing {}", session);
138 
139         synchronized (this)
140         {
141             // First thing, update the window sizes, so we can
142             // reason about the frames to remove from the queue.
143             while (!windows.isEmpty())
144             {
145                 WindowEntry entry = windows.poll();
146                 entry.perform();
147             }
148 
149             // Now the window sizes cannot change.
150             // Window updates that happen concurrently will
151             // be queued and processed on the next iteration.
152             int sessionWindow = session.getSendWindow();
153 
154             int index = 0;
155             int size = frames.size();
156             while (index < size)
157             {
158                 Entry entry = frames.get(index);
159                 IStream stream = entry.stream;
160 
161                 // If the stream has been reset, don't send the frame.
162                 if (stream != null && stream.isReset() && !entry.isProtocol())
163                 {
164                     remove(index);
165                     --size;
166                     resets.add(entry);
167                     if (LOG.isDebugEnabled())
168                         LOG.debug("Gathered for reset {}", entry);
169                     continue;
170                 }
171 
172                 // Check if the frame fits in the flow control windows.
173                 int remaining = entry.dataRemaining();
174                 if (remaining > 0)
175                 {
176                     if (sessionWindow <= 0)
177                     {
178                         ++index;
179                         // There may be *non* flow controlled frames to send.
180                         continue;
181                     }
182 
183                     if (stream != null)
184                     {
185                         // The stream may have a smaller window than the session.
186                         Integer streamWindow = streams.get(stream);
187                         if (streamWindow == null)
188                         {
189                             streamWindow = stream.updateSendWindow(0);
190                             streams.put(stream, streamWindow);
191                         }
192 
193                         // Is it a frame belonging to an already stalled stream ?
194                         if (streamWindow <= 0)
195                         {
196                             ++index;
197                             // There may be *non* flow controlled frames to send.
198                             continue;
199                         }
200                     }
201 
202                     // The frame fits both flow control windows, reduce them.
203                     sessionWindow -= remaining;
204                     if (stream != null)
205                         streams.put(stream, streams.get(stream) - remaining);
206                 }
207 
208                 // The frame will be written, remove it from the queue.
209                 remove(index);
210                 --size;
211                 actives.add(entry);
212 
213                 if (LOG.isDebugEnabled())
214                     LOG.debug("Gathered for write {}", entry);
215             }
216             streams.clear();
217         }
218 
219         // Perform resets outside the sync block.
220         for (int i = 0; i < resets.size(); ++i)
221         {
222             Entry entry = resets.get(i);
223             entry.reset();
224         }
225         resets.clear();
226 
227         if (actives.isEmpty())
228         {
229             if (isClosed())
230                 fail(new ClosedChannelException(), true);
231 
232             if (LOG.isDebugEnabled())
233                 LOG.debug("Flushed {}", session);
234 
235             return Action.IDLE;
236         }
237 
238         for (int i = 0; i < actives.size(); ++i)
239         {
240             Entry entry = actives.get(i);
241             Throwable failure = entry.generate(lease);
242             if (failure != null)
243             {
244                 // Failure to generate the entry is catastrophic.
245                 failed(failure);
246                 return Action.SUCCEEDED;
247             }
248         }
249 
250         List<ByteBuffer> byteBuffers = lease.getByteBuffers();
251         if (LOG.isDebugEnabled())
252             LOG.debug("Writing {} buffers ({} bytes) for {} frames {}", byteBuffers.size(), lease.getTotalLength(), actives.size(), actives);
253         session.getEndPoint().write(this, byteBuffers.toArray(new ByteBuffer[byteBuffers.size()]));
254         return Action.SCHEDULED;
255     }
256 
257     @Override
258     public void succeeded()
259     {
260         lease.recycle();
261 
262         // Transfer active items to avoid reentrancy.
263         for (int i = 0; i < actives.size(); ++i)
264             completes.add(actives.get(i));
265         actives.clear();
266 
267         if (LOG.isDebugEnabled())
268             LOG.debug("Written {} frames for {}", completes.size(), completes);
269 
270         // Drain the frames one by one to avoid reentrancy.
271         while (!completes.isEmpty())
272         {
273             Entry entry = completes.poll();
274             entry.succeeded();
275         }
276 
277         super.succeeded();
278     }
279 
280     @Override
281     protected void onCompleteSuccess()
282     {
283         throw new IllegalStateException();
284     }
285 
286     @Override
287     protected void onCompleteFailure(Throwable x)
288     {
289         lease.recycle();
290 
291         // Transfer active items to avoid reentrancy.
292         for (int i = 0; i < actives.size(); ++i)
293             completes.add(actives.get(i));
294         actives.clear();
295 
296         // Drain the frames one by one to avoid reentrancy.
297         while (!completes.isEmpty())
298         {
299             Entry entry = completes.poll();
300             entry.failed(x);
301         }
302 
303         fail(x, isClosed());
304     }
305 
306     private void fail(Throwable x, boolean closed)
307     {
308         Queue<Entry> queued;
309         synchronized (this)
310         {
311             queued = new ArrayDeque<>(frames);
312             frames.clear();
313         }
314 
315         if (LOG.isDebugEnabled())
316             LOG.debug("{}, queued={}", closed ? "Closing" : "Failing", queued.size());
317 
318         for (Entry entry : queued)
319             entry.failed(x);
320 
321         if (!closed)
322             session.abort(x);
323     }
324 
325     private void closed(Entry entry, Throwable failure)
326     {
327         entry.failed(failure);
328     }
329 
330     public static abstract class Entry implements Callback
331     {
332         protected final Frame frame;
333         protected final IStream stream;
334         protected final Callback callback;
335 
336         protected Entry(Frame frame, IStream stream, Callback callback)
337         {
338             this.frame = frame;
339             this.stream = stream;
340             this.callback = callback;
341         }
342 
343         public int dataRemaining()
344         {
345             return 0;
346         }
347 
348         public Throwable generate(ByteBufferPool.Lease lease)
349         {
350             return null;
351         }
352 
353         public void reset()
354         {
355             failed(new EofException("reset"));
356         }
357 
358         @Override
359         public void failed(Throwable x)
360         {
361             if (stream != null)
362             {
363                 stream.close();
364                 stream.getSession().removeStream(stream);
365             }
366             callback.failed(x);
367         }
368 
369         public boolean isProtocol()
370         {
371             switch (frame.getType())
372             {
373                 case PRIORITY:
374                 case RST_STREAM:
375                 case GO_AWAY:
376                 case WINDOW_UPDATE:
377                 case DISCONNECT:
378                     return true;
379                 default:
380                     return false;
381             }
382         }
383 
384         @Override
385         public String toString()
386         {
387             return frame.toString();
388         }
389     }
390 
391     private class WindowEntry
392     {
393         private final IStream stream;
394         private final WindowUpdateFrame frame;
395 
396         public WindowEntry(IStream stream, WindowUpdateFrame frame)
397         {
398             this.stream = stream;
399             this.frame = frame;
400         }
401 
402         public void perform()
403         {
404             FlowControlStrategy flowControl = session.getFlowControlStrategy();
405             flowControl.onWindowUpdate(session, stream, frame);
406         }
407     }
408 }