1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.io;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.nio.ByteBuffer;
24 import java.util.ArrayList;
25 import java.util.LinkedList;
26 import java.util.List;
27 import java.util.Objects;
28 import java.util.concurrent.atomic.AtomicBoolean;
29
30 import org.eclipse.jetty.io.AbstractConnection;
31 import org.eclipse.jetty.io.EndPoint;
32 import org.eclipse.jetty.util.Callback;
33 import org.eclipse.jetty.util.log.Log;
34 import org.eclipse.jetty.util.log.Logger;
35 import org.eclipse.jetty.websocket.api.extensions.Frame;
36 import org.eclipse.jetty.websocket.common.Generator;
37 import org.eclipse.jetty.websocket.common.OpCode;
38 import org.eclipse.jetty.websocket.common.frames.DataFrame;
39
40
41
42
43 public class WriteBytesProvider implements Callback
44 {
45 private class FrameEntry
46 {
47 protected final AtomicBoolean failed = new AtomicBoolean(false);
48 protected final Frame frame;
49 protected final Callback callback;
50
51 private ByteBuffer headerBuffer;
52
53 public FrameEntry(Frame frame, Callback callback)
54 {
55 this.frame = frame;
56 this.callback = callback;
57 }
58
59 public ByteBuffer getHeaderBytes()
60 {
61 ByteBuffer buf = generator.generateHeaderBytes(frame);
62 headerBuffer = buf;
63 return buf;
64 }
65
66 public ByteBuffer getPayloadWindow()
67 {
68
69 return generator.getPayloadWindow(bufferSize,frame);
70 }
71
72 public void notifyFailure(Throwable t)
73 {
74 freeBuffers();
75 if (failed.getAndSet(true) == false)
76 {
77 notifySafeFailure(callback,t);
78 }
79 }
80
81 public void notifySucceeded()
82 {
83 freeBuffers();
84 if (callback == null)
85 {
86 return;
87 }
88 try
89 {
90 callback.succeeded();
91 }
92 catch (Throwable t)
93 {
94 LOG.debug(t);
95 }
96 }
97
98 public void freeBuffers()
99 {
100 if (headerBuffer != null)
101 {
102 generator.getBufferPool().release(headerBuffer);
103 headerBuffer = null;
104 }
105 releasePayloadBuffer(frame);
106 }
107
108
109
110
111 public boolean isDone()
112 {
113 return frame.remaining() <= 0;
114 }
115 }
116
117 private static final Logger LOG = Log.getLogger(WriteBytesProvider.class);
118
119
120 private final Generator generator;
121
122 private final Callback flushCallback;
123
124 private LinkedList<FrameEntry> queue;
125
126 private int bufferSize = 2048;
127
128 private int gatheredBufferLimit = 10;
129
130 private LinkedList<FrameEntry> past;
131
132 private FrameEntry active;
133
134 private Throwable failure;
135
136 private AtomicBoolean closed;
137
138
139
140
141
142
143
144
145
146
147
148 public WriteBytesProvider(Generator generator, Callback flushCallback)
149 {
150 this.generator = Objects.requireNonNull(generator);
151 this.flushCallback = Objects.requireNonNull(flushCallback);
152 this.queue = new LinkedList<>();
153 this.past = new LinkedList<>();
154 this.closed = new AtomicBoolean(false);
155 }
156
157
158
159
160 public void close()
161 {
162 LOG.debug(".close()");
163
164 this.closed.set(true);
165
166 failAll(new EOFException("Connection has been disconnected"));
167 }
168
169 public void enqueue(Frame frame, Callback callback)
170 {
171 Objects.requireNonNull(frame);
172 LOG.debug("enqueue({}, {})",frame,callback);
173 synchronized (this)
174 {
175 if (closed.get())
176 {
177
178 LOG.debug("Write is closed: {} {}",frame,callback);
179 if (callback != null)
180 {
181 callback.failed(new IOException("Write is closed"));
182 }
183 return;
184 }
185
186 if (failure != null)
187 {
188
189 LOG.debug("Write is in failure: {} {}",frame,callback);
190 notifySafeFailure(callback,failure);
191 return;
192 }
193
194 FrameEntry entry = new FrameEntry(frame,callback);
195
196 switch (frame.getOpCode())
197 {
198 case OpCode.PING:
199 queue.addFirst(entry);
200 break;
201 case OpCode.CLOSE:
202 closed.set(true);
203
204 queue.addLast(entry);
205 break;
206 default:
207 queue.addLast(entry);
208 }
209 }
210 }
211
212 public void failAll(Throwable t)
213 {
214
215 List<FrameEntry> callbacks = new ArrayList<>();
216
217 synchronized (this)
218 {
219
220 if (active != null)
221 {
222 FrameEntry entry = active;
223 active = null;
224 callbacks.add(entry);
225 }
226
227 callbacks.addAll(past);
228 callbacks.addAll(queue);
229
230 past.clear();
231 queue.clear();
232 }
233
234
235 if (!callbacks.isEmpty())
236 {
237
238 flushCallback.failed(t);
239
240
241 for (FrameEntry entry : callbacks)
242 {
243 entry.notifyFailure(t);
244 }
245 }
246 }
247
248
249
250
251
252
253
254
255
256 @Override
257 public void failed(Throwable cause)
258 {
259 failAll(cause);
260 }
261
262 public int getBufferSize()
263 {
264 return bufferSize;
265 }
266
267
268
269
270
271
272 public List<ByteBuffer> getByteBuffers()
273 {
274 List<ByteBuffer> bufs = null;
275 int count = 0;
276 synchronized (this)
277 {
278 for (; count < gatheredBufferLimit; count++)
279 {
280 if (active == null)
281 {
282 if (queue.isEmpty())
283 {
284
285 return bufs;
286 }
287
288
289 active = queue.pop();
290
291
292 if (bufs == null)
293 {
294 bufs = new ArrayList<>();
295 }
296 bufs.add(active.getHeaderBytes());
297 count++;
298 }
299
300
301 if (bufs == null)
302 {
303 bufs = new ArrayList<>();
304 }
305 bufs.add(active.getPayloadWindow());
306 if (active.isDone())
307 {
308 past.add(active);
309 active = null;
310 }
311 }
312 }
313
314 LOG.debug("Collected {} ByteBuffers",bufs.size());
315 return bufs;
316 }
317
318
319
320
321
322
323 public boolean isClosed()
324 {
325 synchronized (this)
326 {
327 return closed.get();
328 }
329 }
330
331 private void notifySafeFailure(Callback callback, Throwable t)
332 {
333 if (callback == null)
334 {
335 return;
336 }
337 try
338 {
339 callback.failed(t);
340 }
341 catch (Throwable e)
342 {
343 LOG.warn("Uncaught exception",e);
344 }
345 }
346
347 public void releasePayloadBuffer(Frame frame)
348 {
349 if (!frame.hasPayload())
350 {
351 return;
352 }
353
354 if (frame instanceof DataFrame)
355 {
356 DataFrame data = (DataFrame)frame;
357 if (data.isPooledBuffer())
358 {
359 ByteBuffer payload = frame.getPayload();
360 generator.getBufferPool().release(payload);
361 }
362 }
363 }
364
365
366
367
368
369
370
371
372
373 public void setBufferSize(int bufferSize)
374 {
375 this.bufferSize = bufferSize;
376 }
377
378
379
380
381 @Override
382 public void succeeded()
383 {
384
385 List<FrameEntry> callbacks = new ArrayList<>();
386
387 synchronized (this)
388 {
389 if ((active != null) && (active.frame.remaining() <= 0))
390 {
391
392 FrameEntry entry = active;
393 active = null;
394 callbacks.add(entry);
395 }
396
397 callbacks.addAll(past);
398 past.clear();
399 }
400
401
402 flushCallback.succeeded();
403
404
405 for (FrameEntry entry : callbacks)
406 {
407 entry.notifySucceeded();
408 }
409 }
410
411 @Override
412 public String toString()
413 {
414 StringBuilder b = new StringBuilder();
415 b.append("WriteBytesProvider[");
416 b.append("flushCallback=").append(flushCallback);
417 if (failure != null)
418 {
419 b.append(",failure=").append(failure.getClass().getName());
420 b.append(":").append(failure.getMessage());
421 }
422 else
423 {
424 b.append(",active=").append(active);
425 b.append(",queue.size=").append(queue.size());
426 b.append(",past.size=").append(past.size());
427 }
428 b.append(']');
429 return b.toString();
430 }
431 }