1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.io;
20
21 import java.io.EOFException;
22 import java.nio.ByteBuffer;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Objects;
26 import java.util.concurrent.atomic.AtomicBoolean;
27
28 import org.eclipse.jetty.io.ByteBufferPool;
29 import org.eclipse.jetty.io.EndPoint;
30 import org.eclipse.jetty.util.ArrayQueue;
31 import org.eclipse.jetty.util.BufferUtil;
32 import org.eclipse.jetty.util.IteratingCallback;
33 import org.eclipse.jetty.util.log.Log;
34 import org.eclipse.jetty.util.log.Logger;
35 import org.eclipse.jetty.websocket.api.BatchMode;
36 import org.eclipse.jetty.websocket.api.WriteCallback;
37 import org.eclipse.jetty.websocket.api.extensions.Frame;
38 import org.eclipse.jetty.websocket.common.Generator;
39 import org.eclipse.jetty.websocket.common.OpCode;
40 import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
41
42
43
44
45 public class FrameFlusher
46 {
47 private class Flusher extends IteratingCallback
48 {
49 private final List<FrameEntry> entries;
50 private final List<ByteBuffer> buffers;
51 private ByteBuffer aggregate;
52 private BatchMode batchMode;
53
54 public Flusher(int maxGather)
55 {
56 entries = new ArrayList<>(maxGather);
57 buffers = new ArrayList<>((maxGather * 2) + 1);
58 }
59
60 private Action batch()
61 {
62 if (aggregate == null)
63 {
64 aggregate = bufferPool.acquire(bufferSize,true);
65 if (LOG.isDebugEnabled())
66 {
67 LOG.debug("{} acquired aggregate buffer {}",FrameFlusher.this,aggregate);
68 }
69 }
70
71
72 for (int i = 0; i < entries.size(); ++i)
73 {
74 FrameEntry entry = entries.get(i);
75
76 entry.generateHeaderBytes(aggregate);
77
78 ByteBuffer payload = entry.frame.getPayload();
79 if (BufferUtil.hasContent(payload))
80 {
81 BufferUtil.append(aggregate,payload);
82 }
83 }
84 if (LOG.isDebugEnabled())
85 {
86 LOG.debug("{} aggregated {} frames: {}",FrameFlusher.this,entries.size(),entries);
87 }
88 succeeded();
89 return Action.SCHEDULED;
90 }
91
92 @Override
93 protected void onCompleteSuccess()
94 {
95
96 }
97
98 @Override
99 public void onCompleteFailure(Throwable x)
100 {
101 for (FrameEntry entry : entries)
102 {
103 notifyCallbackFailure(entry.callback,x);
104 entry.release();
105 }
106 entries.clear();
107 failure = x;
108 onFailure(x);
109 }
110
111 private Action flush()
112 {
113 if (!BufferUtil.isEmpty(aggregate))
114 {
115 buffers.add(aggregate);
116 if (LOG.isDebugEnabled())
117 {
118 LOG.debug("{} flushing aggregate {}",FrameFlusher.this,aggregate);
119 }
120 }
121
122
123 for (int i = 0; i < entries.size(); ++i)
124 {
125 FrameEntry entry = entries.get(i);
126
127 if (entry.frame == FLUSH_FRAME)
128 {
129 continue;
130 }
131 buffers.add(entry.generateHeaderBytes());
132 ByteBuffer payload = entry.frame.getPayload();
133 if (BufferUtil.hasContent(payload))
134 {
135 buffers.add(payload);
136 }
137 }
138
139 if (LOG.isDebugEnabled())
140 {
141 LOG.debug("{} flushing {} frames: {}",FrameFlusher.this,entries.size(),entries);
142 }
143
144 if (buffers.isEmpty())
145 {
146 releaseAggregate();
147
148 succeedEntries();
149 return Action.IDLE;
150 }
151
152 endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()]));
153 buffers.clear();
154 return Action.SCHEDULED;
155 }
156
157 @Override
158 protected Action process() throws Exception
159 {
160 int space = aggregate == null?bufferSize:BufferUtil.space(aggregate);
161 BatchMode currentBatchMode = BatchMode.AUTO;
162 synchronized (lock)
163 {
164 while ((entries.size() <= maxGather) && !queue.isEmpty())
165 {
166 FrameEntry entry = queue.remove(0);
167 currentBatchMode = BatchMode.max(currentBatchMode,entry.batchMode);
168
169
170 if (entry.frame == FLUSH_FRAME)
171 {
172 currentBatchMode = BatchMode.OFF;
173 }
174
175 int payloadLength = BufferUtil.length(entry.frame.getPayload());
176 int approxFrameLength = Generator.MAX_HEADER_LENGTH + payloadLength;
177
178
179 if (approxFrameLength > (bufferSize >> 2))
180 {
181 currentBatchMode = BatchMode.OFF;
182 }
183
184
185 space -= approxFrameLength;
186 if (space <= 0)
187 {
188 currentBatchMode = BatchMode.OFF;
189 }
190
191 entries.add(entry);
192 }
193 }
194
195 if (LOG.isDebugEnabled())
196 {
197 LOG.debug("{} processing {} entries: {}",FrameFlusher.this,entries.size(),entries);
198 }
199
200 if (entries.isEmpty())
201 {
202 if (batchMode != BatchMode.AUTO)
203 {
204
205
206 releaseAggregate();
207 return Action.IDLE;
208 }
209
210 LOG.debug("{} auto flushing",FrameFlusher.this);
211 return flush();
212 }
213
214 batchMode = currentBatchMode;
215
216 return currentBatchMode == BatchMode.OFF?flush():batch();
217 }
218
219 private void releaseAggregate()
220 {
221 if ((aggregate != null) && BufferUtil.isEmpty(aggregate))
222 {
223 bufferPool.release(aggregate);
224 aggregate = null;
225 }
226 }
227
228 @Override
229 public void succeeded()
230 {
231 succeedEntries();
232 super.succeeded();
233 }
234
235 private void succeedEntries()
236 {
237
238 for (int i = 0; i < entries.size(); ++i)
239 {
240 FrameEntry entry = entries.get(i);
241 notifyCallbackSuccess(entry.callback);
242 entry.release();
243 }
244 entries.clear();
245 }
246 }
247
248 private class FrameEntry
249 {
250 private final Frame frame;
251 private final WriteCallback callback;
252 private final BatchMode batchMode;
253 private ByteBuffer headerBuffer;
254
255 private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode)
256 {
257 this.frame = Objects.requireNonNull(frame);
258 this.callback = callback;
259 this.batchMode = batchMode;
260 }
261
262 private ByteBuffer generateHeaderBytes()
263 {
264 return headerBuffer = generator.generateHeaderBytes(frame);
265 }
266
267 private void generateHeaderBytes(ByteBuffer buffer)
268 {
269 generator.generateHeaderBytes(frame,buffer);
270 }
271
272 private void release()
273 {
274 if (headerBuffer != null)
275 {
276 generator.getBufferPool().release(headerBuffer);
277 headerBuffer = null;
278 }
279 }
280
281 @Override
282 public String toString()
283 {
284 return String.format("%s[%s,%s,%s,%s]",getClass().getSimpleName(),frame,callback,batchMode,failure);
285 }
286 }
287
288 public static final BinaryFrame FLUSH_FRAME = new BinaryFrame();
289 private static final Logger LOG = Log.getLogger(FrameFlusher.class);
290 private final ByteBufferPool bufferPool;
291 private final EndPoint endpoint;
292 private final int bufferSize;
293 private final Generator generator;
294 private final int maxGather;
295 private final Object lock = new Object();
296 private final ArrayQueue<FrameEntry> queue = new ArrayQueue<>(16,16,lock);
297 private final Flusher flusher;
298 private final AtomicBoolean closed = new AtomicBoolean();
299 private volatile Throwable failure;
300
301 public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather)
302 {
303 this.bufferPool = bufferPool;
304 this.endpoint = endpoint;
305 this.bufferSize = bufferSize;
306 this.generator = Objects.requireNonNull(generator);
307 this.maxGather = maxGather;
308 this.flusher = new Flusher(maxGather);
309 }
310
311 public void close()
312 {
313 if (closed.compareAndSet(false,true))
314 {
315 LOG.debug("{} closing {}",this);
316 EOFException eof = new EOFException("Connection has been closed locally");
317 flusher.failed(eof);
318
319
320 List<FrameEntry> entries = new ArrayList<>();
321 synchronized (lock)
322 {
323 entries.addAll(queue);
324 queue.clear();
325 }
326
327 for (FrameEntry entry : entries)
328 {
329 notifyCallbackFailure(entry.callback,eof);
330 }
331 }
332 }
333
334 public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode)
335 {
336 if (closed.get())
337 {
338 notifyCallbackFailure(callback,new EOFException("Connection has been closed locally"));
339 return;
340 }
341 if (flusher.isFailed())
342 {
343 notifyCallbackFailure(callback,failure);
344 return;
345 }
346
347 FrameEntry entry = new FrameEntry(frame,callback,batchMode);
348
349 synchronized (lock)
350 {
351 switch (frame.getOpCode())
352 {
353 case OpCode.PING:
354 {
355
356 queue.add(0,entry);
357 break;
358 }
359 case OpCode.CLOSE:
360 {
361
362
363
364 closed.set(true);
365 queue.add(entry);
366 break;
367 }
368 default:
369 {
370 queue.add(entry);
371 break;
372 }
373 }
374 }
375
376 if (LOG.isDebugEnabled())
377 {
378 LOG.debug("{} queued {}",this,entry);
379 }
380
381 flusher.iterate();
382 }
383
384 protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
385 {
386 try
387 {
388 if (callback != null)
389 {
390 callback.writeFailed(failure);
391 }
392 }
393 catch (Throwable x)
394 {
395 if (LOG.isDebugEnabled())
396 LOG.debug("Exception while notifying failure of callback " + callback,x);
397 }
398 }
399
400 protected void notifyCallbackSuccess(WriteCallback callback)
401 {
402 try
403 {
404 if (callback != null)
405 {
406 callback.writeSuccess();
407 }
408 }
409 catch (Throwable x)
410 {
411 if (LOG.isDebugEnabled())
412 LOG.debug("Exception while notifying success of callback " + callback,x);
413 }
414 }
415
416 protected void onFailure(Throwable x)
417 {
418 LOG.warn(x);
419 }
420
421 @Override
422 public String toString()
423 {
424 ByteBuffer aggregate = flusher.aggregate;
425 return String.format("%s[queueSize=%d,aggregateSize=%d,failure=%s]",getClass().getSimpleName(),queue.size(),aggregate == null?0:aggregate.position(),
426 failure);
427 }
428 }