View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.http2;
20  
21  import java.nio.ByteBuffer;
22  import java.nio.channels.ClosedChannelException;
23  import java.util.ArrayDeque;
24  import java.util.ArrayList;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Queue;
29  
30  import org.eclipse.jetty.http2.frames.Frame;
31  import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
32  import org.eclipse.jetty.io.ByteBufferPool;
33  import org.eclipse.jetty.io.EofException;
34  import org.eclipse.jetty.util.ArrayQueue;
35  import org.eclipse.jetty.util.Callback;
36  import org.eclipse.jetty.util.IteratingCallback;
37  import org.eclipse.jetty.util.log.Log;
38  import org.eclipse.jetty.util.log.Logger;
39  
/**
 * Queues HTTP/2 frame entries for a session and writes them to the session's endpoint.
 * <p>
 * Entries are added with {@link #append(Entry)} / {@link #prepend(Entry)} and window
 * updates with {@link #window(IStream, WindowUpdateFrame)}. Each {@link #process()} pass
 * applies the pending window updates, gathers the entries that may be written given the
 * session and stream send windows, generates their bytes into a pooled lease and issues
 * a single endpoint write with this object as the completion callback.
 * <p>
 * The queues {@code windows} and {@code frames} are accessed while holding this object's
 * monitor; {@code resets}, {@code actives} and {@code completes} are touched without it,
 * from {@link #process()} and the completion callbacks.
 * NOTE(review): that is presumably safe because IteratingCallback serializes those calls - confirm.
 */
40  public class HTTP2Flusher extends IteratingCallback
41  {
42      private static final Logger LOG = Log.getLogger(HTTP2Flusher.class);
43  
        // Window updates received but not yet applied; drained at the start of process().
44      private final Queue<WindowEntry> windows = new ArrayDeque<>();
        // Entries waiting to be written. The third constructor argument is this flusher;
        // NOTE(review): presumably ArrayQueue uses it as its lock object - confirm.
45      private final ArrayQueue<Entry> frames = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH, this);
        // Per-pass cache of stream send windows; filled lazily in process() and cleared at the end of each pass.
46      private final Map<IStream, Integer> streams = new HashMap<>();
        // Entries whose stream was reset, gathered in process() and reset outside the sync block.
47      private final List<Entry> resets = new ArrayList<>();
        // Entries gathered for the write currently being prepared or in flight.
48      private final List<Entry> actives = new ArrayList<>();
        // Staging queue the completion callbacks drain one entry at a time.
49      private final Queue<Entry> completes = new ArrayDeque<>();
50      private final HTTP2Session session;
        // Accumulates the buffers produced by Entry.generate(); recycled after every write.
51      private final ByteBufferPool.Lease lease;
52  
        /**
         * Creates a flusher bound to the given session.
         *
         * @param session the session whose endpoint is written to; its generator's
         *                buffer pool backs the lease used for generated frames
         */
53      public HTTP2Flusher(HTTP2Session session)
54      {
55          this.session = session;
56          this.lease = new ByteBufferPool.Lease(session.getGenerator().getByteBufferPool());
57      }
58  
        /**
         * Queues a window update to be applied by the next {@link #process()} pass and,
         * if it was queued, triggers an iteration.
         * The update is dropped when {@code isClosed()} reports true.
         *
         * @param stream the stream passed through to the flow control strategy
         *               (NOTE(review): presumably null for a session-level update - confirm)
         * @param frame  the window update frame
         */
59      public void window(IStream stream, WindowUpdateFrame frame)
60      {
61          boolean added = false;
62          synchronized (this)
63          {
64              if (!isClosed())
65                  added = windows.offer(new WindowEntry(stream, frame));
66          }
67          // Flush stalled data.
68          if (added)
69              iterate();
70      }
71  
        /**
         * Inserts an entry at the head of the queue.
         * If the flusher is closed the entry is failed with a {@link ClosedChannelException}
         * instead. This method does not call {@code iterate()} itself.
         *
         * @param entry the entry to queue
         * @return true if the entry was queued, false if it was failed because the flusher is closed
         */
72      public boolean prepend(Entry entry)
73      {
74          boolean fail = false;
75          synchronized (this)
76          {
77              if (isClosed())
78              {
79                  fail = true;
80              }
81              else
82              {
83                  frames.add(0, entry);
84                  if (LOG.isDebugEnabled())
85                      LOG.debug("Prepended {}, frames={}", entry, frames.size());
86              }
87          }
            // The entry is failed outside the sync block, so its callback runs without the monitor held.
88          if (fail)
89              closed(entry, new ClosedChannelException());
90          return !fail;
91      }
92  
        /**
         * Adds an entry at the tail of the queue.
         * If the flusher is closed the entry is failed with a {@link ClosedChannelException}
         * instead. This method does not call {@code iterate()} itself.
         *
         * @param entry the entry to queue
         * @return true if the entry was queued, false if it was failed because the flusher is closed
         */
93      public boolean append(Entry entry)
94      {
95          boolean fail = false;
96          synchronized (this)
97          {
98              if (isClosed())
99              {
100                 fail = true;
101             }
102             else
103             {
104                 frames.offer(entry);
105                 if (LOG.isDebugEnabled())
106                     LOG.debug("Appended {}, frames={}", entry, frames.size());
107             }
108         }
            // The entry is failed outside the sync block, so its callback runs without the monitor held.
109         if (fail)
110             closed(entry, new ClosedChannelException());
111         return !fail;
112     }
113 
        /**
         * Removes and returns the queued entry at the given index.
         * Index 0 goes through {@code pollUnsafe()}.
         * NOTE(review): presumably the variant that skips ArrayQueue's own locking,
         * which the enclosing synchronized block makes safe - confirm.
         *
         * @param index position in the queue of the entry to remove
         * @return the removed entry
         */
114     private Entry remove(int index)
115     {
116         synchronized (this)
117         {
118             if (index == 0)
119                 return frames.pollUnsafe();
120             else
121                 return frames.remove(index);
122         }
123     }
124 
        /**
         * @return the number of entries currently queued and not yet gathered for a write
         */
125     public int getQueueSize()
126     {
127         synchronized (this)
128         {
129             return frames.size();
130         }
131     }
132 
        /**
         * Runs one flush pass.
         * <ol>
         * <li>Under the monitor: applies all pending window updates, then walks the queue,
         * moving entries of reset streams to {@code resets}, leaving flow-control-stalled
         * data entries in place and moving everything else to {@code actives}.</li>
         * <li>Outside the monitor: resets the gathered reset entries.</li>
         * <li>If nothing is active, returns {@code Action.IDLE} (aborting first if closed).</li>
         * <li>Otherwise generates every active entry into the lease and writes all the
         * buffers to the endpoint with this object as callback, returning
         * {@code Action.SCHEDULED}.</li>
         * </ol>
         * If generating an entry fails, {@code failed(failure)} is invoked and
         * {@code Action.SUCCEEDED} is returned.
         *
         * @return the action telling the iterating callback how to proceed
         * @throws Exception declared by the overridden method
         */
133     @Override
134     protected Action process() throws Exception
135     {
136         if (LOG.isDebugEnabled())
137             LOG.debug("Flushing {}", session);
138 
139         synchronized (this)
140         {
141             // First thing, update the window sizes, so we can
142             // reason about the frames to remove from the queue.
143             while (!windows.isEmpty())
144             {
145                 WindowEntry entry = windows.poll();
146                 entry.perform();
147             }
148 
149             // Now the window sizes cannot change.
150             // Window updates that happen concurrently will
151             // be queued and processed on the next iteration.
152             int sessionWindow = session.getSendWindow();
153 
                // index and size are maintained by hand because entries are removed while
                // walking: a removal decrements size and leaves index on the next entry.
154             int index = 0;
155             int size = frames.size();
156             while (index < size)
157             {
158                 Entry entry = frames.get(index);
159                 IStream stream = entry.stream;
160 
161                 // If the stream has been reset, don't send the frame.
                    // Protocol frames (see Entry.isProtocol()) are exempt and are still written.
162                 if (stream != null && stream.isReset() && !entry.isProtocol())
163                 {
                        // remove() re-enters the monitor already held by this block.
164                     remove(index);
165                     --size;
166                     resets.add(entry);
167                     if (LOG.isDebugEnabled())
168                         LOG.debug("Gathered for reset {}", entry);
169                     continue;
170                 }
171 
172                 // Check if the frame fits in the flow control windows.
                    // Only entries reporting dataRemaining() > 0 are subject to flow control.
173                 int remaining = entry.dataRemaining();
174                 if (remaining > 0)
175                 {
176                     FlowControlStrategy flowControl = session.getFlowControlStrategy();
177                     if (sessionWindow <= 0)
178                     {
                            // Notified once per stalled entry encountered in this pass.
179                         flowControl.onSessionStalled(session);
180                         ++index;
181                         // There may be *non* flow controlled frames to send.
182                         continue;
183                     }
184 
185                     if (stream != null)
186                     {
187                         // The stream may have a smaller window than the session.
188                         Integer streamWindow = streams.get(stream);
189                         if (streamWindow == null)
190                         {
                                // A delta of 0 is passed; NOTE(review): presumably this just reads
                                // the current send window without changing it - confirm.
191                             streamWindow = stream.updateSendWindow(0);
192                             streams.put(stream, streamWindow);
193                         }
194 
195                         // Is it a frame belonging to an already stalled stream ?
196                         if (streamWindow <= 0)
197                         {
198                             flowControl.onStreamStalled(stream);
199                             ++index;
200                             // There may be *non* flow controlled frames to send.
201                             continue;
202                         }
203                     }
204 
205                     // The frame fits both flow control windows, reduce them.
                        // The local copies are reduced by the whole remaining amount, so they
                        // may go negative; the checks above then stall later data entries in
                        // this pass. Only these local/cached values are modified here.
206                     sessionWindow -= remaining;
207                     if (stream != null)
208                         streams.put(stream, streams.get(stream) - remaining);
209                 }
210 
211                 // The frame will be written, remove it from the queue.
212                 remove(index);
213                 --size;
214                 actives.add(entry);
215 
216                 if (LOG.isDebugEnabled())
217                     LOG.debug("Gathered for write {}", entry);
218             }
                // The cached stream windows are only valid for this pass.
219             streams.clear();
220         }
221 
222         // Perform resets outside the sync block.
223         for (int i = 0; i < resets.size(); ++i)
224         {
225             Entry entry = resets.get(i);
226             entry.reset();
227         }
228         resets.clear();
229 
230         if (actives.isEmpty())
231         {
                // Nothing to write: if the flusher was closed meanwhile, fail whatever is
                // still queued and abort the session.
232             if (isClosed())
233                 abort(new ClosedChannelException());
234 
235             if (LOG.isDebugEnabled())
236                 LOG.debug("Flushed {}", session);
237 
238             return Action.IDLE;
239         }
240 
241         for (int i = 0; i < actives.size(); ++i)
242         {
243             Entry entry = actives.get(i);
                // A non-null return value signals a generation failure.
244             Throwable failure = entry.generate(lease);
245             if (failure != null)
246             {
247                 // Failure to generate the entry is catastrophic.
248                 failed(failure);
249                 return Action.SUCCEEDED;
250             }
251         }
252 
253         List<ByteBuffer> byteBuffers = lease.getByteBuffers();
254         if (LOG.isDebugEnabled())
255             LOG.debug("Writing {} buffers ({} bytes) for {} frames {}", byteBuffers.size(), lease.getTotalLength(), actives.size(), actives);
            // Single write of everything gathered in this pass; this object is the write callback.
256         session.getEndPoint().write(this, byteBuffers.toArray(new ByteBuffer[byteBuffers.size()]));
257         return Action.SCHEDULED;
258     }
259 
        /**
         * Write-completed callback: recycles the lease, notifies every entry of the
         * completed write via {@code entry.succeeded()}, then delegates to
         * {@code super.succeeded()}.
         */
260     @Override
261     public void succeeded()
262     {
263         lease.recycle();
264 
265         // Transfer active items to avoid reentrancy.
266         for (int i = 0; i < actives.size(); ++i)
267             completes.add(actives.get(i));
268         actives.clear();
269 
270         if (LOG.isDebugEnabled())
271             LOG.debug("Written {} frames for {}", completes.size(), completes);
272 
273         // Drain the frames one by one to avoid reentrancy.
274         while (!completes.isEmpty())
275         {
276             Entry entry = completes.poll();
277             entry.succeeded();
278         }
279 
280         super.succeeded();
281     }
282 
        /**
         * Always throws {@link IllegalStateException}.
         * NOTE(review): presumably the flusher is never meant to complete successfully;
         * the only {@code Action.SUCCEEDED} returned by process() follows a failed(...) call - confirm.
         */
283     @Override
284     protected void onCompleteSuccess()
285     {
286         throw new IllegalStateException();
287     }
288 
        /**
         * Failure path: recycles the lease, fails every entry of the current write with
         * {@code x}, then fails all still-queued entries and aborts the session via
         * {@link #abort(Throwable)}.
         *
         * @param x the failure cause
         */
289     @Override
290     protected void onCompleteFailure(Throwable x)
291     {
292         if (LOG.isDebugEnabled())
293             LOG.debug("Failed", x);
294 
295         lease.recycle();
296 
297         // Transfer active items to avoid reentrancy.
298         for (int i = 0; i < actives.size(); ++i)
299             completes.add(actives.get(i));
300         actives.clear();
301 
302         // Drain the frames one by one to avoid reentrancy.
303         while (!completes.isEmpty())
304         {
305             Entry entry = completes.poll();
306             entry.failed(x);
307         }
308 
309         abort(x);
310     }
311 
        /**
         * Fails every entry still in the queue with {@code x} and aborts the session.
         * The queue is snapshotted and cleared under the monitor; the entries are failed
         * after the monitor has been released.
         *
         * @param x the failure passed to each queued entry and to {@code session.abort}
         */
312     private void abort(Throwable x)
313     {
314         Queue<Entry> queued;
315         synchronized (this)
316         {
317             queued = new ArrayDeque<>(frames);
318             frames.clear();
319         }
320 
321         if (LOG.isDebugEnabled())
322             LOG.debug("Aborting, queued={}", queued.size());
323 
324         for (Entry entry : queued)
325             closed(entry, x);
326 
327         session.abort(x);
328     }
329 
        /**
         * Fails an entry that could not be, or will not be, written.
         *
         * @param entry   the entry to fail
         * @param failure the cause passed to {@code entry.failed}
         */
330     private void closed(Entry entry, Throwable failure)
331     {
332         entry.failed(failure);
333     }
334 
        /**
         * A queued unit of work: a frame, the stream it belongs to (may be null, as the
         * null checks in this file show) and the callback to notify.
         * The defaults here describe an entry with no flow-controlled data that generates
         * nothing; NOTE(review): subclasses presumably override dataRemaining()/generate()
         * and provide succeeded() - they are not visible here.
         */
335     public static abstract class Entry implements Callback
336     {
337         protected final Frame frame;
338         protected final IStream stream;
339         protected final Callback callback;
340 
            /**
             * @param frame    the frame this entry represents
             * @param stream   the stream the frame belongs to, or null
             * @param callback the callback notified by {@link #failed(Throwable)}
             */
341         protected Entry(Frame frame, IStream stream, Callback callback)
342         {
343             this.frame = frame;
344             this.stream = stream;
345             this.callback = callback;
346         }
347 
            /**
             * @return the amount of data subject to flow control still to be written;
             *         0 in this base implementation, which exempts the entry from the
             *         window checks in process()
             */
348         public int dataRemaining()
349         {
350             return 0;
351         }
352 
            /**
             * Generates the bytes for this entry into the given lease.
             *
             * @param lease the lease collecting the buffers to write
             * @return null on success (always, in this base implementation), or the failure
             */
353         public Throwable generate(ByteBufferPool.Lease lease)
354         {
355             return null;
356         }
357 
            /**
             * Called for entries whose stream has been reset: fails the entry with an
             * {@link EofException}.
             */
358         public void reset()
359         {
360             failed(new EofException("reset"));
361         }
362 
            /**
             * Closes the stream (if any), removes it from its session and then notifies
             * the callback of the failure.
             *
             * @param x the failure cause forwarded to the callback
             */
363         @Override
364         public void failed(Throwable x)
365         {
366             if (stream != null)
367             {
368                 stream.close();
                    // NOTE(review): meaning of the boolean argument is not visible here - confirm.
369                 stream.getSession().removeStream(stream, true);
370             }
371             callback.failed(x);
372         }
373 
            /**
             * @return true for PRIORITY, RST_STREAM, GO_AWAY, WINDOW_UPDATE and DISCONNECT
             *         frames, which process() still writes even when the stream is reset
             */
374         public boolean isProtocol()
375         {
376             switch (frame.getType())
377             {
378                 case PRIORITY:
379                 case RST_STREAM:
380                 case GO_AWAY:
381                 case WINDOW_UPDATE:
382                 case DISCONNECT:
383                     return true;
384                 default:
385                     return false;
386             }
387         }
388 
            /**
             * @return the string form of the wrapped frame
             */
389         @Override
390         public String toString()
391         {
392             return frame.toString();
393         }
394     }
395 
        /**
         * A deferred window update: pairs a stream with the WINDOW_UPDATE frame so that
         * process() can apply it later, while holding the monitor.
         */
396     private class WindowEntry
397     {
398         private final IStream stream;
399         private final WindowUpdateFrame frame;
400 
401         public WindowEntry(IStream stream, WindowUpdateFrame frame)
402         {
403             this.stream = stream;
404             this.frame = frame;
405         }
406 
            /**
             * Hands the update to the session's flow control strategy.
             */
407         public void perform()
408         {
409             FlowControlStrategy flowControl = session.getFlowControlStrategy();
410             flowControl.onWindowUpdate(session, stream, frame);
411         }
412     }
413 }