1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy;
20
21 import java.nio.ByteBuffer;
22 import java.util.ArrayList;
23 import java.util.HashSet;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.Set;
27
28 import org.eclipse.jetty.spdy.api.SPDYException;
29 import org.eclipse.jetty.spdy.api.Stream;
30 import org.eclipse.jetty.spdy.api.StreamStatus;
31 import org.eclipse.jetty.util.IteratingCallback;
32 import org.eclipse.jetty.util.log.Log;
33 import org.eclipse.jetty.util.log.Logger;
34
35 public class Flusher
36 {
37 private static final Logger LOG = Log.getLogger(Flusher.class);
38
39 private final IteratingCallback iteratingCallback = new SessionIteratingCallback();
40 private final Controller controller;
41 private final LinkedList<StandardSession.FrameBytes> queue = new LinkedList<>();
42 private Throwable failure;
43 private StandardSession.FrameBytes active;
44
45 private boolean flushing;
46
47 public Flusher(Controller controller)
48 {
49 this.controller = controller;
50 }
51
52 void removeFrameBytesFromQueue(Stream stream)
53 {
54 synchronized (queue)
55 {
56 for (StandardSession.FrameBytes frameBytes : queue)
57 if (frameBytes.getStream() == stream)
58 queue.remove(frameBytes);
59 }
60 }
61
62 void append(StandardSession.FrameBytes frameBytes)
63 {
64 Throwable failure;
65 synchronized (queue)
66 {
67 failure = this.failure;
68 if (failure == null)
69 {
70
71
72 if (frameBytes instanceof StandardSession.ControlFrameBytes)
73 queue.addLast(frameBytes);
74 else
75 {
76 int index = queue.size();
77 while (index > 0)
78 {
79 StandardSession.FrameBytes element = queue.get(index - 1);
80 if (element.compareTo(frameBytes) >= 0)
81 break;
82 --index;
83 }
84 queue.add(index, frameBytes);
85 }
86 }
87 }
88 if (failure == null)
89 iteratingCallback.iterate();
90 else
91 frameBytes.failed(new SPDYException(failure));
92 }
93
94 void prepend(StandardSession.FrameBytes frameBytes)
95 {
96 Throwable failure;
97 synchronized (queue)
98 {
99 failure = this.failure;
100 if (failure == null)
101 {
102 int index = 0;
103 while (index < queue.size())
104 {
105 StandardSession.FrameBytes element = queue.get(index);
106 if (element.compareTo(frameBytes) <= 0)
107 break;
108 ++index;
109 }
110 queue.add(index, frameBytes);
111 }
112 }
113
114 if (failure == null)
115 iteratingCallback.iterate();
116 else
117 frameBytes.failed(new SPDYException(failure));
118 }
119
120 void flush()
121 {
122 StandardSession.FrameBytes frameBytes = null;
123 ByteBuffer buffer = null;
124 boolean failFrameBytes = false;
125 synchronized (queue)
126 {
127 if (flushing || queue.isEmpty())
128 return;
129
130 Set<IStream> stalledStreams = null;
131 for (int i = 0; i < queue.size(); ++i)
132 {
133 frameBytes = queue.get(i);
134
135 IStream stream = frameBytes.getStream();
136 if (stream != null && stalledStreams != null && stalledStreams.contains(stream))
137 continue;
138
139 buffer = frameBytes.getByteBuffer();
140 if (buffer != null)
141 {
142 queue.remove(i);
143 if (stream != null && stream.isReset() && !(frameBytes instanceof StandardSession
144 .ControlFrameBytes))
145 failFrameBytes = true;
146 break;
147 }
148
149 if (stalledStreams == null)
150 stalledStreams = new HashSet<>();
151 if (stream != null)
152 stalledStreams.add(stream);
153
154 LOG.debug("Flush stalled for {}, {} frame(s) in queue", frameBytes, queue.size());
155 }
156
157 if (buffer == null)
158 return;
159
160 if (!failFrameBytes)
161 {
162 flushing = true;
163 LOG.debug("Flushing {}, {} frame(s) in queue", frameBytes, queue.size());
164 }
165 }
166 if (failFrameBytes)
167 {
168 frameBytes.failed(new StreamException(frameBytes.getStream().getId(), StreamStatus.INVALID_STREAM,
169 "Stream: " + frameBytes.getStream() + " is reset!"));
170 }
171 else
172 {
173 write(buffer, frameBytes);
174 }
175 }
176
177 private void write(ByteBuffer buffer, StandardSession.FrameBytes frameBytes)
178 {
179 active = frameBytes;
180 if (controller != null)
181 {
182 LOG.debug("Writing {} frame bytes of {}", buffer.remaining(), buffer.limit());
183 controller.write(buffer, iteratingCallback);
184 }
185 }
186
187 public int getQueueSize()
188 {
189 return queue.size();
190 }
191
192 private class SessionIteratingCallback extends IteratingCallback
193 {
194 @Override
195 protected boolean process() throws Exception
196 {
197 flush();
198 return false;
199 }
200
201 @Override
202 protected void completed()
203 {
204
205 }
206
207 @Override
208 public void succeeded()
209 {
210 if (LOG.isDebugEnabled())
211 {
212 synchronized (queue)
213 {
214 LOG.debug("Completed write of {}, {} frame(s) in queue", active, queue.size());
215 }
216 }
217 active.succeeded();
218 synchronized (queue)
219 {
220 flushing = false;
221 }
222 super.succeeded();
223 }
224
225 @Override
226 public void failed(Throwable x)
227 {
228 List<StandardSession.FrameBytes> frameBytesToFail = new ArrayList<>();
229
230 synchronized (queue)
231 {
232 failure = x;
233 if (LOG.isDebugEnabled())
234 {
235 String logMessage = String.format("Failed write of %s, failing all %d frame(s) in queue", this, queue.size());
236 LOG.debug(logMessage, x);
237 }
238 frameBytesToFail.addAll(queue);
239 queue.clear();
240 }
241
242 active.failed(x);
243 for (StandardSession.FrameBytes fb : frameBytesToFail)
244 fb.failed(x);
245 super.failed(x);
246 }
247 }
248
249 }