1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client.util;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.InterruptedIOException;
24 import java.nio.ByteBuffer;
25 import java.nio.channels.AsynchronousCloseException;
26 import java.util.ArrayDeque;
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.List;
30 import java.util.Queue;
31 import java.util.concurrent.CountDownLatch;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.TimeoutException;
35 import java.util.concurrent.atomic.AtomicReference;
36
37 import org.eclipse.jetty.client.HttpClient;
38 import org.eclipse.jetty.client.api.Response;
39 import org.eclipse.jetty.client.api.Response.Listener;
40 import org.eclipse.jetty.client.api.Result;
41 import org.eclipse.jetty.util.BufferUtil;
42 import org.eclipse.jetty.util.Callback;
43 import org.eclipse.jetty.util.IO;
44 import org.eclipse.jetty.util.log.Log;
45 import org.eclipse.jetty.util.log.Logger;
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76 public class InputStreamResponseListener extends Listener.Adapter
77 {
78 private static final Logger LOG = Log.getLogger(InputStreamResponseListener.class);
79 private static final DeferredContentProvider.Chunk EOF = new DeferredContentProvider.Chunk(BufferUtil.EMPTY_BUFFER, Callback.NOOP);
80 private final Object lock = this;
81 private final CountDownLatch responseLatch = new CountDownLatch(1);
82 private final CountDownLatch resultLatch = new CountDownLatch(1);
83 private final AtomicReference<InputStream> stream = new AtomicReference<>();
84 private final Queue<DeferredContentProvider.Chunk> chunks = new ArrayDeque<>();
85 private Response response;
86 private Result result;
87 private Throwable failure;
88 private boolean closed;
89
90 public InputStreamResponseListener()
91 {
92 }
93
94
95
96
97 @Deprecated
98 public InputStreamResponseListener(long maxBufferSize)
99 {
100 }
101
102 @Override
103 public void onHeaders(Response response)
104 {
105 synchronized (lock)
106 {
107 this.response = response;
108 responseLatch.countDown();
109 }
110 }
111
112 @Override
113 public void onContent(Response response, ByteBuffer content, Callback callback)
114 {
115 if (content.remaining() == 0)
116 {
117 if (LOG.isDebugEnabled())
118 LOG.debug("Skipped empty content {}", content);
119 callback.succeeded();
120 return;
121 }
122
123 boolean closed;
124 synchronized (lock)
125 {
126 closed = this.closed;
127 if (!closed)
128 {
129 if (LOG.isDebugEnabled())
130 LOG.debug("Queueing content {}", content);
131 chunks.add(new DeferredContentProvider.Chunk(content, callback));
132 lock.notifyAll();
133 }
134 }
135
136 if (closed)
137 {
138 if (LOG.isDebugEnabled())
139 LOG.debug("InputStream closed, ignored content {}", content);
140 callback.failed(new AsynchronousCloseException());
141 }
142 }
143
144 @Override
145 public void onSuccess(Response response)
146 {
147 synchronized (lock)
148 {
149 if (!closed)
150 chunks.add(EOF);
151 lock.notifyAll();
152 }
153
154 if (LOG.isDebugEnabled())
155 LOG.debug("End of content");
156 }
157
158 @Override
159 public void onFailure(Response response, Throwable failure)
160 {
161 List<Callback> callbacks;
162 synchronized (lock)
163 {
164 if (this.failure != null)
165 return;
166 this.failure = failure;
167 callbacks = drain();
168 lock.notifyAll();
169 }
170
171 if (LOG.isDebugEnabled())
172 LOG.debug("Content failure", failure);
173
174 callbacks.forEach(callback -> callback.failed(failure));
175 }
176
177 @Override
178 public void onComplete(Result result)
179 {
180 Throwable failure = result.getFailure();
181 List<Callback> callbacks = Collections.emptyList();
182 synchronized (lock)
183 {
184 this.result = result;
185 if (result.isFailed() && this.failure == null)
186 {
187 this.failure = failure;
188 callbacks = drain();
189 }
190
191 responseLatch.countDown();
192 resultLatch.countDown();
193 lock.notifyAll();
194 }
195
196 if (LOG.isDebugEnabled())
197 {
198 if (failure == null)
199 LOG.debug("Result success");
200 else
201 LOG.debug("Result failure", failure);
202 }
203
204 callbacks.forEach(callback -> callback.failed(failure));
205 }
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220 public Response get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException
221 {
222 boolean expired = !responseLatch.await(timeout, unit);
223 if (expired)
224 throw new TimeoutException();
225 synchronized (lock)
226 {
227
228 if (response == null)
229 throw new ExecutionException(failure);
230 return response;
231 }
232 }
233
234
235
236
237
238
239
240
241
242
243
244
245
246 public Result await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
247 {
248 boolean expired = !resultLatch.await(timeout, unit);
249 if (expired)
250 throw new TimeoutException();
251 synchronized (lock)
252 {
253 return result;
254 }
255 }
256
257
258
259
260
261
262
263
264 public InputStream getInputStream()
265 {
266 InputStream result = new Input();
267 if (stream.compareAndSet(null, result))
268 return result;
269 return IO.getClosedStream();
270 }
271
272 private List<Callback> drain()
273 {
274 List<Callback> callbacks = new ArrayList<>();
275 synchronized (lock)
276 {
277 while (true)
278 {
279 DeferredContentProvider.Chunk chunk = chunks.peek();
280 if (chunk == null || chunk == EOF)
281 break;
282 callbacks.add(chunk.callback);
283 chunks.poll();
284 }
285 }
286 return callbacks;
287 }
288
289 private class Input extends InputStream
290 {
291 @Override
292 public int read() throws IOException
293 {
294 byte[] tmp = new byte[1];
295 int read = read(tmp);
296 if (read < 0)
297 return read;
298 return tmp[0] & 0xFF;
299 }
300
301 @Override
302 public int read(byte[] b, int offset, int length) throws IOException
303 {
304 try
305 {
306 int result;
307 Callback callback = null;
308 synchronized (lock)
309 {
310 DeferredContentProvider.Chunk chunk;
311 while (true)
312 {
313 chunk = chunks.peek();
314 if (chunk == EOF)
315 return -1;
316
317 if (chunk != null)
318 break;
319
320 if (failure != null)
321 throw toIOException(failure);
322
323 if (closed)
324 throw new AsynchronousCloseException();
325
326 lock.wait();
327 }
328
329 ByteBuffer buffer = chunk.buffer;
330 result = Math.min(buffer.remaining(), length);
331 buffer.get(b, offset, result);
332 if (!buffer.hasRemaining())
333 {
334 callback = chunk.callback;
335 chunks.poll();
336 }
337 }
338 if (callback != null)
339 callback.succeeded();
340 return result;
341 }
342 catch (InterruptedException x)
343 {
344 throw new InterruptedIOException();
345 }
346 }
347
348 private IOException toIOException(Throwable failure)
349 {
350 if (failure instanceof IOException)
351 return (IOException)failure;
352 else
353 return new IOException(failure);
354 }
355
356 @Override
357 public void close() throws IOException
358 {
359 List<Callback> callbacks;
360 synchronized (lock)
361 {
362 if (closed)
363 return;
364 closed = true;
365 callbacks = drain();
366 lock.notifyAll();
367 }
368
369 if (LOG.isDebugEnabled())
370 LOG.debug("InputStream close");
371
372 Throwable failure = new AsynchronousCloseException();
373 callbacks.forEach(callback -> callback.failed(failure));
374
375 super.close();
376 }
377 }
378 }