1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.io;
20
21 import java.io.EOFException;
22 import java.nio.ByteBuffer;
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Deque;
26 import java.util.List;
27 import java.util.Objects;
28 import java.util.concurrent.atomic.AtomicBoolean;
29
30 import org.eclipse.jetty.io.ByteBufferPool;
31 import org.eclipse.jetty.io.EndPoint;
32 import org.eclipse.jetty.util.BufferUtil;
33 import org.eclipse.jetty.util.IteratingCallback;
34 import org.eclipse.jetty.util.log.Log;
35 import org.eclipse.jetty.util.log.Logger;
36 import org.eclipse.jetty.websocket.api.BatchMode;
37 import org.eclipse.jetty.websocket.api.WriteCallback;
38 import org.eclipse.jetty.websocket.api.extensions.Frame;
39 import org.eclipse.jetty.websocket.common.Generator;
40 import org.eclipse.jetty.websocket.common.OpCode;
41 import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
42
43
44
45
46 public class FrameFlusher
47 {
48 private class Flusher extends IteratingCallback
49 {
50 private final List<FrameEntry> entries;
51 private final List<ByteBuffer> buffers;
52 private ByteBuffer aggregate;
53 private BatchMode batchMode;
54
55 public Flusher(int maxGather)
56 {
57 entries = new ArrayList<>(maxGather);
58 buffers = new ArrayList<>((maxGather * 2) + 1);
59 }
60
61 private Action batch()
62 {
63 if (aggregate == null)
64 {
65 aggregate = bufferPool.acquire(bufferSize,true);
66 if (LOG.isDebugEnabled())
67 {
68 LOG.debug("{} acquired aggregate buffer {}",FrameFlusher.this,aggregate);
69 }
70 }
71
72
73 for (int i = 0; i < entries.size(); ++i)
74 {
75 FrameEntry entry = entries.get(i);
76
77 entry.generateHeaderBytes(aggregate);
78
79 ByteBuffer payload = entry.frame.getPayload();
80 if (BufferUtil.hasContent(payload))
81 {
82 BufferUtil.append(aggregate,payload);
83 }
84 }
85 if (LOG.isDebugEnabled())
86 {
87 LOG.debug("{} aggregated {} frames: {}",FrameFlusher.this,entries.size(),entries);
88 }
89 succeeded();
90 return Action.SCHEDULED;
91 }
92
93 @Override
94 protected void onCompleteSuccess()
95 {
96
97 }
98
99 @Override
100 public void onCompleteFailure(Throwable x)
101 {
102 for (FrameEntry entry : entries)
103 {
104 notifyCallbackFailure(entry.callback,x);
105 entry.release();
106 }
107 entries.clear();
108 failure = x;
109 onFailure(x);
110 }
111
112 private Action flush()
113 {
114 if (!BufferUtil.isEmpty(aggregate))
115 {
116 buffers.add(aggregate);
117 if (LOG.isDebugEnabled())
118 {
119 LOG.debug("{} flushing aggregate {}",FrameFlusher.this,aggregate);
120 }
121 }
122
123
124 for (int i = 0; i < entries.size(); ++i)
125 {
126 FrameEntry entry = entries.get(i);
127
128 if (entry.frame == FLUSH_FRAME)
129 {
130 continue;
131 }
132 buffers.add(entry.generateHeaderBytes());
133 ByteBuffer payload = entry.frame.getPayload();
134 if (BufferUtil.hasContent(payload))
135 {
136 buffers.add(payload);
137 }
138 }
139
140 if (LOG.isDebugEnabled())
141 {
142 LOG.debug("{} flushing {} frames: {}",FrameFlusher.this,entries.size(),entries);
143 }
144
145 if (buffers.isEmpty())
146 {
147 releaseAggregate();
148
149 succeedEntries();
150 return Action.IDLE;
151 }
152
153 endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()]));
154 buffers.clear();
155 return Action.SCHEDULED;
156 }
157
158 @Override
159 protected Action process() throws Exception
160 {
161 int space = aggregate == null?bufferSize:BufferUtil.space(aggregate);
162 BatchMode currentBatchMode = BatchMode.AUTO;
163 synchronized (lock)
164 {
165 while ((entries.size() <= maxGather) && !queue.isEmpty())
166 {
167 FrameEntry entry = queue.poll();
168 currentBatchMode = BatchMode.max(currentBatchMode,entry.batchMode);
169
170
171 if (entry.frame == FLUSH_FRAME)
172 {
173 currentBatchMode = BatchMode.OFF;
174 }
175
176 int payloadLength = BufferUtil.length(entry.frame.getPayload());
177 int approxFrameLength = Generator.MAX_HEADER_LENGTH + payloadLength;
178
179
180 if (approxFrameLength > (bufferSize >> 2))
181 {
182 currentBatchMode = BatchMode.OFF;
183 }
184
185
186 space -= approxFrameLength;
187 if (space <= 0)
188 {
189 currentBatchMode = BatchMode.OFF;
190 }
191
192 entries.add(entry);
193 }
194 }
195
196 if (LOG.isDebugEnabled())
197 {
198 LOG.debug("{} processing {} entries: {}",FrameFlusher.this,entries.size(),entries);
199 }
200
201 if (entries.isEmpty())
202 {
203 if (batchMode != BatchMode.AUTO)
204 {
205
206
207 releaseAggregate();
208 return Action.IDLE;
209 }
210
211 LOG.debug("{} auto flushing",FrameFlusher.this);
212 return flush();
213 }
214
215 batchMode = currentBatchMode;
216
217 return currentBatchMode == BatchMode.OFF?flush():batch();
218 }
219
220 private void releaseAggregate()
221 {
222 if ((aggregate != null) && BufferUtil.isEmpty(aggregate))
223 {
224 bufferPool.release(aggregate);
225 aggregate = null;
226 }
227 }
228
229 @Override
230 public void succeeded()
231 {
232 succeedEntries();
233 super.succeeded();
234 }
235
236 private void succeedEntries()
237 {
238
239 for (int i = 0; i < entries.size(); ++i)
240 {
241 FrameEntry entry = entries.get(i);
242 notifyCallbackSuccess(entry.callback);
243 entry.release();
244 }
245 entries.clear();
246 }
247 }
248
249 private class FrameEntry
250 {
251 private final Frame frame;
252 private final WriteCallback callback;
253 private final BatchMode batchMode;
254 private ByteBuffer headerBuffer;
255
256 private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode)
257 {
258 this.frame = Objects.requireNonNull(frame);
259 this.callback = callback;
260 this.batchMode = batchMode;
261 }
262
263 private ByteBuffer generateHeaderBytes()
264 {
265 return headerBuffer = generator.generateHeaderBytes(frame);
266 }
267
268 private void generateHeaderBytes(ByteBuffer buffer)
269 {
270 generator.generateHeaderBytes(frame,buffer);
271 }
272
273 private void release()
274 {
275 if (headerBuffer != null)
276 {
277 generator.getBufferPool().release(headerBuffer);
278 headerBuffer = null;
279 }
280 }
281
282 @Override
283 public String toString()
284 {
285 return String.format("%s[%s,%s,%s,%s]",getClass().getSimpleName(),frame,callback,batchMode,failure);
286 }
287 }
288
289 public static final BinaryFrame FLUSH_FRAME = new BinaryFrame();
290 private static final Logger LOG = Log.getLogger(FrameFlusher.class);
291 private final ByteBufferPool bufferPool;
292 private final EndPoint endpoint;
293 private final int bufferSize;
294 private final Generator generator;
295 private final int maxGather;
296 private final Object lock = new Object();
297 private final Deque<FrameEntry> queue = new ArrayDeque<>();
298 private final Flusher flusher;
299 private final AtomicBoolean closed = new AtomicBoolean();
300 private volatile Throwable failure;
301
302 public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather)
303 {
304 this.bufferPool = bufferPool;
305 this.endpoint = endpoint;
306 this.bufferSize = bufferSize;
307 this.generator = Objects.requireNonNull(generator);
308 this.maxGather = maxGather;
309 this.flusher = new Flusher(maxGather);
310 }
311
312 public void close()
313 {
314 if (closed.compareAndSet(false,true))
315 {
316 LOG.debug("{} closing {}",this);
317 EOFException eof = new EOFException("Connection has been closed locally");
318 flusher.failed(eof);
319
320
321 List<FrameEntry> entries = new ArrayList<>();
322 synchronized (lock)
323 {
324 entries.addAll(queue);
325 queue.clear();
326 }
327
328 for (FrameEntry entry : entries)
329 {
330 notifyCallbackFailure(entry.callback,eof);
331 }
332 }
333 }
334
335 public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode)
336 {
337 if (closed.get())
338 {
339 notifyCallbackFailure(callback,new EOFException("Connection has been closed locally"));
340 return;
341 }
342 if (flusher.isFailed())
343 {
344 notifyCallbackFailure(callback,failure);
345 return;
346 }
347
348 FrameEntry entry = new FrameEntry(frame,callback,batchMode);
349
350 synchronized (lock)
351 {
352 switch (frame.getOpCode())
353 {
354 case OpCode.PING:
355 {
356
357 queue.offerFirst(entry);
358 break;
359 }
360 case OpCode.CLOSE:
361 {
362
363
364
365 closed.set(true);
366 queue.offer(entry);
367 break;
368 }
369 default:
370 {
371 queue.offer(entry);
372 break;
373 }
374 }
375 }
376
377 if (LOG.isDebugEnabled())
378 {
379 LOG.debug("{} queued {}",this,entry);
380 }
381
382 flusher.iterate();
383 }
384
385 protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
386 {
387 try
388 {
389 if (callback != null)
390 {
391 callback.writeFailed(failure);
392 }
393 }
394 catch (Throwable x)
395 {
396 if (LOG.isDebugEnabled())
397 LOG.debug("Exception while notifying failure of callback " + callback,x);
398 }
399 }
400
401 protected void notifyCallbackSuccess(WriteCallback callback)
402 {
403 try
404 {
405 if (callback != null)
406 {
407 callback.writeSuccess();
408 }
409 }
410 catch (Throwable x)
411 {
412 if (LOG.isDebugEnabled())
413 LOG.debug("Exception while notifying success of callback " + callback,x);
414 }
415 }
416
417 protected void onFailure(Throwable x)
418 {
419 LOG.warn(x);
420 }
421
422 @Override
423 public String toString()
424 {
425 ByteBuffer aggregate = flusher.aggregate;
426 return String.format("%s[queueSize=%d,aggregateSize=%d,failure=%s]",getClass().getSimpleName(),queue.size(),aggregate == null?0:aggregate.position(),
427 failure);
428 }
429 }