View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.http2;
20  
21  import org.eclipse.jetty.http2.api.Stream;
22  import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
23  import org.eclipse.jetty.util.log.Log;
24  import org.eclipse.jetty.util.log.Logger;
25  
26  public abstract class AbstractFlowControlStrategy implements FlowControlStrategy
27  {
28      protected static final Logger LOG = Log.getLogger(FlowControlStrategy.class);
29  
30      private int initialStreamSendWindow;
31      private int initialStreamRecvWindow;
32  
33      public AbstractFlowControlStrategy(int initialStreamSendWindow)
34      {
35          this.initialStreamSendWindow = initialStreamSendWindow;
36          this.initialStreamRecvWindow = DEFAULT_WINDOW_SIZE;
37      }
38  
39      protected int getInitialStreamSendWindow()
40      {
41          return initialStreamSendWindow;
42      }
43  
44      protected int getInitialStreamRecvWindow()
45      {
46          return initialStreamRecvWindow;
47      }
48  
49      @Override
50      public void onStreamCreated(IStream stream, boolean local)
51      {
52          stream.updateSendWindow(initialStreamSendWindow);
53          stream.updateRecvWindow(initialStreamRecvWindow);
54      }
55  
56      @Override
57      public void onStreamDestroyed(IStream stream, boolean local)
58      {
59      }
60  
61      @Override
62      public void updateInitialStreamWindow(ISession session, int initialStreamWindow, boolean local)
63      {
64          int previousInitialStreamWindow;
65          if (local)
66          {
67              previousInitialStreamWindow = getInitialStreamRecvWindow();
68              this.initialStreamRecvWindow = initialStreamWindow;
69          }
70          else
71          {
72              previousInitialStreamWindow = getInitialStreamSendWindow();
73              this.initialStreamSendWindow = initialStreamWindow;
74          }
75          int delta = initialStreamWindow - previousInitialStreamWindow;
76  
77          // SPEC: updates of the initial window size only affect stream windows, not session's.
78          for (Stream stream : session.getStreams())
79          {
80              if (local)
81              {
82                  ((IStream)stream).updateRecvWindow(delta);
83                  if (LOG.isDebugEnabled())
84                      LOG.debug("Updated initial stream recv window {} -> {} for {}", previousInitialStreamWindow, initialStreamWindow, stream);
85              }
86              else
87              {
88                  session.onWindowUpdate((IStream)stream, new WindowUpdateFrame(stream.getId(), delta));
89              }
90          }
91      }
92  
93      @Override
94      public void onWindowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
95      {
96          int delta = frame.getWindowDelta();
97          if (frame.getStreamId() > 0)
98          {
99              // The stream may have been removed concurrently.
100             if (stream != null)
101             {
102                 int oldSize = stream.updateSendWindow(delta);
103                 if (LOG.isDebugEnabled())
104                     LOG.debug("Updated stream send window {} -> {} for {}", oldSize, oldSize + delta, stream);
105             }
106         }
107         else
108         {
109             int oldSize = session.updateSendWindow(delta);
110             if (LOG.isDebugEnabled())
111                 LOG.debug("Updated session send window {} -> {} for {}", oldSize, oldSize + delta, session);
112         }
113     }
114 
115     @Override
116     public void onDataReceived(ISession session, IStream stream, int length)
117     {
118         int oldSize = session.updateRecvWindow(-length);
119         if (LOG.isDebugEnabled())
120             LOG.debug("Data received, updated session recv window {} -> {} for {}", oldSize, oldSize - length, session);
121 
122         if (stream != null)
123         {
124             oldSize = stream.updateRecvWindow(-length);
125             if (LOG.isDebugEnabled())
126                 LOG.debug("Data received, updated stream recv window {} -> {} for {}", oldSize, oldSize - length, stream);
127         }
128     }
129 
130     @Override
131     public void windowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
132     {
133     }
134 
135     @Override
136     public void onDataSending(IStream stream, int length)
137     {
138         if (length == 0)
139             return;
140 
141         ISession session = stream.getSession();
142         int oldSize = session.updateSendWindow(-length);
143         if (LOG.isDebugEnabled())
144             LOG.debug("Updated session send window {} -> {} for {}", oldSize, oldSize - length, session);
145 
146         oldSize = stream.updateSendWindow(-length);
147         if (LOG.isDebugEnabled())
148             LOG.debug("Updated stream send window {} -> {} for {}", oldSize, oldSize - length, stream);
149     }
150 
151     @Override
152     public void onDataSent(IStream stream, int length)
153     {
154     }
155 
156     @Override
157     public void onSessionStalled(ISession session)
158     {
159         if (LOG.isDebugEnabled())
160             LOG.debug("Session stalled {}", session);
161     }
162 
163     @Override
164     public void onStreamStalled(IStream stream)
165     {
166         if (LOG.isDebugEnabled())
167             LOG.debug("Stream stalled {}", stream);
168     }
169 }