View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.spdy;
20  
21  import java.nio.ByteBuffer;
22  import java.util.ArrayList;
23  import java.util.HashSet;
24  import java.util.List;
25  import java.util.Set;
26  
27  import org.eclipse.jetty.spdy.StandardSession.FrameBytes;
28  import org.eclipse.jetty.spdy.api.Stream;
29  import org.eclipse.jetty.spdy.api.StreamStatus;
30  import org.eclipse.jetty.util.ArrayQueue;
31  import org.eclipse.jetty.util.IteratingCallback;
32  import org.eclipse.jetty.util.log.Log;
33  import org.eclipse.jetty.util.log.Logger;
34  
35  public class Flusher
36  {
37      private static final Logger LOG = Log.getLogger(Flusher.class);
38  
39      private final IteratingCallback callback = new FlusherCallback();
40      private final Object lock = new Object();
41      private final ArrayQueue<StandardSession.FrameBytes> queue = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH, lock);
42      private final Controller controller;
43      private final int maxGather;
44      private Throwable failure;
45  
46      public Flusher(Controller controller)
47      {
48          this(controller, 8);
49      }
50  
51      public Flusher(Controller controller, int maxGather)
52      {
53          this.controller = controller;
54          this.maxGather = maxGather;
55      }
56  
57      public void removeFrameBytesFromQueue(Stream stream)
58      {
59          synchronized (lock)
60          {
61              for (StandardSession.FrameBytes frameBytes : queue)
62                  if (frameBytes.getStream() == stream)
63                      queue.remove(frameBytes);
64          }
65      }
66  
67      public Throwable prepend(StandardSession.FrameBytes frameBytes)
68      {
69          synchronized (lock)
70          {
71              Throwable failure = this.failure;
72              if (failure == null)
73              {
74                  // Scan from the front of the queue looking to skip higher priority messages
75                  int index = 0;
76                  int size = queue.size();
77                  while (index < size)
78                  {
79                      StandardSession.FrameBytes element = queue.getUnsafe(index);
80                      if (element.compareTo(frameBytes) <= 0)
81                          break;
82                      ++index;
83                  }
84                  queue.add(index, frameBytes);
85              }
86              return failure;
87          }
88      }
89  
90      public Throwable append(StandardSession.FrameBytes frameBytes)
91      {
92          synchronized (lock)
93          {
94              Throwable failure = this.failure;
95              if (failure == null)
96              {
97                  // Non DataFrameBytes are inserted last
98                  queue.add(frameBytes);
99              }
100             return failure;
101         }
102     }
103 
104     public Throwable append(StandardSession.DataFrameBytes frameBytes)
105     {
106         synchronized (lock)
107         {
108             Throwable failure = this.failure;
109             if (failure == null)
110             {
111                 // DataFrameBytes are inserted by priority
112                 int index = queue.size();
113                 while (index > 0)
114                 {
115                     StandardSession.FrameBytes element = queue.getUnsafe(index - 1);
116                     if (element.compareTo(frameBytes) >= 0)
117                         break;
118                     --index;
119                 }
120                 queue.add(index, frameBytes);
121             }
122             return failure;
123         }
124     }
125 
126     public void flush()
127     {
128         callback.iterate();
129     }
130 
131     public int getQueueSize()
132     {
133         synchronized (lock)
134         {
135             return queue.size();
136         }
137     }
138 
139     private class FlusherCallback extends IteratingCallback
140     {
141         private final List<StandardSession.FrameBytes> active = new ArrayList<>(maxGather);
142         private final List<StandardSession.FrameBytes> succeeded = new ArrayList<>(maxGather);
143         private final Set<IStream> stalled = new HashSet<>();
144 
145         @Override
146         protected Action process() throws Exception
147         {
148             synchronized (lock)
149             {
150                 // Scan queue for data to write from first non stalled stream.
151                 int index = 0; // The index of the first non-stalled frame.
152                 int size = queue.size();
153                 while (index < size)
154                 {
155                     FrameBytes frameBytes = queue.getUnsafe(index);
156                     IStream stream = frameBytes.getStream();
157 
158                     if (stream != null)
159                     {
160                         // Is it a frame belonging to an already stalled stream ?
161                         if (stalled.size() > 0 && stalled.contains(stream))
162                         {
163                             ++index;
164                             continue;
165                         }
166 
167                         // Has the stream consumed all its flow control window ?
168                         if (stream.getWindowSize() <= 0)
169                         {
170                             stalled.add(stream);
171                             ++index;
172                             continue;
173                         }
174                     }
175 
176                     // We will be possibly writing this frame, so take the frame off the queue.
177                     queue.remove(index);
178                     --size;
179 
180                     // Has the stream been reset for this data frame ?
181                     if (stream != null && stream.isReset() && frameBytes instanceof StandardSession.DataFrameBytes)
182                     {
183                         // TODO: notify from within sync block !
184                         frameBytes.failed(new StreamException(frameBytes.getStream().getId(),
185                                 StreamStatus.INVALID_STREAM, "Stream: " + frameBytes.getStream() + " is reset!"));
186                         continue;
187                     }
188 
189                     active.add(frameBytes);
190                 }
191                 stalled.clear();
192 
193                 if (LOG.isDebugEnabled())
194                     LOG.debug("Flushing {} of {} frame(s) in queue", active.size(), queue.size());
195             }
196 
197             if (active.isEmpty())
198                 return Action.IDLE;
199 
200             // Get the bytes to write
201             ByteBuffer[] buffers = new ByteBuffer[active.size()];
202             for (int i = 0; i < buffers.length; i++)
203                 buffers[i] = active.get(i).getByteBuffer();
204 
205             if (controller != null)
206                 controller.write(this, buffers);
207 
208             // TODO: optimization
209             // If the callback completely immediately, it means that
210             // the connection is not congested, and therefore we can
211             // write more data without blocking.
212             // Therefore we should check this condition and increase
213             // the write window, which means two things: autotune the
214             // maxGather parameter, and/or autotune the buffer returned
215             // by FrameBytes.getByteBuffer() (see also comment there).
216 
217             return Action.SCHEDULED;
218         }
219 
220         @Override
221         protected void onCompleteSuccess()
222         {
223             // will never be called as process always returns SCHEDULED or IDLE
224             throw new IllegalStateException();
225         }
226 
227         @Override
228         public void succeeded()
229         {
230             synchronized (lock)
231             {
232                 if (LOG.isDebugEnabled())
233                     LOG.debug("Succeeded write of {}, q={}", active, queue.size());
234                 succeeded.addAll(active);
235                 active.clear();
236             }
237             // Notify outside the synchronized block.
238             for (FrameBytes frame : succeeded)
239                 frame.succeeded();
240             succeeded.clear();
241             super.succeeded();
242         }
243 
244         @Override
245         public void onCompleteFailure(Throwable x)
246         {
247             List<StandardSession.FrameBytes> failed = new ArrayList<>();
248             synchronized (lock)
249             {
250                 failure = x;
251                 if (LOG.isDebugEnabled())
252                 {
253                     String logMessage = String.format("Failed write of %s, failing all %d frame(s) in queue", this, queue.size());
254                     LOG.debug(logMessage, x);
255                 }
256                 failed.addAll(active);
257                 active.clear();
258                 failed.addAll(queue);
259                 queue.clear();
260             }
261             // Notify outside the synchronized block.
262             for (StandardSession.FrameBytes frame : failed)
263                 frame.failed(x);
264         }
265     }
266 }