1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client.util;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.nio.ByteBuffer;
25 import java.util.ArrayList;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.NoSuchElementException;
29 import java.util.Objects;
30 import java.util.Queue;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import org.eclipse.jetty.client.AsyncContentProvider;
35 import org.eclipse.jetty.client.api.ContentProvider;
36 import org.eclipse.jetty.client.api.Request;
37 import org.eclipse.jetty.client.api.Response;
38 import org.eclipse.jetty.util.ArrayQueue;
39 import org.eclipse.jetty.util.BufferUtil;
40 import org.eclipse.jetty.util.Callback;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88 public class DeferredContentProvider implements AsyncContentProvider, Callback, Closeable
89 {
90 private static final AsyncChunk CLOSE = new AsyncChunk(BufferUtil.EMPTY_BUFFER, Callback.Adapter.INSTANCE);
91
92 private final Object lock = this;
93 private final Queue<AsyncChunk> chunks = new ArrayQueue<>(4, 64, lock);
94 private final AtomicReference<Listener> listener = new AtomicReference<>();
95 private final DeferredContentProviderIterator iterator = new DeferredContentProviderIterator();
96 private final AtomicBoolean closed = new AtomicBoolean();
97 private int size;
98 private Throwable failure;
99
100
101
102
103
104
105 public DeferredContentProvider(ByteBuffer... buffers)
106 {
107 for (ByteBuffer buffer : buffers)
108 offer(buffer);
109 }
110
111 @Override
112 public void setListener(Listener listener)
113 {
114 if (!this.listener.compareAndSet(null, listener))
115 throw new IllegalStateException(String.format("The same %s instance cannot be used in multiple requests",
116 AsyncContentProvider.class.getName()));
117 }
118
119 @Override
120 public long getLength()
121 {
122 return -1;
123 }
124
125
126
127
128
129
130
131
132 public boolean offer(ByteBuffer buffer)
133 {
134 return offer(buffer, Callback.Adapter.INSTANCE);
135 }
136
137 public boolean offer(ByteBuffer buffer, Callback callback)
138 {
139 return offer(new AsyncChunk(buffer, callback));
140 }
141
142 private boolean offer(AsyncChunk chunk)
143 {
144 Throwable failure;
145 boolean result = false;
146 synchronized (lock)
147 {
148 failure = this.failure;
149 if (failure == null)
150 {
151 result = chunks.offer(chunk);
152 if (result && chunk != CLOSE)
153 ++size;
154 }
155 }
156 if (failure != null)
157 chunk.callback.failed(failure);
158 else if (result)
159 notifyListener();
160 return result;
161 }
162
163 private void clear()
164 {
165 synchronized (lock)
166 {
167 chunks.clear();
168 }
169 }
170
171 public void flush() throws IOException
172 {
173 synchronized (lock)
174 {
175 try
176 {
177 while (true)
178 {
179 if (failure != null)
180 throw new IOException(failure);
181 if (size == 0)
182 break;
183 lock.wait();
184 }
185 }
186 catch (InterruptedException x)
187 {
188 throw new InterruptedIOException();
189 }
190 }
191 }
192
193
194
195
196
197 public void close()
198 {
199 if (closed.compareAndSet(false, true))
200 offer(CLOSE);
201 }
202
203 @Override
204 public void succeeded()
205 {
206 }
207
208 @Override
209 public void failed(Throwable failure)
210 {
211 iterator.failed(failure);
212 }
213
214 private void notifyListener()
215 {
216 Listener listener = this.listener.get();
217 if (listener != null)
218 listener.onContent();
219 }
220
221 @Override
222 public Iterator<ByteBuffer> iterator()
223 {
224 return iterator;
225 }
226
227 private class DeferredContentProviderIterator implements Iterator<ByteBuffer>, Callback
228 {
229 private AsyncChunk current;
230
231 @Override
232 public boolean hasNext()
233 {
234 synchronized (lock)
235 {
236 return chunks.peek() != CLOSE;
237 }
238 }
239
240 @Override
241 public ByteBuffer next()
242 {
243 synchronized (lock)
244 {
245 AsyncChunk chunk = current = chunks.poll();
246 if (chunk == CLOSE)
247 throw new NoSuchElementException();
248 return chunk == null ? null : chunk.buffer;
249 }
250 }
251
252 @Override
253 public void remove()
254 {
255 throw new UnsupportedOperationException();
256 }
257
258 @Override
259 public void succeeded()
260 {
261 AsyncChunk chunk;
262 synchronized (lock)
263 {
264 chunk = current;
265 if (chunk != null)
266 {
267 --size;
268 lock.notify();
269 }
270 }
271 if (chunk != null)
272 chunk.callback.succeeded();
273 }
274
275 @Override
276 public void failed(Throwable x)
277 {
278 List<AsyncChunk> chunks = new ArrayList<>();
279 synchronized (lock)
280 {
281 failure = x;
282
283 chunks.addAll(DeferredContentProvider.this.chunks);
284 clear();
285 current = null;
286 lock.notify();
287 }
288 for (AsyncChunk chunk : chunks)
289 chunk.callback.failed(x);
290 }
291 }
292
293 private static class AsyncChunk
294 {
295 private final ByteBuffer buffer;
296 private final Callback callback;
297
298 private AsyncChunk(ByteBuffer buffer, Callback callback)
299 {
300 this.buffer = Objects.requireNonNull(buffer);
301 this.callback = Objects.requireNonNull(callback);
302 }
303 }
304 }