1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.io;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.nio.ByteBuffer;
24 import java.util.LinkedList;
25 import java.util.Objects;
26 import java.util.concurrent.atomic.AtomicBoolean;
27
28 import org.eclipse.jetty.io.AbstractConnection;
29 import org.eclipse.jetty.io.EndPoint;
30 import org.eclipse.jetty.util.BufferUtil;
31 import org.eclipse.jetty.util.Callback;
32 import org.eclipse.jetty.util.log.Log;
33 import org.eclipse.jetty.util.log.Logger;
34 import org.eclipse.jetty.websocket.api.extensions.Frame;
35 import org.eclipse.jetty.websocket.common.Generator;
36
37
38
39
40 public class WriteBytesProvider implements Callback
41 {
42 private class FrameEntry
43 {
44 protected final AtomicBoolean failed = new AtomicBoolean(false);
45 protected final Frame frame;
46 protected final Callback callback;
47
48 public FrameEntry(Frame frame, Callback callback)
49 {
50 this.frame = frame;
51 this.callback = callback;
52 }
53
54 public ByteBuffer getByteBuffer()
55 {
56 ByteBuffer buffer = generator.generate(bufferSize,frame);
57 if (LOG.isDebugEnabled())
58 {
59 LOG.debug("getByteBuffer() - {}",BufferUtil.toDetailString(buffer));
60 }
61 return buffer;
62 }
63
64 public void notifyFailure(Throwable t)
65 {
66 if (failed.getAndSet(true) == false)
67 {
68 notifySafeFailure(callback,t);
69 }
70 }
71 }
72
73 private static final Logger LOG = Log.getLogger(WriteBytesProvider.class);
74
75
76 private final Generator generator;
77
78 private final Callback flushCallback;
79
80 private LinkedList<FrameEntry> queue;
81
82 private int bufferSize = 2048;
83
84 private FrameEntry active;
85
86 private Throwable failure;
87
88 private ByteBuffer buffer;
89
90 private AtomicBoolean closed;
91
92
93
94
95
96
97
98
99
100
101
102 public WriteBytesProvider(Generator generator, Callback flushCallback)
103 {
104 this.generator = Objects.requireNonNull(generator);
105 this.flushCallback = Objects.requireNonNull(flushCallback);
106 this.queue = new LinkedList<>();
107 this.closed = new AtomicBoolean(false);
108 }
109
110
111
112
113 public void close()
114 {
115
116 this.closed.set(true);
117
118 failAll(new EOFException("Connection has been disconnected"));
119 }
120
121 public void enqueue(Frame frame, Callback callback)
122 {
123 Objects.requireNonNull(frame);
124 LOG.debug("enqueue({}, {})",frame,callback);
125 synchronized (this)
126 {
127 if (closed.get())
128 {
129
130 LOG.debug("Write is closed: {} {}",frame,callback);
131 if (callback != null)
132 {
133 callback.failed(new IOException("Write is closed"));
134 }
135 return;
136 }
137
138 if (failure != null)
139 {
140
141 LOG.debug("Write is in failure: {} {}",frame,callback);
142 notifySafeFailure(callback,failure);
143 return;
144 }
145
146 FrameEntry entry = new FrameEntry(frame,callback);
147
148 switch (frame.getType())
149 {
150 case PING:
151 queue.addFirst(entry);
152 break;
153 case CLOSE:
154 closed.set(true);
155
156 queue.addLast(entry);
157 break;
158 default:
159 queue.addLast(entry);
160 }
161 }
162 }
163
164 public void failAll(Throwable t)
165 {
166 synchronized (this)
167 {
168 boolean notified = false;
169
170
171 if (active != null)
172 {
173 active.notifyFailure(t);
174 notified = true;
175 }
176
177 failure = t;
178
179
180 for (FrameEntry fe : queue)
181 {
182 fe.notifyFailure(t);
183 notified = true;
184 }
185
186 queue.clear();
187
188 if (notified)
189 {
190
191 flushCallback.failed(t);
192 }
193 }
194 }
195
196
197
198
199
200
201
202
203
204 @Override
205 public void failed(Throwable cause)
206 {
207 failAll(cause);
208 }
209
210 public int getBufferSize()
211 {
212 return bufferSize;
213 }
214
215
216
217
218
219
220 public ByteBuffer getByteBuffer()
221 {
222 synchronized (this)
223 {
224 if (active == null)
225 {
226 if (queue.isEmpty())
227 {
228
229 return null;
230 }
231
232 active = queue.pop();
233 }
234
235 if (active == null)
236 {
237
238 return null;
239 }
240
241 buffer = active.getByteBuffer();
242 }
243 return buffer;
244 }
245
246
247
248
249
250
251 public boolean isClosed()
252 {
253 synchronized (this)
254 {
255 return closed.get();
256 }
257 }
258
259 private void notifySafeFailure(Callback callback, Throwable t)
260 {
261 if (callback == null)
262 {
263 return;
264 }
265 try
266 {
267 callback.failed(t);
268 }
269 catch (Throwable e)
270 {
271 LOG.warn("Uncaught exception",e);
272 }
273 }
274
275
276
277
278
279
280
281
282
283 public void setBufferSize(int bufferSize)
284 {
285 this.bufferSize = bufferSize;
286 }
287
288
289
290
291 @Override
292 public void succeeded()
293 {
294 Callback successCallback = null;
295
296 synchronized (this)
297 {
298
299 generator.getBufferPool().release(buffer);
300
301 if (active == null)
302 {
303 return;
304 }
305
306 if (active.frame.remaining() <= 0)
307 {
308
309 successCallback = active.callback;
310
311 active = null;
312 }
313
314
315 flushCallback.succeeded();
316 }
317
318
319 if (successCallback != null)
320 {
321 try
322 {
323
324 successCallback.succeeded();
325 }
326 catch (Throwable t)
327 {
328 LOG.warn("Callback failure",t);
329 }
330 }
331 }
332
333 @Override
334 public String toString()
335 {
336 StringBuilder b = new StringBuilder();
337 b.append("WriteBytesProvider[");
338 b.append("flushCallback=").append(flushCallback);
339 if (failure != null)
340 {
341 b.append(",failure=").append(failure.getClass().getName());
342 b.append(":").append(failure.getMessage());
343 }
344 else
345 {
346 b.append(",active=").append(active);
347 b.append(",queue.size=").append(queue.size());
348 }
349 b.append(']');
350 return b.toString();
351 }
352 }