1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client.util;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.nio.ByteBuffer;
25 import java.util.ArrayDeque;
26 import java.util.ArrayList;
27 import java.util.Deque;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.NoSuchElementException;
31 import java.util.Objects;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import org.eclipse.jetty.client.AsyncContentProvider;
36 import org.eclipse.jetty.client.Synchronizable;
37 import org.eclipse.jetty.client.api.ContentProvider;
38 import org.eclipse.jetty.client.api.Request;
39 import org.eclipse.jetty.client.api.Response;
40 import org.eclipse.jetty.util.BufferUtil;
41 import org.eclipse.jetty.util.Callback;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89 public class DeferredContentProvider implements AsyncContentProvider, Callback, Closeable
90 {
91 private static final Chunk CLOSE = new Chunk(BufferUtil.EMPTY_BUFFER, Callback.NOOP);
92
93 private final Object lock = this;
94 private final Deque<Chunk> chunks = new ArrayDeque<>();
95 private final AtomicReference<Listener> listener = new AtomicReference<>();
96 private final DeferredContentProviderIterator iterator = new DeferredContentProviderIterator();
97 private final AtomicBoolean closed = new AtomicBoolean();
98 private long length = -1;
99 private int size;
100 private Throwable failure;
101
102
103
104
105
106
107 public DeferredContentProvider(ByteBuffer... buffers)
108 {
109 for (ByteBuffer buffer : buffers)
110 offer(buffer);
111 }
112
113 @Override
114 public void setListener(Listener listener)
115 {
116 if (!this.listener.compareAndSet(null, listener))
117 throw new IllegalStateException(String.format("The same %s instance cannot be used in multiple requests",
118 AsyncContentProvider.class.getName()));
119
120 if (isClosed())
121 {
122 synchronized (lock)
123 {
124 long total = 0;
125 for (Chunk chunk : chunks)
126 total += chunk.buffer.remaining();
127 length = total;
128 }
129 }
130 }
131
132 @Override
133 public long getLength()
134 {
135 return length;
136 }
137
138
139
140
141
142
143
144
145 public boolean offer(ByteBuffer buffer)
146 {
147 return offer(buffer, Callback.NOOP);
148 }
149
150 public boolean offer(ByteBuffer buffer, Callback callback)
151 {
152 return offer(new Chunk(buffer, callback));
153 }
154
155 private boolean offer(Chunk chunk)
156 {
157 Throwable failure;
158 boolean result = false;
159 synchronized (lock)
160 {
161 failure = this.failure;
162 if (failure == null)
163 {
164 result = chunks.offer(chunk);
165 if (result && chunk != CLOSE)
166 ++size;
167 }
168 }
169 if (failure != null)
170 chunk.callback.failed(failure);
171 else if (result)
172 notifyListener();
173 return result;
174 }
175
176 private void clear()
177 {
178 synchronized (lock)
179 {
180 chunks.clear();
181 }
182 }
183
184 public void flush() throws IOException
185 {
186 synchronized (lock)
187 {
188 try
189 {
190 while (true)
191 {
192 if (failure != null)
193 throw new IOException(failure);
194 if (size == 0)
195 break;
196 lock.wait();
197 }
198 }
199 catch (InterruptedException x)
200 {
201 throw new InterruptedIOException();
202 }
203 }
204 }
205
206
207
208
209
210 public void close()
211 {
212 if (closed.compareAndSet(false, true))
213 offer(CLOSE);
214 }
215
216 public boolean isClosed()
217 {
218 return closed.get();
219 }
220
221 @Override
222 public void failed(Throwable failure)
223 {
224 iterator.failed(failure);
225 }
226
227 private void notifyListener()
228 {
229 Listener listener = this.listener.get();
230 if (listener != null)
231 listener.onContent();
232 }
233
234 @Override
235 public Iterator<ByteBuffer> iterator()
236 {
237 return iterator;
238 }
239
240 private class DeferredContentProviderIterator implements Iterator<ByteBuffer>, Callback, Synchronizable
241 {
242 private Chunk current;
243
244 @Override
245 public boolean hasNext()
246 {
247 synchronized (lock)
248 {
249 return chunks.peek() != CLOSE;
250 }
251 }
252
253 @Override
254 public ByteBuffer next()
255 {
256 synchronized (lock)
257 {
258 Chunk chunk = current = chunks.poll();
259 if (chunk == CLOSE)
260 {
261
262
263 chunks.offerFirst(CLOSE);
264 throw new NoSuchElementException();
265 }
266 return chunk == null ? null : chunk.buffer;
267 }
268 }
269
270 @Override
271 public void remove()
272 {
273 throw new UnsupportedOperationException();
274 }
275
276 @Override
277 public void succeeded()
278 {
279 Chunk chunk;
280 synchronized (lock)
281 {
282 chunk = current;
283 if (chunk != null)
284 {
285 --size;
286 lock.notify();
287 }
288 }
289 if (chunk != null)
290 chunk.callback.succeeded();
291 }
292
293 @Override
294 public void failed(Throwable x)
295 {
296 List<Chunk> chunks = new ArrayList<>();
297 synchronized (lock)
298 {
299 failure = x;
300
301 Chunk chunk = current;
302 current = null;
303 if (chunk != null)
304 chunks.add(chunk);
305 chunks.addAll(DeferredContentProvider.this.chunks);
306 clear();
307 lock.notify();
308 }
309 for (Chunk chunk : chunks)
310 chunk.callback.failed(x);
311 }
312
313 @Override
314 public Object getLock()
315 {
316 return lock;
317 }
318 }
319
320 public static class Chunk
321 {
322 public final ByteBuffer buffer;
323 public final Callback callback;
324
325 public Chunk(ByteBuffer buffer, Callback callback)
326 {
327 this.buffer = Objects.requireNonNull(buffer);
328 this.callback = Objects.requireNonNull(callback);
329 }
330
331 @Override
332 public String toString()
333 {
334 return String.format("%s@%x", getClass().getSimpleName(), hashCode());
335 }
336 }
337 }