View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client.util;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.InterruptedIOException;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.AsynchronousCloseException;
26  import java.util.concurrent.BlockingQueue;
27  import java.util.concurrent.CountDownLatch;
28  import java.util.concurrent.ExecutionException;
29  import java.util.concurrent.LinkedBlockingQueue;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.TimeoutException;
32  import java.util.concurrent.atomic.AtomicLong;
33  import java.util.concurrent.atomic.AtomicReference;
34  
35  import org.eclipse.jetty.client.HttpClient;
36  import org.eclipse.jetty.client.api.Response;
37  import org.eclipse.jetty.client.api.Result;
38  import org.eclipse.jetty.util.IO;
39  import org.eclipse.jetty.util.log.Log;
40  import org.eclipse.jetty.util.log.Logger;
41  
42  /**
43   * Implementation of {@link Response.Listener} that produces an {@link InputStream}
44   * that allows applications to read the response content.
45   * <p />
46   * Typical usage is:
47   * <pre>
48   * InputStreamResponseListener listener = new InputStreamResponseListener();
49   * client.newRequest(...).send(listener);
50   *
51   * // Wait for the response headers to arrive
52   * Response response = listener.get(5, TimeUnit.SECONDS);
53   * if (response.getStatus() == 200)
54   * {
55   *     // Obtain the input stream on the response content
56   *     try (InputStream input = listener.getInputStream())
57   *     {
58   *         // Read the response content
59   *     }
60   * }
61   * </pre>
62   * <p />
63   * The {@link HttpClient} implementation (the producer) will feed the input stream
64   * asynchronously while the application (the consumer) is reading from it.
65   * Chunks of content are maintained in a queue, and it is possible to specify a
66   * maximum buffer size for the bytes held in the queue, by default 16384 bytes.
67   * <p />
68   * If the consumer is faster than the producer, then the consumer will block
69   * with the typical {@link InputStream#read()} semantic.
70   * If the consumer is slower than the producer, then the producer will block
71   * until the client consumes.
72   */
73  public class InputStreamResponseListener extends Response.Listener.Empty
74  {
75      private static final Logger LOG = Log.getLogger(InputStreamResponseListener.class);
76      private static final byte[] EOF = new byte[0];
77      private static final byte[] CLOSED = new byte[0];
78      private static final byte[] FAILURE = new byte[0];
79      private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
80      private final AtomicLong length = new AtomicLong();
81      private final CountDownLatch responseLatch = new CountDownLatch(1);
82      private final CountDownLatch resultLatch = new CountDownLatch(1);
83      private final AtomicReference<InputStream> stream = new AtomicReference<>();
84      private final long maxBufferSize;
85      private Response response;
86      private Result result;
87      private volatile Throwable failure;
88      private volatile boolean closed;
89  
90      public InputStreamResponseListener()
91      {
92          this(16 * 1024L);
93      }
94  
95      public InputStreamResponseListener(long maxBufferSize)
96      {
97          this.maxBufferSize = maxBufferSize;
98      }
99  
100     @Override
101     public void onHeaders(Response response)
102     {
103         this.response = response;
104         responseLatch.countDown();
105     }
106 
107     @Override
108     public void onContent(Response response, ByteBuffer content)
109     {
110         // Avoid buffering if the input stream is early closed.
111         if (closed)
112             return;
113 
114         int remaining = content.remaining();
115         byte[] bytes = new byte[remaining];
116         content.get(bytes);
117         LOG.debug("Queuing {}/{} bytes", bytes, bytes.length);
118         queue.offer(bytes);
119 
120         long newLength = length.addAndGet(remaining);
121         while (newLength >= maxBufferSize)
122         {
123             LOG.debug("Queued bytes limit {}/{} exceeded, waiting", newLength, maxBufferSize);
124             // Block to avoid infinite buffering
125             if (!await())
126                 break;
127             newLength = length.get();
128             LOG.debug("Queued bytes limit {}/{} exceeded, woken up", newLength, maxBufferSize);
129         }
130     }
131 
132     @Override
133     public void onComplete(Result result)
134     {
135         this.result = result;
136         if (result.isSucceeded())
137         {
138             LOG.debug("Queuing end of content {}{}", EOF, "");
139             queue.offer(EOF);
140         }
141         else
142         {
143             LOG.debug("Queuing failure {} {}", FAILURE, failure);
144             queue.offer(FAILURE);
145             this.failure = result.getFailure();
146             responseLatch.countDown();
147         }
148         resultLatch.countDown();
149         signal();
150     }
151 
152     protected boolean await()
153     {
154         try
155         {
156             synchronized (this)
157             {
158                 while (length.get() >= maxBufferSize && failure == null && !closed)
159                     wait();
160                 // Re-read the values as they may have changed while waiting.
161                 return failure == null && !closed;
162             }
163         }
164         catch (InterruptedException x)
165         {
166             Thread.currentThread().interrupt();
167             return false;
168         }
169     }
170 
171     protected void signal()
172     {
173         synchronized (this)
174         {
175             notifyAll();
176         }
177     }
178 
179     /**
180      * Waits for the given timeout for the response to be available, then returns it.
181      * <p />
182      * The wait ends as soon as all the HTTP headers have been received, without waiting for the content.
183      * To wait for the whole content, see {@link #await(long, TimeUnit)}.
184      *
185      * @param timeout the time to wait
186      * @param unit the timeout unit
187      * @return the response
188      * @throws InterruptedException if the thread is interrupted
189      * @throws TimeoutException if the timeout expires
190      * @throws ExecutionException if a failure happened
191      */
192     public Response get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException
193     {
194         boolean expired = !responseLatch.await(timeout, unit);
195         if (expired)
196             throw new TimeoutException();
197         if (failure != null)
198             throw new ExecutionException(failure);
199         return response;
200     }
201 
202     /**
203      * Waits for the given timeout for the whole request/response cycle to be finished,
204      * then returns the corresponding result.
205      * <p />
206      *
207      * @param timeout the time to wait
208      * @param unit the timeout unit
209      * @return the result
210      * @throws InterruptedException if the thread is interrupted
211      * @throws TimeoutException if the timeout expires
212      * @see #get(long, TimeUnit)
213      */
214     public Result await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
215     {
216         boolean expired = !resultLatch.await(timeout, unit);
217         if (expired)
218             throw new TimeoutException();
219         return result;
220     }
221 
222     /**
223      * Returns an {@link InputStream} providing the response content bytes.
224      * <p />
225      * The method may be invoked only once; subsequent invocations will return a closed {@link InputStream}.
226      *
227      * @return an input stream providing the response content
228      */
229     public InputStream getInputStream()
230     {
231         InputStream result = new Input();
232         if (stream.compareAndSet(null, result))
233             return result;
234         return IO.getClosedStream();
235     }
236 
237     private class Input extends InputStream
238     {
239         private byte[] bytes;
240         private int index;
241 
242         @Override
243         public int read() throws IOException
244         {
245             while (true)
246             {
247                 if (bytes == EOF)
248                 {
249                     // Mark the fact that we saw -1,
250                     // so that in the close case we don't throw
251                     index = -1;
252                     return -1;
253                 }
254                 else if (bytes == FAILURE)
255                 {
256                     throw failure();
257                 }
258                 else if (bytes == CLOSED)
259                 {
260                     if (index < 0)
261                         return -1;
262                     throw new AsynchronousCloseException();
263                 }
264                 else if (bytes != null)
265                 {
266                     int result = bytes[index] & 0xFF;
267                     if (++index == bytes.length)
268                     {
269                         length.addAndGet(-index);
270                         bytes = null;
271                         index = 0;
272                         signal();
273                     }
274                     return result;
275                 }
276                 else
277                 {
278                     bytes = take();
279                     LOG.debug("Dequeued {}/{} bytes", bytes, bytes.length);
280                 }
281             }
282         }
283 
284         private IOException failure()
285         {
286             if (failure instanceof IOException)
287                 return (IOException)failure;
288             else
289                 return new IOException(failure);
290         }
291 
292         private byte[] take() throws IOException
293         {
294             try
295             {
296                 return queue.take();
297             }
298             catch (InterruptedException x)
299             {
300                 throw new InterruptedIOException();
301             }
302         }
303 
304         @Override
305         public void close() throws IOException
306         {
307             LOG.debug("Queuing close {}{}", CLOSED, "");
308             queue.offer(CLOSED);
309             closed = true;
310             signal();
311             super.close();
312         }
313     }
314 }