View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client.util;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.InterruptedIOException;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.AsynchronousCloseException;
26  import java.util.concurrent.BlockingQueue;
27  import java.util.concurrent.CountDownLatch;
28  import java.util.concurrent.ExecutionException;
29  import java.util.concurrent.LinkedBlockingQueue;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.TimeoutException;
32  import java.util.concurrent.atomic.AtomicLong;
33  import java.util.concurrent.atomic.AtomicReference;
34  
35  import org.eclipse.jetty.client.HttpClient;
36  import org.eclipse.jetty.client.api.Response;
37  import org.eclipse.jetty.client.api.Response.Listener;
38  import org.eclipse.jetty.client.api.Result;
39  import org.eclipse.jetty.util.IO;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.log.Logger;
42  
43  /**
44   * Implementation of {@link Listener} that produces an {@link InputStream}
45   * that allows applications to read the response content.
46   * <p />
47   * Typical usage is:
48   * <pre>
49   * InputStreamResponseListener listener = new InputStreamResponseListener();
50   * client.newRequest(...).send(listener);
51   *
52   * // Wait for the response headers to arrive
53   * Response response = listener.get(5, TimeUnit.SECONDS);
54   * if (response.getStatus() == 200)
55   * {
56   *     // Obtain the input stream on the response content
57   *     try (InputStream input = listener.getInputStream())
58   *     {
59   *         // Read the response content
60   *     }
61   * }
62   * </pre>
63   * <p />
64   * The {@link HttpClient} implementation (the producer) will feed the input stream
65   * asynchronously while the application (the consumer) is reading from it.
66   * Chunks of content are maintained in a queue, and it is possible to specify a
67   * maximum buffer size for the bytes held in the queue, by default 16384 bytes.
68   * <p />
69   * If the consumer is faster than the producer, then the consumer will block
70   * with the typical {@link InputStream#read()} semantic.
71   * If the consumer is slower than the producer, then the producer will block
72   * until the client consumes.
73   */
74  public class InputStreamResponseListener extends Listener.Adapter
75  {
76      private static final Logger LOG = Log.getLogger(InputStreamResponseListener.class);
77      private static final byte[] EOF = new byte[0];
78      private static final byte[] CLOSED = new byte[0];
79      private static final byte[] FAILURE = new byte[0];
80      private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
81      private final AtomicLong length = new AtomicLong();
82      private final CountDownLatch responseLatch = new CountDownLatch(1);
83      private final CountDownLatch resultLatch = new CountDownLatch(1);
84      private final AtomicReference<InputStream> stream = new AtomicReference<>();
85      private final long maxBufferSize;
86      private Response response;
87      private Result result;
88      private volatile Throwable failure;
89      private volatile boolean closed;
90  
91      public InputStreamResponseListener()
92      {
93          this(16 * 1024L);
94      }
95  
96      public InputStreamResponseListener(long maxBufferSize)
97      {
98          this.maxBufferSize = maxBufferSize;
99      }
100 
101     @Override
102     public void onHeaders(Response response)
103     {
104         this.response = response;
105         responseLatch.countDown();
106     }
107 
108     @Override
109     public void onContent(Response response, ByteBuffer content)
110     {
111         // Avoid buffering if the input stream is early closed.
112         if (closed)
113             return;
114 
115         int remaining = content.remaining();
116         byte[] bytes = new byte[remaining];
117         content.get(bytes);
118         LOG.debug("Queuing {}/{} bytes", bytes, bytes.length);
119         queue.offer(bytes);
120 
121         long newLength = length.addAndGet(remaining);
122         while (newLength >= maxBufferSize)
123         {
124             LOG.debug("Queued bytes limit {}/{} exceeded, waiting", newLength, maxBufferSize);
125             // Block to avoid infinite buffering
126             if (!await())
127                 break;
128             newLength = length.get();
129             LOG.debug("Queued bytes limit {}/{} exceeded, woken up", newLength, maxBufferSize);
130         }
131     }
132 
133     @Override
134     public void onComplete(Result result)
135     {
136         this.result = result;
137         if (result.isSucceeded())
138         {
139             LOG.debug("Queuing end of content {}{}", EOF, "");
140             queue.offer(EOF);
141         }
142         else
143         {
144             LOG.debug("Queuing failure {} {}", FAILURE, failure);
145             queue.offer(FAILURE);
146             this.failure = result.getFailure();
147             responseLatch.countDown();
148         }
149         resultLatch.countDown();
150         signal();
151     }
152 
153     protected boolean await()
154     {
155         try
156         {
157             synchronized (this)
158             {
159                 while (length.get() >= maxBufferSize && failure == null && !closed)
160                     wait();
161                 // Re-read the values as they may have changed while waiting.
162                 return failure == null && !closed;
163             }
164         }
165         catch (InterruptedException x)
166         {
167             Thread.currentThread().interrupt();
168             return false;
169         }
170     }
171 
172     protected void signal()
173     {
174         synchronized (this)
175         {
176             notifyAll();
177         }
178     }
179 
180     /**
181      * Waits for the given timeout for the response to be available, then returns it.
182      * <p />
183      * The wait ends as soon as all the HTTP headers have been received, without waiting for the content.
184      * To wait for the whole content, see {@link #await(long, TimeUnit)}.
185      *
186      * @param timeout the time to wait
187      * @param unit the timeout unit
188      * @return the response
189      * @throws InterruptedException if the thread is interrupted
190      * @throws TimeoutException if the timeout expires
191      * @throws ExecutionException if a failure happened
192      */
193     public Response get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException
194     {
195         boolean expired = !responseLatch.await(timeout, unit);
196         if (expired)
197             throw new TimeoutException();
198         if (failure != null)
199             throw new ExecutionException(failure);
200         return response;
201     }
202 
203     /**
204      * Waits for the given timeout for the whole request/response cycle to be finished,
205      * then returns the corresponding result.
206      * <p />
207      *
208      * @param timeout the time to wait
209      * @param unit the timeout unit
210      * @return the result
211      * @throws InterruptedException if the thread is interrupted
212      * @throws TimeoutException if the timeout expires
213      * @see #get(long, TimeUnit)
214      */
215     public Result await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
216     {
217         boolean expired = !resultLatch.await(timeout, unit);
218         if (expired)
219             throw new TimeoutException();
220         return result;
221     }
222 
223     /**
224      * Returns an {@link InputStream} providing the response content bytes.
225      * <p />
226      * The method may be invoked only once; subsequent invocations will return a closed {@link InputStream}.
227      *
228      * @return an input stream providing the response content
229      */
230     public InputStream getInputStream()
231     {
232         InputStream result = new Input();
233         if (stream.compareAndSet(null, result))
234             return result;
235         return IO.getClosedStream();
236     }
237 
238     private class Input extends InputStream
239     {
240         private byte[] bytes;
241         private int index;
242 
243         @Override
244         public int read() throws IOException
245         {
246             while (true)
247             {
248                 if (bytes == EOF)
249                 {
250                     // Mark the fact that we saw -1,
251                     // so that in the close case we don't throw
252                     index = -1;
253                     return -1;
254                 }
255                 else if (bytes == FAILURE)
256                 {
257                     throw failure();
258                 }
259                 else if (bytes == CLOSED)
260                 {
261                     if (index < 0)
262                         return -1;
263                     throw new AsynchronousCloseException();
264                 }
265                 else if (bytes != null)
266                 {
267                     int result = bytes[index] & 0xFF;
268                     if (++index == bytes.length)
269                     {
270                         length.addAndGet(-index);
271                         bytes = null;
272                         index = 0;
273                         signal();
274                     }
275                     return result;
276                 }
277                 else
278                 {
279                     bytes = take();
280                     LOG.debug("Dequeued {}/{} bytes", bytes, bytes.length);
281                 }
282             }
283         }
284 
285         private IOException failure()
286         {
287             if (failure instanceof IOException)
288                 return (IOException)failure;
289             else
290                 return new IOException(failure);
291         }
292 
293         private byte[] take() throws IOException
294         {
295             try
296             {
297                 return queue.take();
298             }
299             catch (InterruptedException x)
300             {
301                 throw new InterruptedIOException();
302             }
303         }
304 
305         @Override
306         public void close() throws IOException
307         {
308             LOG.debug("Queuing close {}{}", CLOSED, "");
309             queue.offer(CLOSED);
310             closed = true;
311             signal();
312             super.close();
313         }
314     }
315 }