View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.http2;
20  
21  import java.util.Map;
22  import java.util.concurrent.ConcurrentHashMap;
23  import java.util.concurrent.atomic.AtomicInteger;
24  
25  import org.eclipse.jetty.http2.frames.Frame;
26  import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
27  import org.eclipse.jetty.util.Atomics;
28  import org.eclipse.jetty.util.Callback;
29  import org.eclipse.jetty.util.annotation.ManagedAttribute;
30  import org.eclipse.jetty.util.annotation.ManagedObject;
31  
32  /**
33   * <p>A flow control strategy that accumulates updates and emits window control
34   * frames when the accumulated value reaches a threshold.</p>
35   * <p>The sender flow control window is represented in the receiver as two
36   * buckets: a bigger bucket, initially full, that is drained when data is
37   * received, and a smaller bucket, initially empty, that is filled when data is
38   * consumed. Only the smaller bucket can refill the bigger bucket.</p>
39   * <p>The smaller bucket is defined as a fraction of the bigger bucket.</p>
40   * <p>For a more visual representation, see the
41   * <a href="http://en.wikipedia.org/wiki/Shishi-odoshi">rocking bamboo fountain</a>.</p>
42   * <p>The algorithm works in this way.</p>
43   * <p>The initial bigger bucket (BB) capacity is 100, and let's imagine the smaller
44   * bucket (SB) being 40% of the bigger bucket: 40.</p>
45   * <p>The receiver receives a data frame of 60, so now BB=40; the data frame is
46   * passed to the application that consumes 25, so now SB=25. Since SB is not full,
47   * no window control frames are emitted.</p>
48   * <p>The application consumes other 20, so now SB=45. Since SB is full, its 45
49   * are transferred to BB, which is now BB=85, and a window control frame is sent
50   * with delta=45.</p>
51   * <p>The application consumes the remaining 15, so now SB=15, and no window
52   * control frame is emitted.</p>
53   */
54  @ManagedObject
55  public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
56  {
57      private final AtomicInteger maxSessionRecvWindow = new AtomicInteger(DEFAULT_WINDOW_SIZE);
58      private final AtomicInteger sessionLevel = new AtomicInteger();
59      private final Map<IStream, AtomicInteger> streamLevels = new ConcurrentHashMap<>();
60      private float bufferRatio;
61  
62      public BufferingFlowControlStrategy(float bufferRatio)
63      {
64          this(DEFAULT_WINDOW_SIZE, bufferRatio);
65      }
66  
67      public BufferingFlowControlStrategy(int initialStreamSendWindow, float bufferRatio)
68      {
69          super(initialStreamSendWindow);
70          this.bufferRatio = bufferRatio;
71      }
72  
73      @ManagedAttribute("The ratio between the receive buffer and the consume buffer")
74      public float getBufferRatio()
75      {
76          return bufferRatio;
77      }
78  
79      public void setBufferRatio(float bufferRatio)
80      {
81          this.bufferRatio = bufferRatio;
82      }
83  
84      @Override
85      public void onStreamCreated(IStream stream)
86      {
87          super.onStreamCreated(stream);
88          streamLevels.put(stream, new AtomicInteger());
89      }
90  
91      @Override
92      public void onStreamDestroyed(IStream stream)
93      {
94          streamLevels.remove(stream);
95          super.onStreamDestroyed(stream);
96      }
97  
98      @Override
99      public void onDataConsumed(ISession session, IStream stream, int length)
100     {
101         if (length <= 0)
102             return;
103 
104         float ratio = bufferRatio;
105 
106         WindowUpdateFrame windowFrame = null;
107         int level = sessionLevel.addAndGet(length);
108         int maxLevel = (int)(maxSessionRecvWindow.get() * ratio);
109         if (level > maxLevel)
110         {
111             level = sessionLevel.getAndSet(0);
112             session.updateRecvWindow(level);
113             if (LOG.isDebugEnabled())
114                 LOG.debug("Data consumed, updated session recv window by {}/{} for {}", level, maxLevel, session);
115             windowFrame = new WindowUpdateFrame(0, level);
116         }
117         else
118         {
119             if (LOG.isDebugEnabled())
120                 LOG.debug("Data consumed, session recv window level {}/{} for {}", level, maxLevel, session);
121         }
122 
123         Frame[] windowFrames = Frame.EMPTY_ARRAY;
124         if (stream != null)
125         {
126             if (stream.isClosed())
127             {
128                 if (LOG.isDebugEnabled())
129                     LOG.debug("Data consumed, ignoring update stream recv window by {} for closed {}", length, stream);
130             }
131             else
132             {
133                 AtomicInteger streamLevel = streamLevels.get(stream);
134                 if (streamLevel != null)
135                 {
136                     level = streamLevel.addAndGet(length);
137                     maxLevel = (int)(getInitialStreamRecvWindow() * ratio);
138                     if (level > maxLevel)
139                     {
140                         level = streamLevel.getAndSet(0);
141                         stream.updateRecvWindow(level);
142                         if (LOG.isDebugEnabled())
143                             LOG.debug("Data consumed, updated stream recv window by {}/{} for {}", level, maxLevel, stream);
144                         WindowUpdateFrame frame = new WindowUpdateFrame(stream.getId(), level);
145                         if (windowFrame == null)
146                             windowFrame = frame;
147                         else
148                             windowFrames = new Frame[]{frame};
149                     }
150                     else
151                     {
152                         if (LOG.isDebugEnabled())
153                             LOG.debug("Data consumed, stream recv window level {}/{} for {}", level, maxLevel, session);
154                     }
155                 }
156             }
157         }
158 
159         if (windowFrame != null)
160             session.frames(stream, Callback.NOOP, windowFrame, windowFrames);
161     }
162 
163     @Override
164     public void windowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
165     {
166         super.windowUpdate(session, stream, frame);
167 
168         // Window updates cannot be negative.
169         // The SettingsFrame.INITIAL_WINDOW_SIZE setting
170         // only influences the *stream* window size.
171         // Therefore the session window can only be enlarged,
172         // and here we keep track of its max value.
173 
174         // Updating the max session recv window is done here
175         // so that if a peer decides to send an unilateral
176         // window update to enlarge the session window,
177         // without the corresponding data consumption, here
178         // we can track it.
179         // Note that it is not perfect, since there is a time
180         // window between the session recv window being updated
181         // before the window update frame is sent, and the
182         // invocation of this method: in between data may arrive
183         // and reduce the session recv window size.
184         // But eventually the max value will be seen.
185 
186         // Note that we cannot avoid the time window described
187         // above by updating the session recv window from here
188         // because there is a race between the sender and the
189         // receiver: the sender may receive a window update and
190         // send more data, while this method has not yet been
191         // invoked; when the data is received the session recv
192         // window may become negative and the connection will
193         // be closed (per specification).
194 
195         if (frame.getStreamId() == 0)
196         {
197             int sessionWindow = session.updateRecvWindow(0);
198             Atomics.updateMax(maxSessionRecvWindow, sessionWindow);
199         }
200     }
201 
202     @Override
203     public String toString()
204     {
205         return String.format("%s@%x[ratio=%.2f,sessionStallTime=%dms,streamsStallTime=%dms]",
206                 getClass().getSimpleName(),
207                 hashCode(),
208                 bufferRatio,
209                 getSessionStallTime(),
210                 getStreamsStallTime());
211     }
212 }