View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.spdy;
20  
21  import java.nio.ByteBuffer;
22  import java.util.ArrayList;
23  import java.util.HashSet;
24  import java.util.LinkedList;
25  import java.util.List;
26  import java.util.Set;
27  
28  import org.eclipse.jetty.spdy.api.SPDYException;
29  import org.eclipse.jetty.spdy.api.Stream;
30  import org.eclipse.jetty.spdy.api.StreamStatus;
31  import org.eclipse.jetty.util.IteratingCallback;
32  import org.eclipse.jetty.util.log.Log;
33  import org.eclipse.jetty.util.log.Logger;
34  
35  public class Flusher
36  {
37      private static final Logger LOG = Log.getLogger(Flusher.class);
38  
39      private final IteratingCallback iteratingCallback = new SessionIteratingCallback();
40      private final Controller controller;
41      private final LinkedList<StandardSession.FrameBytes> queue = new LinkedList<>();
42      private Throwable failure;
43      private StandardSession.FrameBytes active;
44  
45      private boolean flushing;
46  
47      public Flusher(Controller controller)
48      {
49          this.controller = controller;
50      }
51  
52      void removeFrameBytesFromQueue(Stream stream)
53      {
54          synchronized (queue)
55          {
56              for (StandardSession.FrameBytes frameBytes : queue)
57                  if (frameBytes.getStream() == stream)
58                      queue.remove(frameBytes);
59          }
60      }
61  
62      void append(StandardSession.FrameBytes frameBytes)
63      {
64          Throwable failure;
65          synchronized (queue)
66          {
67              failure = this.failure;
68              if (failure == null)
69              {
70                  // Frames containing headers must be send in the order the headers have been generated. We don't need
71                  // to do this check in StandardSession.prepend() as no frames containing headers will be prepended.
72                  if (frameBytes instanceof StandardSession.ControlFrameBytes)
73                      queue.addLast(frameBytes);
74                  else
75                  {
76                      int index = queue.size();
77                      while (index > 0)
78                      {
79                          StandardSession.FrameBytes element = queue.get(index - 1);
80                          if (element.compareTo(frameBytes) >= 0)
81                              break;
82                          --index;
83                      }
84                      queue.add(index, frameBytes);
85                  }
86              }
87          }
88          if (failure == null)
89              iteratingCallback.iterate();
90          else
91              frameBytes.failed(new SPDYException(failure));
92      }
93  
94      void prepend(StandardSession.FrameBytes frameBytes)
95      {
96          Throwable failure;
97          synchronized (queue)
98          {
99              failure = this.failure;
100             if (failure == null)
101             {
102                 int index = 0;
103                 while (index < queue.size())
104                 {
105                     StandardSession.FrameBytes element = queue.get(index);
106                     if (element.compareTo(frameBytes) <= 0)
107                         break;
108                     ++index;
109                 }
110                 queue.add(index, frameBytes);
111             }
112         }
113 
114         if (failure == null)
115             iteratingCallback.iterate();
116         else
117             frameBytes.failed(new SPDYException(failure));
118     }
119 
120     void flush()
121     {
122         StandardSession.FrameBytes frameBytes = null;
123         ByteBuffer buffer = null;
124         boolean failFrameBytes = false;
125         synchronized (queue)
126         {
127             if (flushing || queue.isEmpty())
128                 return;
129 
130             Set<IStream> stalledStreams = null;
131             for (int i = 0; i < queue.size(); ++i)
132             {
133                 frameBytes = queue.get(i);
134 
135                 IStream stream = frameBytes.getStream();
136                 if (stream != null && stalledStreams != null && stalledStreams.contains(stream))
137                     continue;
138 
139                 buffer = frameBytes.getByteBuffer();
140                 if (buffer != null)
141                 {
142                     queue.remove(i);
143                     if (stream != null && stream.isReset() && !(frameBytes instanceof StandardSession
144                             .ControlFrameBytes))
145                         failFrameBytes = true;
146                     break;
147                 }
148 
149                 if (stalledStreams == null)
150                     stalledStreams = new HashSet<>();
151                 if (stream != null)
152                     stalledStreams.add(stream);
153 
154                 LOG.debug("Flush stalled for {}, {} frame(s) in queue", frameBytes, queue.size());
155             }
156 
157             if (buffer == null)
158                 return;
159 
160             if (!failFrameBytes)
161             {
162                 flushing = true;
163                 LOG.debug("Flushing {}, {} frame(s) in queue", frameBytes, queue.size());
164             }
165         }
166         if (failFrameBytes)
167         {
168             frameBytes.failed(new StreamException(frameBytes.getStream().getId(), StreamStatus.INVALID_STREAM,
169                     "Stream: " + frameBytes.getStream() + " is reset!"));
170         }
171         else
172         {
173             write(buffer, frameBytes);
174         }
175     }
176 
177     private void write(ByteBuffer buffer, StandardSession.FrameBytes frameBytes)
178     {
179         active = frameBytes;
180         if (controller != null)
181         {
182             LOG.debug("Writing {} frame bytes of {}", buffer.remaining(), buffer.limit());
183             controller.write(buffer, iteratingCallback);
184         }
185     }
186 
187     public int getQueueSize()
188     {
189         return queue.size();
190     }
191 
192     private class SessionIteratingCallback extends IteratingCallback
193     {
194         @Override
195         protected boolean process() throws Exception
196         {
197             flush();
198             return false;
199         }
200 
201         @Override
202         protected void completed()
203         {
204             // will never be called as process always returns false!
205         }
206 
207         @Override
208         public void succeeded()
209         {
210             if (LOG.isDebugEnabled())
211             {
212                 synchronized (queue)
213                 {
214                     LOG.debug("Completed write of {}, {} frame(s) in queue", active, queue.size());
215                 }
216             }
217             active.succeeded();
218             synchronized (queue)
219             {
220                 flushing = false;
221             }
222             super.succeeded();
223         }
224 
225         @Override
226         public void failed(Throwable x)
227         {
228             List<StandardSession.FrameBytes> frameBytesToFail = new ArrayList<>();
229 
230             synchronized (queue)
231             {
232                 failure = x;
233                 if (LOG.isDebugEnabled())
234                 {
235                     String logMessage = String.format("Failed write of %s, failing all %d frame(s) in queue", this, queue.size());
236                     LOG.debug(logMessage, x);
237                 }
238                 frameBytesToFail.addAll(queue);
239                 queue.clear();
240             }
241 
242             active.failed(x);
243             for (StandardSession.FrameBytes fb : frameBytesToFail)
244                 fb.failed(x);
245             super.failed(x);
246         }
247     }
248 
249 }