1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.io;
20
21 import java.io.EOFException;
22 import java.nio.ByteBuffer;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Objects;
26 import java.util.concurrent.atomic.AtomicBoolean;
27
28 import org.eclipse.jetty.io.ByteBufferPool;
29 import org.eclipse.jetty.io.EndPoint;
30 import org.eclipse.jetty.util.ArrayQueue;
31 import org.eclipse.jetty.util.BufferUtil;
32 import org.eclipse.jetty.util.Callback;
33 import org.eclipse.jetty.util.IteratingCallback;
34 import org.eclipse.jetty.util.log.Log;
35 import org.eclipse.jetty.util.log.Logger;
36 import org.eclipse.jetty.websocket.api.BatchMode;
37 import org.eclipse.jetty.websocket.api.WriteCallback;
38 import org.eclipse.jetty.websocket.api.extensions.Frame;
39 import org.eclipse.jetty.websocket.common.Generator;
40 import org.eclipse.jetty.websocket.common.OpCode;
41 import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
42
43
44
45
46 public class FrameFlusher
47 {
48 public static final BinaryFrame FLUSH_FRAME = new BinaryFrame();
49 private static final Logger LOG = Log.getLogger(FrameFlusher.class);
50
51 private final ByteBufferPool bufferPool;
52 private final EndPoint endpoint;
53 private final int bufferSize;
54 private final Generator generator;
55 private final int maxGather;
56 private final Object lock = new Object();
57 private final ArrayQueue<FrameEntry> queue = new ArrayQueue<>(16, 16, lock);
58 private final Flusher flusher = new Flusher();
59 private final AtomicBoolean closed = new AtomicBoolean();
60 private volatile Throwable failure;
61
62 public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather)
63 {
64 this.bufferPool = bufferPool;
65 this.endpoint = endpoint;
66 this.bufferSize = bufferSize;
67 this.generator = Objects.requireNonNull(generator);
68 this.maxGather = maxGather;
69 }
70
71 public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode)
72 {
73 if (closed.get())
74 {
75 notifyCallbackFailure(callback, new EOFException("Connection has been closed locally"));
76 return;
77 }
78 if (flusher.isFailed())
79 {
80 notifyCallbackFailure(callback, failure);
81 return;
82 }
83
84 FrameEntry entry = new FrameEntry(frame, callback, batchMode);
85
86 synchronized (lock)
87 {
88 switch (frame.getOpCode())
89 {
90 case OpCode.PING:
91 {
92
93 queue.add(0, entry);
94 break;
95 }
96 case OpCode.CLOSE:
97 {
98
99
100
101 closed.set(true);
102 queue.add(entry);
103 break;
104 }
105 default:
106 {
107 queue.add(entry);
108 break;
109 }
110 }
111 }
112
113 if (LOG.isDebugEnabled())
114 LOG.debug("{} queued {}", this, entry);
115
116 flusher.iterate();
117 }
118
119 public void close()
120 {
121 if (closed.compareAndSet(false, true))
122 {
123 LOG.debug("{} closing {}", this);
124 EOFException eof = new EOFException("Connection has been closed locally");
125 flusher.failed(eof);
126
127
128 List<FrameEntry> entries = new ArrayList<>();
129 synchronized (lock)
130 {
131 entries.addAll(queue);
132 queue.clear();
133 }
134
135 for (FrameEntry entry : entries)
136 notifyCallbackFailure(entry.callback, eof);
137 }
138 }
139
140 protected void onFailure(Throwable x)
141 {
142 LOG.warn(x);
143 }
144
145 protected void notifyCallbackSuccess(WriteCallback callback)
146 {
147 try
148 {
149 if (callback != null)
150 callback.writeSuccess();
151 }
152 catch (Throwable x)
153 {
154 LOG.debug("Exception while notifying success of callback " + callback, x);
155 }
156 }
157
158 protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
159 {
160 try
161 {
162 if (callback != null)
163 callback.writeFailed(failure);
164 }
165 catch (Throwable x)
166 {
167 LOG.debug("Exception while notifying failure of callback " + callback, x);
168 }
169 }
170
171 @Override
172 public String toString()
173 {
174 ByteBuffer aggregate = flusher.aggregate;
175 return String.format("%s[queueSize=%d,aggregateSize=%d,failure=%s]",
176 getClass().getSimpleName(),
177 queue.size(),
178 aggregate == null ? 0 : aggregate.position(),
179 failure);
180 }
181
182 private class Flusher extends IteratingCallback
183 {
184 private final List<FrameEntry> entries = new ArrayList<>(maxGather);
185 private final List<ByteBuffer> buffers = new ArrayList<>(maxGather * 2 + 1);
186 private ByteBuffer aggregate;
187 private BatchMode batchMode;
188
189 @Override
190 protected Action process() throws Exception
191 {
192 int space = aggregate == null ? bufferSize : BufferUtil.space(aggregate);
193 BatchMode currentBatchMode = BatchMode.AUTO;
194 synchronized (lock)
195 {
196 while (entries.size() <= maxGather && !queue.isEmpty())
197 {
198 FrameEntry entry = queue.remove(0);
199 currentBatchMode = BatchMode.max(currentBatchMode, entry.batchMode);
200
201
202 if (entry.frame == FLUSH_FRAME)
203 currentBatchMode = BatchMode.OFF;
204
205 int payloadLength = BufferUtil.length(entry.frame.getPayload());
206 int approxFrameLength = Generator.MAX_HEADER_LENGTH + payloadLength;
207
208
209 if (approxFrameLength > (bufferSize >> 2))
210 currentBatchMode = BatchMode.OFF;
211
212
213 space -= approxFrameLength;
214 if (space <= 0)
215 currentBatchMode = BatchMode.OFF;
216
217 entries.add(entry);
218 }
219 }
220
221 if (LOG.isDebugEnabled())
222 LOG.debug("{} processing {} entries: {}", FrameFlusher.this, entries.size(), entries);
223
224 if (entries.isEmpty())
225 {
226 if (batchMode != BatchMode.AUTO)
227 {
228
229
230 releaseAggregate();
231 return Action.IDLE;
232 }
233
234 LOG.debug("{} auto flushing", FrameFlusher.this);
235 return flush();
236 }
237
238 batchMode = currentBatchMode;
239
240 return currentBatchMode == BatchMode.OFF ? flush() : batch();
241 }
242
243 private Action flush()
244 {
245 if (!BufferUtil.isEmpty(aggregate))
246 {
247 buffers.add(aggregate);
248 if (LOG.isDebugEnabled())
249 LOG.debug("{} flushing aggregate {}", FrameFlusher.this, aggregate);
250 }
251
252
253 for (int i = 0; i < entries.size(); ++i)
254 {
255 FrameEntry entry = entries.get(i);
256
257 if (entry.frame == FLUSH_FRAME)
258 continue;
259 buffers.add(entry.generateHeaderBytes());
260 ByteBuffer payload = entry.frame.getPayload();
261 if (BufferUtil.hasContent(payload))
262 buffers.add(payload);
263 }
264
265 if (LOG.isDebugEnabled())
266 LOG.debug("{} flushing {} frames: {}", FrameFlusher.this, entries.size(), entries);
267
268 if (buffers.isEmpty())
269 {
270 releaseAggregate();
271
272 succeedEntries();
273 return Action.IDLE;
274 }
275
276 endpoint.write(this, buffers.toArray(new ByteBuffer[buffers.size()]));
277 buffers.clear();
278 return Action.SCHEDULED;
279 }
280
281 private Action batch()
282 {
283 if (aggregate == null)
284 {
285 aggregate = bufferPool.acquire(bufferSize, true);
286 if (LOG.isDebugEnabled())
287 LOG.debug("{} acquired aggregate buffer {}", FrameFlusher.this, aggregate);
288 }
289
290
291 for (int i = 0; i < entries.size(); ++i)
292 {
293 FrameEntry entry = entries.get(i);
294
295 entry.generateHeaderBytes(aggregate);
296
297 ByteBuffer payload = entry.frame.getPayload();
298 if (BufferUtil.hasContent(payload))
299 BufferUtil.append(aggregate, payload);
300 }
301 if (LOG.isDebugEnabled())
302 LOG.debug("{} aggregated {} frames: {}", FrameFlusher.this, entries.size(), entries);
303 succeeded();
304 return Action.SCHEDULED;
305 }
306
307 private void releaseAggregate()
308 {
309 if (aggregate != null && BufferUtil.isEmpty(aggregate))
310 {
311 bufferPool.release(aggregate);
312 aggregate = null;
313 }
314 }
315
316 @Override
317 public void succeeded()
318 {
319 succeedEntries();
320 super.succeeded();
321 }
322
323 private void succeedEntries()
324 {
325
326 for (int i = 0; i < entries.size(); ++i)
327 {
328 FrameEntry entry = entries.get(i);
329 notifyCallbackSuccess(entry.callback);
330 entry.release();
331 }
332 entries.clear();
333 }
334
335 @Override
336 protected void completed()
337 {
338
339 }
340
341 @Override
342 public void failed(Throwable x)
343 {
344 for (FrameEntry entry : entries)
345 {
346 notifyCallbackFailure(entry.callback, x);
347 entry.release();
348 }
349 entries.clear();
350 super.failed(x);
351 failure = x;
352 onFailure(x);
353 }
354 }
355
356 private class FrameEntry
357 {
358 private final Frame frame;
359 private final WriteCallback callback;
360 private final BatchMode batchMode;
361 private ByteBuffer headerBuffer;
362
363 private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode)
364 {
365 this.frame = Objects.requireNonNull(frame);
366 this.callback = callback;
367 this.batchMode = batchMode;
368 }
369
370 private ByteBuffer generateHeaderBytes()
371 {
372 return headerBuffer = generator.generateHeaderBytes(frame);
373 }
374
375 private void generateHeaderBytes(ByteBuffer buffer)
376 {
377 generator.generateHeaderBytes(frame, buffer);
378 }
379
380 private void release()
381 {
382 if (headerBuffer != null)
383 {
384 generator.getBufferPool().release(headerBuffer);
385 headerBuffer = null;
386 }
387 }
388
389 @Override
390 public String toString()
391 {
392 return String.format("%s[%s,%s,%s,%s]", getClass().getSimpleName(), frame, callback, batchMode, failure);
393 }
394 }
395 }