View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client.util;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.InterruptedIOException;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.AsynchronousCloseException;
26  import java.util.concurrent.BlockingQueue;
27  import java.util.concurrent.CountDownLatch;
28  import java.util.concurrent.ExecutionException;
29  import java.util.concurrent.LinkedBlockingQueue;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.TimeoutException;
32  import java.util.concurrent.atomic.AtomicLong;
33  import java.util.concurrent.atomic.AtomicReference;
34  
35  import org.eclipse.jetty.client.HttpClient;
36  import org.eclipse.jetty.client.api.Response;
37  import org.eclipse.jetty.client.api.Response.Listener;
38  import org.eclipse.jetty.client.api.Result;
39  import org.eclipse.jetty.util.IO;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.log.Logger;
42  
43  /**
44   * Implementation of {@link Listener} that produces an {@link InputStream}
45   * that allows applications to read the response content.
46   * <p>
47   * Typical usage is:
48   * <pre>
49   * InputStreamResponseListener listener = new InputStreamResponseListener();
50   * client.newRequest(...).send(listener);
51   *
52   * // Wait for the response headers to arrive
53   * Response response = listener.get(5, TimeUnit.SECONDS);
54   * if (response.getStatus() == 200)
55   * {
56   *     // Obtain the input stream on the response content
57   *     try (InputStream input = listener.getInputStream())
58   *     {
59   *         // Read the response content
60   *     }
61   * }
62   * </pre>
63   * <p>
64   * The {@link HttpClient} implementation (the producer) will feed the input stream
65   * asynchronously while the application (the consumer) is reading from it.
66   * Chunks of content are maintained in a queue, and it is possible to specify a
67   * maximum buffer size for the bytes held in the queue, by default 16384 bytes.
68   * <p>
69   * If the consumer is faster than the producer, then the consumer will block
70   * with the typical {@link InputStream#read()} semantic.
71   * If the consumer is slower than the producer, then the producer will block
72   * until the client consumes.
73   */
74  public class InputStreamResponseListener extends Listener.Adapter
75  {
76      private static final Logger LOG = Log.getLogger(InputStreamResponseListener.class);
77      private static final byte[] EOF = new byte[0];
78      private static final byte[] CLOSED = new byte[0];
79      private static final byte[] FAILURE = new byte[0];
80      private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
81      private final AtomicLong length = new AtomicLong();
82      private final CountDownLatch responseLatch = new CountDownLatch(1);
83      private final CountDownLatch resultLatch = new CountDownLatch(1);
84      private final AtomicReference<InputStream> stream = new AtomicReference<>();
85      private final long maxBufferSize;
86      private Response response;
87      private Result result;
88      private volatile Throwable failure;
89      private volatile boolean closed;
90  
91      public InputStreamResponseListener()
92      {
93          this(16 * 1024L);
94      }
95  
96      public InputStreamResponseListener(long maxBufferSize)
97      {
98          this.maxBufferSize = maxBufferSize;
99      }
100 
101     @Override
102     public void onHeaders(Response response)
103     {
104         this.response = response;
105         responseLatch.countDown();
106     }
107 
108     @Override
109     public void onContent(Response response, ByteBuffer content)
110     {
111         if (!closed)
112         {
113             int remaining = content.remaining();
114             if (remaining > 0)
115             {
116 
117                 byte[] bytes = new byte[remaining];
118                 content.get(bytes);
119                 if (LOG.isDebugEnabled())
120                     LOG.debug("Queuing {}/{} bytes", bytes, remaining);
121                 queue.offer(bytes);
122 
123                 long newLength = length.addAndGet(remaining);
124                 while (newLength >= maxBufferSize)
125                 {
126                     if (LOG.isDebugEnabled())
127                         LOG.debug("Queued bytes limit {}/{} exceeded, waiting", newLength, maxBufferSize);
128                     // Block to avoid infinite buffering
129                     if (!await())
130                         break;
131                     newLength = length.get();
132                     if (LOG.isDebugEnabled())
133                         LOG.debug("Queued bytes limit {}/{} exceeded, woken up", newLength, maxBufferSize);
134                 }
135             }
136             else
137             {
138                 if (LOG.isDebugEnabled())
139                     LOG.debug("Queuing skipped, empty content {}", content);
140             }
141         }
142         else
143         {
144             LOG.debug("Queuing skipped, stream already closed");
145         }
146     }
147 
148     @Override
149     public void onSuccess(Response response)
150     {
151         if (LOG.isDebugEnabled())
152             LOG.debug("Queuing end of content {}{}", EOF, "");
153         queue.offer(EOF);
154         signal();
155     }
156 
157     @Override
158     public void onFailure(Response response, Throwable failure)
159     {
160         fail(failure);
161         signal();
162     }
163 
164     @Override
165     public void onComplete(Result result)
166     {
167         if (result.isFailed() && failure == null)
168             fail(result.getFailure());
169         this.result = result;
170         resultLatch.countDown();
171         signal();
172     }
173 
174     private void fail(Throwable failure)
175     {
176         if (LOG.isDebugEnabled())
177             LOG.debug("Queuing failure {} {}", FAILURE, failure);
178         queue.offer(FAILURE);
179         this.failure = failure;
180         responseLatch.countDown();
181     }
182 
183     protected boolean await()
184     {
185         try
186         {
187             synchronized (this)
188             {
189                 while (length.get() >= maxBufferSize && failure == null && !closed)
190                     wait();
191                 // Re-read the values as they may have changed while waiting.
192                 return failure == null && !closed;
193             }
194         }
195         catch (InterruptedException x)
196         {
197             Thread.currentThread().interrupt();
198             return false;
199         }
200     }
201 
202     protected void signal()
203     {
204         synchronized (this)
205         {
206             notifyAll();
207         }
208     }
209 
210     /**
211      * Waits for the given timeout for the response to be available, then returns it.
212      * <p>
213      * The wait ends as soon as all the HTTP headers have been received, without waiting for the content.
214      * To wait for the whole content, see {@link #await(long, TimeUnit)}.
215      *
216      * @param timeout the time to wait
217      * @param unit the timeout unit
218      * @return the response
219      * @throws InterruptedException if the thread is interrupted
220      * @throws TimeoutException if the timeout expires
221      * @throws ExecutionException if a failure happened
222      */
223     public Response get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException
224     {
225         boolean expired = !responseLatch.await(timeout, unit);
226         if (expired)
227             throw new TimeoutException();
228         if (failure != null)
229             throw new ExecutionException(failure);
230         return response;
231     }
232 
233     /**
234      * Waits for the given timeout for the whole request/response cycle to be finished,
235      * then returns the corresponding result.
236      * <p>
237      *
238      * @param timeout the time to wait
239      * @param unit the timeout unit
240      * @return the result
241      * @throws InterruptedException if the thread is interrupted
242      * @throws TimeoutException if the timeout expires
243      * @see #get(long, TimeUnit)
244      */
245     public Result await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
246     {
247         boolean expired = !resultLatch.await(timeout, unit);
248         if (expired)
249             throw new TimeoutException();
250         return result;
251     }
252 
253     /**
254      * Returns an {@link InputStream} providing the response content bytes.
255      * <p>
256      * The method may be invoked only once; subsequent invocations will return a closed {@link InputStream}.
257      *
258      * @return an input stream providing the response content
259      */
260     public InputStream getInputStream()
261     {
262         InputStream result = new Input();
263         if (stream.compareAndSet(null, result))
264             return result;
265         return IO.getClosedStream();
266     }
267 
268     private class Input extends InputStream
269     {
270         private byte[] bytes;
271         private int index;
272 
273         @Override
274         public int read() throws IOException
275         {
276             while (true)
277             {
278                 if (bytes == EOF)
279                 {
280                     // Mark the fact that we saw -1,
281                     // so that in the close case we don't throw
282                     index = -1;
283                     return -1;
284                 }
285                 else if (bytes == FAILURE)
286                 {
287                     throw failure();
288                 }
289                 else if (bytes == CLOSED)
290                 {
291                     if (index < 0)
292                         return -1;
293                     throw new AsynchronousCloseException();
294                 }
295                 else if (bytes != null)
296                 {
297                     int result = bytes[index] & 0xFF;
298                     if (++index == bytes.length)
299                     {
300                         length.addAndGet(-index);
301                         bytes = null;
302                         index = 0;
303                         signal();
304                     }
305                     return result;
306                 }
307                 else
308                 {
309                     bytes = take();
310                     if (LOG.isDebugEnabled())
311                         LOG.debug("Dequeued {}/{} bytes", bytes, bytes.length);
312                 }
313             }
314         }
315 
316         private IOException failure()
317         {
318             if (failure instanceof IOException)
319                 return (IOException)failure;
320             else
321                 return new IOException(failure);
322         }
323 
324         private byte[] take() throws IOException
325         {
326             try
327             {
328                 return queue.take();
329             }
330             catch (InterruptedException x)
331             {
332                 throw new InterruptedIOException();
333             }
334         }
335 
336         @Override
337         public void close() throws IOException
338         {
339             if (!closed)
340             {
341                 super.close();
342                 if (LOG.isDebugEnabled())
343                     LOG.debug("Queuing close {}{}", CLOSED, "");
344                 queue.offer(CLOSED);
345                 closed = true;
346                 signal();
347             }
348         }
349     }
350 }