1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.io;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.LinkedList;
24 import java.util.Objects;
25 import java.util.concurrent.atomic.AtomicBoolean;
26
27 import org.eclipse.jetty.io.AbstractConnection;
28 import org.eclipse.jetty.io.EndPoint;
29 import org.eclipse.jetty.util.BufferUtil;
30 import org.eclipse.jetty.util.Callback;
31 import org.eclipse.jetty.util.log.Log;
32 import org.eclipse.jetty.util.log.Logger;
33 import org.eclipse.jetty.websocket.api.extensions.Frame;
34 import org.eclipse.jetty.websocket.common.Generator;
35
36
37
38
39 public class WriteBytesProvider implements Callback
40 {
41 private class FrameEntry
42 {
43 protected final Frame frame;
44 protected final Callback callback;
45
46 public FrameEntry(Frame frame, Callback callback)
47 {
48 this.frame = frame;
49 this.callback = callback;
50 }
51
52 public ByteBuffer getByteBuffer()
53 {
54 ByteBuffer buffer = generator.generate(bufferSize,frame);
55 if (LOG.isDebugEnabled())
56 {
57 LOG.debug("getByteBuffer() - {}",BufferUtil.toDetailString(buffer));
58 }
59 return buffer;
60 }
61 }
62
63 private static final Logger LOG = Log.getLogger(WriteBytesProvider.class);
64
65
66 private final Generator generator;
67
68 private final Callback flushCallback;
69
70 private LinkedList<FrameEntry> queue;
71
72 private int bufferSize = 2048;
73
74 private FrameEntry active;
75
76 private Throwable failure;
77
78 private ByteBuffer buffer;
79
80 private AtomicBoolean closed;
81
82
83
84
85
86
87
88
89
90
91
92 public WriteBytesProvider(Generator generator, Callback flushCallback)
93 {
94 this.generator = Objects.requireNonNull(generator);
95 this.flushCallback = Objects.requireNonNull(flushCallback);
96 this.queue = new LinkedList<>();
97 this.closed = new AtomicBoolean(false);
98 }
99
100 public void enqueue(Frame frame, Callback callback)
101 {
102 Objects.requireNonNull(frame);
103 LOG.debug("enqueue({}, {})",frame,callback);
104 synchronized (this)
105 {
106 if (closed.get())
107 {
108
109 LOG.debug("Write is closed: {}",frame,callback);
110 if (callback != null)
111 {
112 callback.failed(new IOException("Write is closed"));
113 }
114 return;
115 }
116
117 if (isFailed())
118 {
119
120 notifyFailure(callback);
121 return;
122 }
123
124 FrameEntry entry = new FrameEntry(frame,callback);
125
126 switch (frame.getType())
127 {
128 case PING:
129 queue.addFirst(entry);
130 break;
131 case CLOSE:
132 closed.set(true);
133
134 queue.addLast(entry);
135 break;
136 default:
137 queue.addLast(entry);
138 }
139 }
140 }
141
142 public void failAll(Throwable t)
143 {
144 synchronized (this)
145 {
146 if (isFailed())
147 {
148
149 return;
150 }
151
152 failure = t;
153
154 for (FrameEntry fe : queue)
155 {
156 notifyFailure(fe.callback);
157 }
158
159 queue.clear();
160
161
162 flushCallback.failed(failure);
163 }
164 }
165
166
167
168
169
170
171
172 @Override
173 public void failed(Throwable cause)
174 {
175 failAll(cause);
176 }
177
178 public int getBufferSize()
179 {
180 return bufferSize;
181 }
182
183
184
185
186
187
188 public ByteBuffer getByteBuffer()
189 {
190 synchronized (this)
191 {
192 if (active == null)
193 {
194 if (queue.isEmpty())
195 {
196
197 return null;
198 }
199
200 active = queue.pop();
201 }
202
203 if (active == null)
204 {
205
206 return null;
207 }
208
209 buffer = active.getByteBuffer();
210 }
211 return buffer;
212 }
213
214 public Throwable getFailure()
215 {
216 return failure;
217 }
218
219
220
221
222
223
224 public boolean isClosed()
225 {
226 synchronized (this)
227 {
228 return closed.get();
229 }
230 }
231
232 public boolean isFailed()
233 {
234 return (failure != null);
235 }
236
237
238
239
240
241
242
243 private void notifyFailure(Callback callback)
244 {
245 if (callback == null)
246 {
247 return;
248 }
249 callback.failed(failure);
250 }
251
252
253
254
255
256
257
258
259
260 public void setBufferSize(int bufferSize)
261 {
262 this.bufferSize = bufferSize;
263 }
264
265
266
267
268 @Override
269 public void succeeded()
270 {
271 synchronized (this)
272 {
273
274 generator.getBufferPool().release(buffer);
275
276 if (active == null)
277 {
278 return;
279 }
280
281 if (active.frame.remaining() <= 0)
282 {
283
284 if (active.callback != null)
285 {
286 try
287 {
288
289
290 active.callback.succeeded();
291 }
292 catch (Throwable t)
293 {
294 LOG.warn("Callback failure",t);
295 }
296 }
297
298
299 active = null;
300 }
301
302
303 flushCallback.succeeded();
304 }
305 }
306
307 @Override
308 public String toString()
309 {
310 StringBuilder b = new StringBuilder();
311 b.append("WriteBytesProvider[");
312 b.append("flushCallback=").append(flushCallback);
313 if (isFailed())
314 {
315 b.append(",FAILURE=").append(failure.getClass().getName());
316 b.append(",").append(failure.getMessage());
317 }
318 else
319 {
320 b.append(",active=").append(active);
321 b.append(",queue.size=").append(queue.size());
322 }
323 b.append(']');
324 return b.toString();
325 }
326 }