1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client.util;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.nio.ByteBuffer;
25 import java.util.ArrayList;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.NoSuchElementException;
29 import java.util.Objects;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicReference;
32
33 import org.eclipse.jetty.client.AsyncContentProvider;
34 import org.eclipse.jetty.client.Synchronizable;
35 import org.eclipse.jetty.client.api.ContentProvider;
36 import org.eclipse.jetty.client.api.Request;
37 import org.eclipse.jetty.client.api.Response;
38 import org.eclipse.jetty.util.ArrayQueue;
39 import org.eclipse.jetty.util.BufferUtil;
40 import org.eclipse.jetty.util.Callback;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88 public class DeferredContentProvider implements AsyncContentProvider, Callback, Closeable
89 {
90 private static final Chunk CLOSE = new Chunk(BufferUtil.EMPTY_BUFFER, Callback.NOOP);
91
92 private final Object lock = this;
93 private final ArrayQueue<Chunk> chunks = new ArrayQueue<>(4, 64, lock);
94 private final AtomicReference<Listener> listener = new AtomicReference<>();
95 private final DeferredContentProviderIterator iterator = new DeferredContentProviderIterator();
96 private final AtomicBoolean closed = new AtomicBoolean();
97 private long length = -1;
98 private int size;
99 private Throwable failure;
100
101
102
103
104
105
106 public DeferredContentProvider(ByteBuffer... buffers)
107 {
108 for (ByteBuffer buffer : buffers)
109 offer(buffer);
110 }
111
112 @Override
113 public void setListener(Listener listener)
114 {
115 if (!this.listener.compareAndSet(null, listener))
116 throw new IllegalStateException(String.format("The same %s instance cannot be used in multiple requests",
117 AsyncContentProvider.class.getName()));
118
119 if (isClosed())
120 {
121 synchronized (lock)
122 {
123 long total = 0;
124 for (Chunk chunk : chunks)
125 total += chunk.buffer.remaining();
126 length = total;
127 }
128 }
129 }
130
131 @Override
132 public long getLength()
133 {
134 return length;
135 }
136
137
138
139
140
141
142
143
144 public boolean offer(ByteBuffer buffer)
145 {
146 return offer(buffer, Callback.NOOP);
147 }
148
149 public boolean offer(ByteBuffer buffer, Callback callback)
150 {
151 return offer(new Chunk(buffer, callback));
152 }
153
154 private boolean offer(Chunk chunk)
155 {
156 Throwable failure;
157 boolean result = false;
158 synchronized (lock)
159 {
160 failure = this.failure;
161 if (failure == null)
162 {
163 result = chunks.offer(chunk);
164 if (result && chunk != CLOSE)
165 ++size;
166 }
167 }
168 if (failure != null)
169 chunk.callback.failed(failure);
170 else if (result)
171 notifyListener();
172 return result;
173 }
174
175 private void clear()
176 {
177 synchronized (lock)
178 {
179 chunks.clear();
180 }
181 }
182
183 public void flush() throws IOException
184 {
185 synchronized (lock)
186 {
187 try
188 {
189 while (true)
190 {
191 if (failure != null)
192 throw new IOException(failure);
193 if (size == 0)
194 break;
195 lock.wait();
196 }
197 }
198 catch (InterruptedException x)
199 {
200 throw new InterruptedIOException();
201 }
202 }
203 }
204
205
206
207
208
209 public void close()
210 {
211 if (closed.compareAndSet(false, true))
212 offer(CLOSE);
213 }
214
215 public boolean isClosed()
216 {
217 return closed.get();
218 }
219
220 @Override
221 public void succeeded()
222 {
223 }
224
225 @Override
226 public void failed(Throwable failure)
227 {
228 iterator.failed(failure);
229 }
230
231 private void notifyListener()
232 {
233 Listener listener = this.listener.get();
234 if (listener != null)
235 listener.onContent();
236 }
237
238 @Override
239 public Iterator<ByteBuffer> iterator()
240 {
241 return iterator;
242 }
243
244 private class DeferredContentProviderIterator implements Iterator<ByteBuffer>, Callback, Synchronizable
245 {
246 private Chunk current;
247
248 @Override
249 public boolean hasNext()
250 {
251 synchronized (lock)
252 {
253 return chunks.peek() != CLOSE;
254 }
255 }
256
257 @Override
258 public ByteBuffer next()
259 {
260 synchronized (lock)
261 {
262 Chunk chunk = current = chunks.poll();
263 if (chunk == CLOSE)
264 {
265
266
267 chunks.add(0, CLOSE);
268 throw new NoSuchElementException();
269 }
270 return chunk == null ? null : chunk.buffer;
271 }
272 }
273
274 @Override
275 public void remove()
276 {
277 throw new UnsupportedOperationException();
278 }
279
280 @Override
281 public void succeeded()
282 {
283 Chunk chunk;
284 synchronized (lock)
285 {
286 chunk = current;
287 if (chunk != null)
288 {
289 --size;
290 lock.notify();
291 }
292 }
293 if (chunk != null)
294 chunk.callback.succeeded();
295 }
296
297 @Override
298 public void failed(Throwable x)
299 {
300 List<Chunk> chunks = new ArrayList<>();
301 synchronized (lock)
302 {
303 failure = x;
304
305 Chunk chunk = current;
306 current = null;
307 if (chunk != null)
308 chunks.add(chunk);
309 chunks.addAll(DeferredContentProvider.this.chunks);
310 clear();
311 lock.notify();
312 }
313 for (Chunk chunk : chunks)
314 chunk.callback.failed(x);
315 }
316
317 @Override
318 public Object getLock()
319 {
320 return lock;
321 }
322 }
323
324 public static class Chunk
325 {
326 public final ByteBuffer buffer;
327 public final Callback callback;
328
329 public Chunk(ByteBuffer buffer, Callback callback)
330 {
331 this.buffer = Objects.requireNonNull(buffer);
332 this.callback = Objects.requireNonNull(callback);
333 }
334
335 @Override
336 public String toString()
337 {
338 return String.format("%s@%x", getClass().getSimpleName(), hashCode());
339 }
340 }
341 }