View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.http2;
20  
21  import java.io.IOException;
22  import java.util.Map;
23  import java.util.concurrent.ConcurrentHashMap;
24  import java.util.concurrent.TimeUnit;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  import org.eclipse.jetty.http2.api.Stream;
28  import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
29  import org.eclipse.jetty.util.annotation.ManagedAttribute;
30  import org.eclipse.jetty.util.annotation.ManagedObject;
31  import org.eclipse.jetty.util.annotation.ManagedOperation;
32  import org.eclipse.jetty.util.component.ContainerLifeCycle;
33  import org.eclipse.jetty.util.component.Dumpable;
34  import org.eclipse.jetty.util.log.Log;
35  import org.eclipse.jetty.util.log.Logger;
36  
37  @ManagedObject
38  public abstract class AbstractFlowControlStrategy implements FlowControlStrategy, Dumpable
39  {
40      protected static final Logger LOG = Log.getLogger(FlowControlStrategy.class);
41  
42      private final AtomicLong sessionStall = new AtomicLong();
43      private final AtomicLong sessionStallTime = new AtomicLong();
44      private final Map<IStream, Long> streamsStalls = new ConcurrentHashMap<>();
45      private final AtomicLong streamsStallTime = new AtomicLong();
46      private int initialStreamSendWindow;
47      private int initialStreamRecvWindow;
48  
49      public AbstractFlowControlStrategy(int initialStreamSendWindow)
50      {
51          this.initialStreamSendWindow = initialStreamSendWindow;
52          this.initialStreamRecvWindow = DEFAULT_WINDOW_SIZE;
53      }
54  
55      @ManagedAttribute(value = "The initial size of stream's flow control send window", readonly = true)
56      public int getInitialStreamSendWindow()
57      {
58          return initialStreamSendWindow;
59      }
60  
61      @ManagedAttribute(value = "The initial size of stream's flow control receive window", readonly = true)
62      public int getInitialStreamRecvWindow()
63      {
64          return initialStreamRecvWindow;
65      }
66  
67      @Override
68      public void onStreamCreated(IStream stream)
69      {
70          stream.updateSendWindow(initialStreamSendWindow);
71          stream.updateRecvWindow(initialStreamRecvWindow);
72      }
73  
74      @Override
75      public void onStreamDestroyed(IStream stream)
76      {
77      }
78  
79      @Override
80      public void updateInitialStreamWindow(ISession session, int initialStreamWindow, boolean local)
81      {
82          int previousInitialStreamWindow;
83          if (local)
84          {
85              previousInitialStreamWindow = getInitialStreamRecvWindow();
86              this.initialStreamRecvWindow = initialStreamWindow;
87          }
88          else
89          {
90              previousInitialStreamWindow = getInitialStreamSendWindow();
91              this.initialStreamSendWindow = initialStreamWindow;
92          }
93          int delta = initialStreamWindow - previousInitialStreamWindow;
94  
95          // SPEC: updates of the initial window size only affect stream windows, not session's.
96          for (Stream stream : session.getStreams())
97          {
98              if (local)
99              {
100                 ((IStream)stream).updateRecvWindow(delta);
101                 if (LOG.isDebugEnabled())
102                     LOG.debug("Updated initial stream recv window {} -> {} for {}", previousInitialStreamWindow, initialStreamWindow, stream);
103             }
104             else
105             {
106                 session.onWindowUpdate((IStream)stream, new WindowUpdateFrame(stream.getId(), delta));
107             }
108         }
109     }
110 
111     @Override
112     public void onWindowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
113     {
114         int delta = frame.getWindowDelta();
115         if (frame.getStreamId() > 0)
116         {
117             // The stream may have been removed concurrently.
118             if (stream != null)
119             {
120                 int oldSize = stream.updateSendWindow(delta);
121                 if (LOG.isDebugEnabled())
122                     LOG.debug("Updated stream send window {} -> {} for {}", oldSize, oldSize + delta, stream);
123                 if (oldSize <= 0)
124                     onStreamUnstalled(stream);
125             }
126         }
127         else
128         {
129             int oldSize = session.updateSendWindow(delta);
130             if (LOG.isDebugEnabled())
131                 LOG.debug("Updated session send window {} -> {} for {}", oldSize, oldSize + delta, session);
132             if (oldSize <= 0)
133                 onSessionUnstalled(session);
134         }
135     }
136 
137     @Override
138     public void onDataReceived(ISession session, IStream stream, int length)
139     {
140         int oldSize = session.updateRecvWindow(-length);
141         if (LOG.isDebugEnabled())
142             LOG.debug("Data received, updated session recv window {} -> {} for {}", oldSize, oldSize - length, session);
143 
144         if (stream != null)
145         {
146             oldSize = stream.updateRecvWindow(-length);
147             if (LOG.isDebugEnabled())
148                 LOG.debug("Data received, updated stream recv window {} -> {} for {}", oldSize, oldSize - length, stream);
149         }
150     }
151 
152     @Override
153     public void windowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
154     {
155     }
156 
157     @Override
158     public void onDataSending(IStream stream, int length)
159     {
160         if (length == 0)
161             return;
162 
163         ISession session = stream.getSession();
164         int oldSessionWindow = session.updateSendWindow(-length);
165         int newSessionWindow = oldSessionWindow - length;
166         if (LOG.isDebugEnabled())
167             LOG.debug("Sending, session send window {} -> {} for {}", oldSessionWindow, newSessionWindow, session);
168         if (newSessionWindow <= 0)
169             onSessionStalled(session);
170 
171         int oldStreamWindow = stream.updateSendWindow(-length);
172         int newStreamWindow = oldStreamWindow - length;
173         if (LOG.isDebugEnabled())
174             LOG.debug("Sending, stream send window {} -> {} for {}", oldStreamWindow, newStreamWindow, stream);
175         if (newStreamWindow <= 0)
176             onStreamStalled(stream);
177     }
178 
179     @Override
180     public void onDataSent(IStream stream, int length)
181     {
182     }
183 
184     protected void onSessionStalled(ISession session)
185     {
186         sessionStall.set(System.nanoTime());
187         if (LOG.isDebugEnabled())
188             LOG.debug("Session stalled {}", session);
189     }
190 
191     protected void onStreamStalled(IStream stream)
192     {
193         streamsStalls.put(stream, System.nanoTime());
194         if (LOG.isDebugEnabled())
195             LOG.debug("Stream stalled {}", stream);
196     }
197 
198     protected void onSessionUnstalled(ISession session)
199     {
200         sessionStallTime.addAndGet(System.nanoTime() - sessionStall.getAndSet(0));
201         if (LOG.isDebugEnabled())
202             LOG.debug("Session unstalled {}", session);
203     }
204 
205     protected void onStreamUnstalled(IStream stream)
206     {
207         Long time = streamsStalls.remove(stream);
208         if (time != null)
209             streamsStallTime.addAndGet(System.nanoTime() - time);
210         if (LOG.isDebugEnabled())
211             LOG.debug("Stream unstalled {}", stream);
212     }
213 
214     @ManagedAttribute(value = "The time, in milliseconds, that the session flow control has stalled", readonly = true)
215     public long getSessionStallTime()
216     {
217         return TimeUnit.NANOSECONDS.toMillis(sessionStallTime.get());
218     }
219 
220     @ManagedAttribute(value = "The time, in milliseconds, that the streams flow control has stalled", readonly = true)
221     public long getStreamsStallTime()
222     {
223         return TimeUnit.NANOSECONDS.toMillis(streamsStallTime.get());
224     }
225 
226     @ManagedOperation(value = "Resets the statistics", impact = "ACTION")
227     public void reset()
228     {
229         sessionStallTime.set(0);
230         streamsStallTime.set(0);
231     }
232 
233     @Override
234     public String dump()
235     {
236         return ContainerLifeCycle.dump(this);
237     }
238 
239     @Override
240     public void dump(Appendable out, String indent) throws IOException
241     {
242         out.append(toString()).append(System.lineSeparator());
243     }
244 }