1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client.util;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.InterruptedIOException;
24 import java.nio.ByteBuffer;
25 import java.nio.channels.AsynchronousCloseException;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.LinkedBlockingQueue;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.atomic.AtomicLong;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import org.eclipse.jetty.client.HttpClient;
36 import org.eclipse.jetty.client.api.Response;
37 import org.eclipse.jetty.client.api.Result;
38 import org.eclipse.jetty.util.IO;
39 import org.eclipse.jetty.util.log.Log;
40 import org.eclipse.jetty.util.log.Logger;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 public class InputStreamResponseListener extends Response.Listener.Empty
74 {
75 private static final Logger LOG = Log.getLogger(InputStreamResponseListener.class);
76 private static final byte[] EOF = new byte[0];
77 private static final byte[] CLOSED = new byte[0];
78 private static final byte[] FAILURE = new byte[0];
79 private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
80 private final AtomicLong length = new AtomicLong();
81 private final CountDownLatch responseLatch = new CountDownLatch(1);
82 private final CountDownLatch resultLatch = new CountDownLatch(1);
83 private final AtomicReference<InputStream> stream = new AtomicReference<>();
84 private final long maxBufferSize;
85 private Response response;
86 private Result result;
87 private volatile Throwable failure;
88 private volatile boolean closed;
89
90 public InputStreamResponseListener()
91 {
92 this(16 * 1024L);
93 }
94
95 public InputStreamResponseListener(long maxBufferSize)
96 {
97 this.maxBufferSize = maxBufferSize;
98 }
99
100 @Override
101 public void onHeaders(Response response)
102 {
103 this.response = response;
104 responseLatch.countDown();
105 }
106
107 @Override
108 public void onContent(Response response, ByteBuffer content)
109 {
110
111 if (closed)
112 return;
113
114 int remaining = content.remaining();
115 byte[] bytes = new byte[remaining];
116 content.get(bytes);
117 LOG.debug("Queuing {}/{} bytes", bytes, bytes.length);
118 queue.offer(bytes);
119
120 long newLength = length.addAndGet(remaining);
121 while (newLength >= maxBufferSize)
122 {
123 LOG.debug("Queued bytes limit {}/{} exceeded, waiting", newLength, maxBufferSize);
124
125 if (!await())
126 break;
127 newLength = length.get();
128 LOG.debug("Queued bytes limit {}/{} exceeded, woken up", newLength, maxBufferSize);
129 }
130 }
131
132 @Override
133 public void onComplete(Result result)
134 {
135 this.result = result;
136 if (result.isSucceeded())
137 {
138 LOG.debug("Queuing end of content {}{}", EOF, "");
139 queue.offer(EOF);
140 }
141 else
142 {
143 LOG.debug("Queuing failure {} {}", FAILURE, failure);
144 queue.offer(FAILURE);
145 this.failure = result.getFailure();
146 responseLatch.countDown();
147 }
148 resultLatch.countDown();
149 signal();
150 }
151
152 protected boolean await()
153 {
154 try
155 {
156 synchronized (this)
157 {
158 while (length.get() >= maxBufferSize && failure == null && !closed)
159 wait();
160
161 return failure == null && !closed;
162 }
163 }
164 catch (InterruptedException x)
165 {
166 Thread.currentThread().interrupt();
167 return false;
168 }
169 }
170
171 protected void signal()
172 {
173 synchronized (this)
174 {
175 notifyAll();
176 }
177 }
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192 public Response get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException
193 {
194 boolean expired = !responseLatch.await(timeout, unit);
195 if (expired)
196 throw new TimeoutException();
197 if (failure != null)
198 throw new ExecutionException(failure);
199 return response;
200 }
201
202
203
204
205
206
207
208
209
210
211
212
213
214 public Result await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
215 {
216 boolean expired = !resultLatch.await(timeout, unit);
217 if (expired)
218 throw new TimeoutException();
219 return result;
220 }
221
222
223
224
225
226
227
228
229 public InputStream getInputStream()
230 {
231 InputStream result = new Input();
232 if (stream.compareAndSet(null, result))
233 return result;
234 return IO.getClosedStream();
235 }
236
237 private class Input extends InputStream
238 {
239 private byte[] bytes;
240 private int index;
241
242 @Override
243 public int read() throws IOException
244 {
245 while (true)
246 {
247 if (bytes == EOF)
248 {
249
250
251 index = -1;
252 return -1;
253 }
254 else if (bytes == FAILURE)
255 {
256 throw failure();
257 }
258 else if (bytes == CLOSED)
259 {
260 if (index < 0)
261 return -1;
262 throw new AsynchronousCloseException();
263 }
264 else if (bytes != null)
265 {
266 int result = bytes[index] & 0xFF;
267 if (++index == bytes.length)
268 {
269 length.addAndGet(-index);
270 bytes = null;
271 index = 0;
272 signal();
273 }
274 return result;
275 }
276 else
277 {
278 bytes = take();
279 LOG.debug("Dequeued {}/{} bytes", bytes, bytes.length);
280 }
281 }
282 }
283
284 private IOException failure()
285 {
286 if (failure instanceof IOException)
287 return (IOException)failure;
288 else
289 return new IOException(failure);
290 }
291
292 private byte[] take() throws IOException
293 {
294 try
295 {
296 return queue.take();
297 }
298 catch (InterruptedException x)
299 {
300 throw new InterruptedIOException();
301 }
302 }
303
304 @Override
305 public void close() throws IOException
306 {
307 LOG.debug("Queuing close {}{}", CLOSED, "");
308 queue.offer(CLOSED);
309 closed = true;
310 signal();
311 super.close();
312 }
313 }
314 }