1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client.util;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.InterruptedIOException;
24 import java.nio.ByteBuffer;
25 import java.nio.channels.AsynchronousCloseException;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.LinkedBlockingQueue;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.atomic.AtomicLong;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import org.eclipse.jetty.client.HttpClient;
36 import org.eclipse.jetty.client.api.Response;
37 import org.eclipse.jetty.client.api.Response.Listener;
38 import org.eclipse.jetty.client.api.Result;
39 import org.eclipse.jetty.util.IO;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74 public class InputStreamResponseListener extends Listener.Adapter
75 {
76 private static final Logger LOG = Log.getLogger(InputStreamResponseListener.class);
77 private static final byte[] EOF = new byte[0];
78 private static final byte[] CLOSED = new byte[0];
79 private static final byte[] FAILURE = new byte[0];
80 private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
81 private final AtomicLong length = new AtomicLong();
82 private final CountDownLatch responseLatch = new CountDownLatch(1);
83 private final CountDownLatch resultLatch = new CountDownLatch(1);
84 private final AtomicReference<InputStream> stream = new AtomicReference<>();
85 private final long maxBufferSize;
86 private Response response;
87 private Result result;
88 private volatile Throwable failure;
89 private volatile boolean closed;
90
91 public InputStreamResponseListener()
92 {
93 this(16 * 1024L);
94 }
95
96 public InputStreamResponseListener(long maxBufferSize)
97 {
98 this.maxBufferSize = maxBufferSize;
99 }
100
101 @Override
102 public void onHeaders(Response response)
103 {
104 this.response = response;
105 responseLatch.countDown();
106 }
107
108 @Override
109 public void onContent(Response response, ByteBuffer content)
110 {
111
112 if (closed)
113 return;
114
115 int remaining = content.remaining();
116 byte[] bytes = new byte[remaining];
117 content.get(bytes);
118 LOG.debug("Queuing {}/{} bytes", bytes, bytes.length);
119 queue.offer(bytes);
120
121 long newLength = length.addAndGet(remaining);
122 while (newLength >= maxBufferSize)
123 {
124 LOG.debug("Queued bytes limit {}/{} exceeded, waiting", newLength, maxBufferSize);
125
126 if (!await())
127 break;
128 newLength = length.get();
129 LOG.debug("Queued bytes limit {}/{} exceeded, woken up", newLength, maxBufferSize);
130 }
131 }
132
133 @Override
134 public void onComplete(Result result)
135 {
136 this.result = result;
137 if (result.isSucceeded())
138 {
139 LOG.debug("Queuing end of content {}{}", EOF, "");
140 queue.offer(EOF);
141 }
142 else
143 {
144 LOG.debug("Queuing failure {} {}", FAILURE, failure);
145 queue.offer(FAILURE);
146 this.failure = result.getFailure();
147 responseLatch.countDown();
148 }
149 resultLatch.countDown();
150 signal();
151 }
152
153 protected boolean await()
154 {
155 try
156 {
157 synchronized (this)
158 {
159 while (length.get() >= maxBufferSize && failure == null && !closed)
160 wait();
161
162 return failure == null && !closed;
163 }
164 }
165 catch (InterruptedException x)
166 {
167 Thread.currentThread().interrupt();
168 return false;
169 }
170 }
171
172 protected void signal()
173 {
174 synchronized (this)
175 {
176 notifyAll();
177 }
178 }
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193 public Response get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException
194 {
195 boolean expired = !responseLatch.await(timeout, unit);
196 if (expired)
197 throw new TimeoutException();
198 if (failure != null)
199 throw new ExecutionException(failure);
200 return response;
201 }
202
203
204
205
206
207
208
209
210
211
212
213
214
215 public Result await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
216 {
217 boolean expired = !resultLatch.await(timeout, unit);
218 if (expired)
219 throw new TimeoutException();
220 return result;
221 }
222
223
224
225
226
227
228
229
230 public InputStream getInputStream()
231 {
232 InputStream result = new Input();
233 if (stream.compareAndSet(null, result))
234 return result;
235 return IO.getClosedStream();
236 }
237
238 private class Input extends InputStream
239 {
240 private byte[] bytes;
241 private int index;
242
243 @Override
244 public int read() throws IOException
245 {
246 while (true)
247 {
248 if (bytes == EOF)
249 {
250
251
252 index = -1;
253 return -1;
254 }
255 else if (bytes == FAILURE)
256 {
257 throw failure();
258 }
259 else if (bytes == CLOSED)
260 {
261 if (index < 0)
262 return -1;
263 throw new AsynchronousCloseException();
264 }
265 else if (bytes != null)
266 {
267 int result = bytes[index] & 0xFF;
268 if (++index == bytes.length)
269 {
270 length.addAndGet(-index);
271 bytes = null;
272 index = 0;
273 signal();
274 }
275 return result;
276 }
277 else
278 {
279 bytes = take();
280 LOG.debug("Dequeued {}/{} bytes", bytes, bytes.length);
281 }
282 }
283 }
284
285 private IOException failure()
286 {
287 if (failure instanceof IOException)
288 return (IOException)failure;
289 else
290 return new IOException(failure);
291 }
292
293 private byte[] take() throws IOException
294 {
295 try
296 {
297 return queue.take();
298 }
299 catch (InterruptedException x)
300 {
301 throw new InterruptedIOException();
302 }
303 }
304
305 @Override
306 public void close() throws IOException
307 {
308 LOG.debug("Queuing close {}{}", CLOSED, "");
309 queue.offer(CLOSED);
310 closed = true;
311 signal();
312 super.close();
313 }
314 }
315 }