1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy;
20
21 import java.nio.ByteBuffer;
22 import java.util.ArrayList;
23 import java.util.HashSet;
24 import java.util.List;
25 import java.util.Set;
26
27 import org.eclipse.jetty.spdy.StandardSession.FrameBytes;
28 import org.eclipse.jetty.spdy.api.Stream;
29 import org.eclipse.jetty.spdy.api.StreamStatus;
30 import org.eclipse.jetty.util.ArrayQueue;
31 import org.eclipse.jetty.util.IteratingCallback;
32 import org.eclipse.jetty.util.log.Log;
33 import org.eclipse.jetty.util.log.Logger;
34
35 public class Flusher
36 {
37 private static final Logger LOG = Log.getLogger(Flusher.class);
38
39 private final IteratingCallback callback = new FlusherCallback();
40 private final Object lock = new Object();
41 private final ArrayQueue<StandardSession.FrameBytes> queue = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH, lock);
42 private final Controller controller;
43 private final int maxGather;
44 private Throwable failure;
45
46 public Flusher(Controller controller)
47 {
48 this(controller, 8);
49 }
50
51 public Flusher(Controller controller, int maxGather)
52 {
53 this.controller = controller;
54 this.maxGather = maxGather;
55 }
56
57 public void removeFrameBytesFromQueue(Stream stream)
58 {
59 synchronized (lock)
60 {
61 for (StandardSession.FrameBytes frameBytes : queue)
62 if (frameBytes.getStream() == stream)
63 queue.remove(frameBytes);
64 }
65 }
66
67 public Throwable prepend(StandardSession.FrameBytes frameBytes)
68 {
69 synchronized (lock)
70 {
71 Throwable failure = this.failure;
72 if (failure == null)
73 {
74
75 int index = 0;
76 int size = queue.size();
77 while (index < size)
78 {
79 StandardSession.FrameBytes element = queue.getUnsafe(index);
80 if (element.compareTo(frameBytes) <= 0)
81 break;
82 ++index;
83 }
84 queue.add(index, frameBytes);
85 }
86 return failure;
87 }
88 }
89
90 public Throwable append(StandardSession.FrameBytes frameBytes)
91 {
92 synchronized (lock)
93 {
94 Throwable failure = this.failure;
95 if (failure == null)
96 {
97
98 queue.add(frameBytes);
99 }
100 return failure;
101 }
102 }
103
104 public Throwable append(StandardSession.DataFrameBytes frameBytes)
105 {
106 synchronized (lock)
107 {
108 Throwable failure = this.failure;
109 if (failure == null)
110 {
111
112 int index = queue.size();
113 while (index > 0)
114 {
115 StandardSession.FrameBytes element = queue.getUnsafe(index - 1);
116 if (element.compareTo(frameBytes) >= 0)
117 break;
118 --index;
119 }
120 queue.add(index, frameBytes);
121 }
122 return failure;
123 }
124 }
125
126 public void flush()
127 {
128 callback.iterate();
129 }
130
131 public int getQueueSize()
132 {
133 synchronized (lock)
134 {
135 return queue.size();
136 }
137 }
138
139 private class FlusherCallback extends IteratingCallback
140 {
141 private final List<StandardSession.FrameBytes> active = new ArrayList<>(maxGather);
142 private final List<StandardSession.FrameBytes> succeeded = new ArrayList<>(maxGather);
143 private final Set<IStream> stalled = new HashSet<>();
144
145 @Override
146 protected Action process() throws Exception
147 {
148 synchronized (lock)
149 {
150
151 int index = 0;
152 int size = queue.size();
153 while (index < size)
154 {
155 FrameBytes frameBytes = queue.getUnsafe(index);
156 IStream stream = frameBytes.getStream();
157
158 if (stream != null)
159 {
160
161 if (stalled.size() > 0 && stalled.contains(stream))
162 {
163 ++index;
164 continue;
165 }
166
167
168 if (stream.getWindowSize() <= 0)
169 {
170 stalled.add(stream);
171 ++index;
172 continue;
173 }
174 }
175
176
177 queue.remove(index);
178 --size;
179
180
181 if (stream != null && stream.isReset() && frameBytes instanceof StandardSession.DataFrameBytes)
182 {
183
184 frameBytes.failed(new StreamException(frameBytes.getStream().getId(),
185 StreamStatus.INVALID_STREAM, "Stream: " + frameBytes.getStream() + " is reset!"));
186 continue;
187 }
188
189 active.add(frameBytes);
190 }
191 stalled.clear();
192
193 if (LOG.isDebugEnabled())
194 LOG.debug("Flushing {} of {} frame(s) in queue", active.size(), queue.size());
195 }
196
197 if (active.isEmpty())
198 return Action.IDLE;
199
200
201 ByteBuffer[] buffers = new ByteBuffer[active.size()];
202 for (int i = 0; i < buffers.length; i++)
203 buffers[i] = active.get(i).getByteBuffer();
204
205 if (controller != null)
206 controller.write(this, buffers);
207
208
209
210
211
212
213
214
215
216
217 return Action.SCHEDULED;
218 }
219
220 @Override
221 protected void onCompleteSuccess()
222 {
223
224 throw new IllegalStateException();
225 }
226
227 @Override
228 public void succeeded()
229 {
230 synchronized (lock)
231 {
232 if (LOG.isDebugEnabled())
233 LOG.debug("Succeeded write of {}, q={}", active, queue.size());
234 succeeded.addAll(active);
235 active.clear();
236 }
237
238 for (FrameBytes frame : succeeded)
239 frame.succeeded();
240 succeeded.clear();
241 super.succeeded();
242 }
243
244 @Override
245 public void onCompleteFailure(Throwable x)
246 {
247 List<StandardSession.FrameBytes> failed = new ArrayList<>();
248 synchronized (lock)
249 {
250 failure = x;
251 if (LOG.isDebugEnabled())
252 {
253 String logMessage = String.format("Failed write of %s, failing all %d frame(s) in queue", this, queue.size());
254 LOG.debug(logMessage, x);
255 }
256 failed.addAll(active);
257 active.clear();
258 failed.addAll(queue);
259 queue.clear();
260 }
261
262 for (StandardSession.FrameBytes frame : failed)
263 frame.failed(x);
264 }
265 }
266 }