1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client.util;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.InterruptedIOException;
24 import java.nio.ByteBuffer;
25 import java.nio.channels.AsynchronousCloseException;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.LinkedBlockingQueue;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.atomic.AtomicLong;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import org.eclipse.jetty.client.api.Response;
36 import org.eclipse.jetty.client.api.Response.Listener;
37 import org.eclipse.jetty.client.api.Result;
38 import org.eclipse.jetty.util.IO;
39 import org.eclipse.jetty.util.log.Log;
40 import org.eclipse.jetty.util.log.Logger;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 public class InputStreamResponseListener extends Listener.Adapter
74 {
75 private static final Logger LOG = Log.getLogger(InputStreamResponseListener.class);
76 private static final byte[] EOF = new byte[0];
77 private static final byte[] CLOSED = new byte[0];
78 private static final byte[] FAILURE = new byte[0];
79 private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
80 private final AtomicLong length = new AtomicLong();
81 private final CountDownLatch responseLatch = new CountDownLatch(1);
82 private final CountDownLatch resultLatch = new CountDownLatch(1);
83 private final AtomicReference<InputStream> stream = new AtomicReference<>();
84 private final long maxBufferSize;
85 private Response response;
86 private Result result;
87 private volatile Throwable failure;
88 private volatile boolean closed;
89
90 public InputStreamResponseListener()
91 {
92 this(16 * 1024L);
93 }
94
95 public InputStreamResponseListener(long maxBufferSize)
96 {
97 this.maxBufferSize = maxBufferSize;
98 }
99
100 @Override
101 public void onHeaders(Response response)
102 {
103 this.response = response;
104 responseLatch.countDown();
105 }
106
107 @Override
108 public void onContent(Response response, ByteBuffer content)
109 {
110 if (!closed)
111 {
112 int remaining = content.remaining();
113 if (remaining > 0)
114 {
115
116 byte[] bytes = new byte[remaining];
117 content.get(bytes);
118 if (LOG.isDebugEnabled())
119 LOG.debug("Queuing {}/{} bytes", bytes, remaining);
120 queue.offer(bytes);
121
122 long newLength = length.addAndGet(remaining);
123 while (newLength >= maxBufferSize)
124 {
125 if (LOG.isDebugEnabled())
126 LOG.debug("Queued bytes limit {}/{} exceeded, waiting", newLength, maxBufferSize);
127
128 if (!await())
129 break;
130 newLength = length.get();
131 if (LOG.isDebugEnabled())
132 LOG.debug("Queued bytes limit {}/{} exceeded, woken up", newLength, maxBufferSize);
133 }
134 }
135 else
136 {
137 if (LOG.isDebugEnabled())
138 LOG.debug("Queuing skipped, empty content {}", content);
139 }
140 }
141 else
142 {
143 LOG.debug("Queuing skipped, stream already closed");
144 }
145 }
146
147 @Override
148 public void onSuccess(Response response)
149 {
150 if (LOG.isDebugEnabled())
151 LOG.debug("Queuing end of content {}{}", EOF, "");
152 queue.offer(EOF);
153 signal();
154 }
155
156 @Override
157 public void onFailure(Response response, Throwable failure)
158 {
159 fail(failure);
160 signal();
161 }
162
163 @Override
164 public void onComplete(Result result)
165 {
166 if (result.isFailed() && failure == null)
167 fail(result.getFailure());
168 this.result = result;
169 resultLatch.countDown();
170 signal();
171 }
172
173 private void fail(Throwable failure)
174 {
175 if (LOG.isDebugEnabled())
176 LOG.debug("Queuing failure {} {}", FAILURE, failure);
177 queue.offer(FAILURE);
178 this.failure = failure;
179 responseLatch.countDown();
180 }
181
182 protected boolean await()
183 {
184 try
185 {
186 synchronized (this)
187 {
188 while (length.get() >= maxBufferSize && failure == null && !closed)
189 wait();
190
191 return failure == null && !closed;
192 }
193 }
194 catch (InterruptedException x)
195 {
196 Thread.currentThread().interrupt();
197 return false;
198 }
199 }
200
201 protected void signal()
202 {
203 synchronized (this)
204 {
205 notifyAll();
206 }
207 }
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222 public Response get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException
223 {
224 boolean expired = !responseLatch.await(timeout, unit);
225 if (expired)
226 throw new TimeoutException();
227 if (failure != null)
228 throw new ExecutionException(failure);
229 return response;
230 }
231
232
233
234
235
236
237
238
239
240
241
242
243
244 public Result await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
245 {
246 boolean expired = !resultLatch.await(timeout, unit);
247 if (expired)
248 throw new TimeoutException();
249 return result;
250 }
251
252
253
254
255
256
257
258
259 public InputStream getInputStream()
260 {
261 InputStream result = new Input();
262 if (stream.compareAndSet(null, result))
263 return result;
264 return IO.getClosedStream();
265 }
266
267 private class Input extends InputStream
268 {
269 private byte[] bytes;
270 private int index;
271
272 @Override
273 public int read() throws IOException
274 {
275 while (true)
276 {
277 if (bytes == EOF)
278 {
279
280
281 index = -1;
282 return -1;
283 }
284 else if (bytes == FAILURE)
285 {
286 throw failure();
287 }
288 else if (bytes == CLOSED)
289 {
290 if (index < 0)
291 return -1;
292 throw new AsynchronousCloseException();
293 }
294 else if (bytes != null)
295 {
296 int result = bytes[index] & 0xFF;
297 if (++index == bytes.length)
298 {
299 length.addAndGet(-index);
300 bytes = null;
301 index = 0;
302 signal();
303 }
304 return result;
305 }
306 else
307 {
308 bytes = take();
309 if (LOG.isDebugEnabled())
310 LOG.debug("Dequeued {}/{} bytes", bytes, bytes.length);
311 }
312 }
313 }
314
315 private IOException failure()
316 {
317 if (failure instanceof IOException)
318 return (IOException)failure;
319 else
320 return new IOException(failure);
321 }
322
323 private byte[] take() throws IOException
324 {
325 try
326 {
327 return queue.take();
328 }
329 catch (InterruptedException x)
330 {
331 throw new InterruptedIOException();
332 }
333 }
334
335 @Override
336 public void close() throws IOException
337 {
338 if (!closed)
339 {
340 super.close();
341 if (LOG.isDebugEnabled())
342 LOG.debug("Queuing close {}{}", CLOSED, "");
343 queue.offer(CLOSED);
344 closed = true;
345 signal();
346 }
347 }
348 }
349 }