View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.http2;
20  
21  import java.util.Map;
22  import java.util.concurrent.ConcurrentHashMap;
23  import java.util.concurrent.atomic.AtomicInteger;
24  
25  import org.eclipse.jetty.http2.frames.Frame;
26  import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
27  import org.eclipse.jetty.util.Atomics;
28  import org.eclipse.jetty.util.Callback;
29  
30  /**
31   * <p>A flow control strategy that accumulates updates and emits window control
32   * frames when the accumulated value reaches a threshold.</p>
33   * <p>The sender flow control window is represented in the receiver as two
34   * buckets: a bigger bucket, initially full, that is drained when data is
35   * received, and a smaller bucket, initially empty, that is filled when data is
36   * consumed. Only the smaller bucket can refill the bigger bucket.</p>
37   * <p>The smaller bucket is defined as a fraction of the bigger bucket.</p>
38   * <p>For a more visual representation, see the
39   * <a href="http://en.wikipedia.org/wiki/Shishi-odoshi">rocking bamboo fountain</a>.</p>
40   * <p>The algorithm works in this way.</p>
41   * <p>The initial bigger bucket (BB) capacity is 100, and let's imagine the smaller
42   * bucket (SB) being 40% of the bigger bucket: 40.</p>
43   * <p>The receiver receives a data frame of 60, so now BB=40; the data frame is
44   * passed to the application that consumes 25, so now SB=25. Since SB is not full,
45   * no window control frames are emitted.</p>
46   * <p>The application consumes other 20, so now SB=45. Since SB is full, its 45
47   * are transferred to BB, which is now BB=85, and a window control frame is sent
48   * with delta=45.</p>
49   * <p>The application consumes the remaining 15, so now SB=15, and no window
50   * control frame is emitted.</p>
51   */
52  public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
53  {
54      private final AtomicInteger maxSessionRecvWindow = new AtomicInteger(DEFAULT_WINDOW_SIZE);
55      private final AtomicInteger sessionLevel = new AtomicInteger();
56      private final Map<IStream, AtomicInteger> streamLevels = new ConcurrentHashMap<>();
57      private final float bufferRatio;
58  
59      public BufferingFlowControlStrategy(float bufferRatio)
60      {
61          this(DEFAULT_WINDOW_SIZE, bufferRatio);
62      }
63  
64      public BufferingFlowControlStrategy(int initialStreamSendWindow, float bufferRatio)
65      {
66          super(initialStreamSendWindow);
67          this.bufferRatio = bufferRatio;
68      }
69  
70      @Override
71      public void onStreamCreated(IStream stream, boolean local)
72      {
73          super.onStreamCreated(stream, local);
74          streamLevels.put(stream, new AtomicInteger());
75      }
76  
77      @Override
78      public void onStreamDestroyed(IStream stream, boolean local)
79      {
80          streamLevels.remove(stream);
81          super.onStreamDestroyed(stream, local);
82      }
83  
84      @Override
85      public void onDataConsumed(ISession session, IStream stream, int length)
86      {
87          if (length <= 0)
88              return;
89  
90          WindowUpdateFrame windowFrame = null;
91          int level = sessionLevel.addAndGet(length);
92          int maxLevel = (int)(maxSessionRecvWindow.get() * bufferRatio);
93          if (level > maxLevel)
94          {
95              level = sessionLevel.getAndSet(0);
96              session.updateRecvWindow(level);
97              if (LOG.isDebugEnabled())
98                  LOG.debug("Data consumed, updated session recv window by {}/{} for {}", level, maxLevel, session);
99              windowFrame = new WindowUpdateFrame(0, level);
100         }
101         else
102         {
103             if (LOG.isDebugEnabled())
104                 LOG.debug("Data consumed, session recv window level {}/{} for {}", level, maxLevel, session);
105         }
106 
107         Frame[] windowFrames = Frame.EMPTY_ARRAY;
108         if (stream != null)
109         {
110             if (stream.isClosed())
111             {
112                 if (LOG.isDebugEnabled())
113                     LOG.debug("Data consumed, ignoring update stream recv window by {} for closed {}", length, stream);
114             }
115             else
116             {
117                 AtomicInteger streamLevel = streamLevels.get(stream);
118                 if (streamLevel != null)
119                 {
120                     level = streamLevel.addAndGet(length);
121                     maxLevel = (int)(getInitialStreamRecvWindow() * bufferRatio);
122                     if (level > maxLevel)
123                     {
124                         level = streamLevel.getAndSet(0);
125                         stream.updateRecvWindow(level);
126                         if (LOG.isDebugEnabled())
127                             LOG.debug("Data consumed, updated stream recv window by {}/{} for {}", level, maxLevel, stream);
128                         WindowUpdateFrame frame = new WindowUpdateFrame(stream.getId(), level);
129                         if (windowFrame == null)
130                             windowFrame = frame;
131                         else
132                             windowFrames = new Frame[]{frame};
133                     }
134                     else
135                     {
136                         if (LOG.isDebugEnabled())
137                             LOG.debug("Data consumed, stream recv window level {}/{} for {}", level, maxLevel, session);
138                     }
139                 }
140             }
141         }
142 
143         if (windowFrame != null)
144             session.frames(stream, Callback.NOOP, windowFrame, windowFrames);
145     }
146 
147     @Override
148     public void windowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
149     {
150         super.windowUpdate(session, stream, frame);
151 
152         // Window updates cannot be negative.
153         // The SettingsFrame.INITIAL_WINDOW_SIZE setting
154         // only influences the *stream* window size.
155         // Therefore the session window can only be enlarged,
156         // and here we keep track of its max value.
157 
158         // Updating the max session recv window is done here
159         // so that if a peer decides to send an unilateral
160         // window update to enlarge the session window,
161         // without the corresponding data consumption, here
162         // we can track it.
163         // Note that it is not perfect, since there is a time
164         // window between the session recv window being updated
165         // before the window update frame is sent, and the
166         // invocation of this method: in between data may arrive
167         // and reduce the session recv window size.
168         // But eventually the max value will be seen.
169 
170         // Note that we cannot avoid the time window described
171         // above by updating the session recv window from here
172         // because there is a race between the sender and the
173         // receiver: the sender may receive a window update and
174         // send more data, while this method has not yet been
175         // invoked; when the data is received the session recv
176         // window may become negative and the connection will
177         // be closed (per specification).
178 
179         if (frame.getStreamId() == 0)
180         {
181             int sessionWindow = session.updateRecvWindow(0);
182             Atomics.updateMax(maxSessionRecvWindow, sessionWindow);
183         }
184     }
185 }