1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client.util;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.InterruptedIOException;
24 import java.nio.ByteBuffer;
25 import java.nio.channels.AsynchronousCloseException;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.LinkedBlockingQueue;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.atomic.AtomicLong;
33
34 import org.eclipse.jetty.client.HttpClient;
35 import org.eclipse.jetty.client.api.Response;
36 import org.eclipse.jetty.client.api.Result;
37 import org.eclipse.jetty.util.log.Log;
38 import org.eclipse.jetty.util.log.Logger;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71 public class InputStreamResponseListener extends Response.Listener.Empty
72 {
73 private static final Logger LOG = Log.getLogger(InputStreamResponseListener.class);
74 private static final byte[] EOF = new byte[0];
75 private static final byte[] CLOSE = new byte[0];
76 private static final byte[] FAILURE = new byte[0];
77 private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
78 private final AtomicLong length = new AtomicLong();
79 private final CountDownLatch responseLatch = new CountDownLatch(1);
80 private final CountDownLatch resultLatch = new CountDownLatch(1);
81 private final long maxBufferSize;
82 private Response response;
83 private Result result;
84 private volatile Throwable failure;
85
86 public InputStreamResponseListener()
87 {
88 this(16 * 1024L);
89 }
90
91 public InputStreamResponseListener(long maxBufferSize)
92 {
93 this.maxBufferSize = maxBufferSize;
94 }
95
96 @Override
97 public void onHeaders(Response response)
98 {
99 this.response = response;
100 responseLatch.countDown();
101 }
102
103 @Override
104 public void onContent(Response response, ByteBuffer content)
105 {
106 int remaining = content.remaining();
107 byte[] bytes = new byte[remaining];
108 content.get(bytes);
109 LOG.debug("Queuing {}/{} bytes", bytes, bytes.length);
110 queue.offer(bytes);
111
112 long newLength = length.addAndGet(remaining);
113 while (newLength >= maxBufferSize)
114 {
115 LOG.debug("Queued bytes limit {}/{} exceeded, waiting", newLength, maxBufferSize);
116 if (!await())
117 break;
118 newLength = length.get();
119 LOG.debug("Queued bytes limit {}/{} exceeded, woken up", newLength, maxBufferSize);
120 }
121 }
122
123 @Override
124 public void onFailure(Response response, Throwable failure)
125 {
126 this.failure = failure;
127 LOG.debug("Queuing failure {} {}", FAILURE, failure);
128 queue.offer(FAILURE);
129 responseLatch.countDown();
130 }
131
132 @Override
133 public void onSuccess(Response response)
134 {
135 LOG.debug("Queuing end of content {}{}", EOF, "");
136 queue.offer(EOF);
137 }
138
139 @Override
140 public void onComplete(Result result)
141 {
142 this.result = result;
143 resultLatch.countDown();
144 }
145
146 private boolean await()
147 {
148 try
149 {
150 synchronized (this)
151 {
152 wait();
153 }
154 return true;
155 }
156 catch (InterruptedException x)
157 {
158 return false;
159 }
160 }
161
162 private void signal()
163 {
164 synchronized (this)
165 {
166 notify();
167 }
168 }
169
170 public Response get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException
171 {
172 boolean expired = !responseLatch.await(timeout, unit);
173 if (expired)
174 throw new TimeoutException();
175 if (failure != null)
176 throw new ExecutionException(failure);
177 return response;
178 }
179
180 public Result await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
181 {
182 boolean expired = !resultLatch.await(timeout, unit);
183 if (expired)
184 throw new TimeoutException();
185 return result;
186 }
187
188 public InputStream getInputStream()
189 {
190 return new Input();
191 }
192
193 private class Input extends InputStream
194 {
195 private byte[] bytes;
196 private int index;
197
198 @Override
199 public int read() throws IOException
200 {
201 while (true)
202 {
203 if (bytes == EOF)
204 {
205
206
207 index = -1;
208 return -1;
209 }
210 else if (bytes == FAILURE)
211 {
212 throw failure();
213 }
214 else if (bytes == CLOSE)
215 {
216 if (index < 0)
217 return -1;
218 throw new AsynchronousCloseException();
219 }
220 else if (bytes != null)
221 {
222 if (index < bytes.length)
223 return bytes[index++];
224 length.addAndGet(-index);
225 bytes = null;
226 index = 0;
227 }
228 else
229 {
230 bytes = take();
231 LOG.debug("Dequeued {}/{} bytes", bytes, bytes.length);
232 signal();
233 }
234 }
235 }
236
237 private IOException failure()
238 {
239 if (failure instanceof IOException)
240 return (IOException)failure;
241 else
242 return new IOException(failure);
243 }
244
245 private byte[] take() throws IOException
246 {
247 try
248 {
249 return queue.take();
250 }
251 catch (InterruptedException x)
252 {
253 throw new InterruptedIOException();
254 }
255 }
256
257 @Override
258 public void close() throws IOException
259 {
260 LOG.debug("Queuing close {}{}", CLOSE, "");
261 queue.offer(CLOSE);
262 super.close();
263 }
264 }
265 }