1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client.util;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.InterruptedIOException;
24 import java.nio.ByteBuffer;
25 import java.nio.channels.AsynchronousCloseException;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.LinkedBlockingQueue;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.atomic.AtomicLong;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import org.eclipse.jetty.client.HttpClient;
36 import org.eclipse.jetty.client.api.Response;
37 import org.eclipse.jetty.client.api.Response.Listener;
38 import org.eclipse.jetty.client.api.Result;
39 import org.eclipse.jetty.util.IO;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74 public class InputStreamResponseListener extends Listener.Adapter
75 {
76 private static final Logger LOG = Log.getLogger(InputStreamResponseListener.class);
77 private static final byte[] EOF = new byte[0];
78 private static final byte[] CLOSED = new byte[0];
79 private static final byte[] FAILURE = new byte[0];
80 private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
81 private final AtomicLong length = new AtomicLong();
82 private final CountDownLatch responseLatch = new CountDownLatch(1);
83 private final CountDownLatch resultLatch = new CountDownLatch(1);
84 private final AtomicReference<InputStream> stream = new AtomicReference<>();
85 private final long maxBufferSize;
86 private Response response;
87 private Result result;
88 private volatile Throwable failure;
89 private volatile boolean closed;
90
91 public InputStreamResponseListener()
92 {
93 this(16 * 1024L);
94 }
95
96 public InputStreamResponseListener(long maxBufferSize)
97 {
98 this.maxBufferSize = maxBufferSize;
99 }
100
101 @Override
102 public void onHeaders(Response response)
103 {
104 this.response = response;
105 responseLatch.countDown();
106 }
107
108 @Override
109 public void onContent(Response response, ByteBuffer content)
110 {
111 if (!closed)
112 {
113 int remaining = content.remaining();
114 if (remaining > 0)
115 {
116
117 byte[] bytes = new byte[remaining];
118 content.get(bytes);
119 if (LOG.isDebugEnabled())
120 LOG.debug("Queuing {}/{} bytes", bytes, remaining);
121 queue.offer(bytes);
122
123 long newLength = length.addAndGet(remaining);
124 while (newLength >= maxBufferSize)
125 {
126 if (LOG.isDebugEnabled())
127 LOG.debug("Queued bytes limit {}/{} exceeded, waiting", newLength, maxBufferSize);
128
129 if (!await())
130 break;
131 newLength = length.get();
132 if (LOG.isDebugEnabled())
133 LOG.debug("Queued bytes limit {}/{} exceeded, woken up", newLength, maxBufferSize);
134 }
135 }
136 else
137 {
138 if (LOG.isDebugEnabled())
139 LOG.debug("Queuing skipped, empty content {}", content);
140 }
141 }
142 else
143 {
144 LOG.debug("Queuing skipped, stream already closed");
145 }
146 }
147
148 @Override
149 public void onSuccess(Response response)
150 {
151 if (LOG.isDebugEnabled())
152 LOG.debug("Queuing end of content {}{}", EOF, "");
153 queue.offer(EOF);
154 signal();
155 }
156
157 @Override
158 public void onFailure(Response response, Throwable failure)
159 {
160 fail(failure);
161 signal();
162 }
163
164 @Override
165 public void onComplete(Result result)
166 {
167 if (result.isFailed() && failure == null)
168 fail(result.getFailure());
169 this.result = result;
170 resultLatch.countDown();
171 signal();
172 }
173
174 private void fail(Throwable failure)
175 {
176 if (LOG.isDebugEnabled())
177 LOG.debug("Queuing failure {} {}", FAILURE, failure);
178 queue.offer(FAILURE);
179 this.failure = failure;
180 responseLatch.countDown();
181 }
182
183 protected boolean await()
184 {
185 try
186 {
187 synchronized (this)
188 {
189 while (length.get() >= maxBufferSize && failure == null && !closed)
190 wait();
191
192 return failure == null && !closed;
193 }
194 }
195 catch (InterruptedException x)
196 {
197 Thread.currentThread().interrupt();
198 return false;
199 }
200 }
201
202 protected void signal()
203 {
204 synchronized (this)
205 {
206 notifyAll();
207 }
208 }
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223 public Response get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException
224 {
225 boolean expired = !responseLatch.await(timeout, unit);
226 if (expired)
227 throw new TimeoutException();
228 if (failure != null)
229 throw new ExecutionException(failure);
230 return response;
231 }
232
233
234
235
236
237
238
239
240
241
242
243
244
245 public Result await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
246 {
247 boolean expired = !resultLatch.await(timeout, unit);
248 if (expired)
249 throw new TimeoutException();
250 return result;
251 }
252
253
254
255
256
257
258
259
260 public InputStream getInputStream()
261 {
262 InputStream result = new Input();
263 if (stream.compareAndSet(null, result))
264 return result;
265 return IO.getClosedStream();
266 }
267
268 private class Input extends InputStream
269 {
270 private byte[] bytes;
271 private int index;
272
273 @Override
274 public int read() throws IOException
275 {
276 while (true)
277 {
278 if (bytes == EOF)
279 {
280
281
282 index = -1;
283 return -1;
284 }
285 else if (bytes == FAILURE)
286 {
287 throw failure();
288 }
289 else if (bytes == CLOSED)
290 {
291 if (index < 0)
292 return -1;
293 throw new AsynchronousCloseException();
294 }
295 else if (bytes != null)
296 {
297 int result = bytes[index] & 0xFF;
298 if (++index == bytes.length)
299 {
300 length.addAndGet(-index);
301 bytes = null;
302 index = 0;
303 signal();
304 }
305 return result;
306 }
307 else
308 {
309 bytes = take();
310 if (LOG.isDebugEnabled())
311 LOG.debug("Dequeued {}/{} bytes", bytes, bytes.length);
312 }
313 }
314 }
315
316 private IOException failure()
317 {
318 if (failure instanceof IOException)
319 return (IOException)failure;
320 else
321 return new IOException(failure);
322 }
323
324 private byte[] take() throws IOException
325 {
326 try
327 {
328 return queue.take();
329 }
330 catch (InterruptedException x)
331 {
332 throw new InterruptedIOException();
333 }
334 }
335
336 @Override
337 public void close() throws IOException
338 {
339 if (!closed)
340 {
341 super.close();
342 if (LOG.isDebugEnabled())
343 LOG.debug("Queuing close {}{}", CLOSED, "");
344 queue.offer(CLOSED);
345 closed = true;
346 signal();
347 }
348 }
349 }
350 }