View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client.util;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.InterruptedIOException;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.AsynchronousCloseException;
26  import java.util.concurrent.BlockingQueue;
27  import java.util.concurrent.CountDownLatch;
28  import java.util.concurrent.ExecutionException;
29  import java.util.concurrent.LinkedBlockingQueue;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.TimeoutException;
32  import java.util.concurrent.atomic.AtomicLong;
33  import java.util.concurrent.atomic.AtomicReference;
34  
35  import org.eclipse.jetty.client.api.Response;
36  import org.eclipse.jetty.client.api.Response.Listener;
37  import org.eclipse.jetty.client.api.Result;
38  import org.eclipse.jetty.util.IO;
39  import org.eclipse.jetty.util.log.Log;
40  import org.eclipse.jetty.util.log.Logger;
41  
42  /**
43   * Implementation of {@link Listener} that produces an {@link InputStream}
44   * that allows applications to read the response content.
45   * <p />
46   * Typical usage is:
47   * <pre>
48   * InputStreamResponseListener listener = new InputStreamResponseListener();
49   * client.newRequest(...).send(listener);
50   *
51   * // Wait for the response headers to arrive
52   * Response response = listener.get(5, TimeUnit.SECONDS);
53   * if (response.getStatus() == 200)
54   * {
55   *     // Obtain the input stream on the response content
56   *     try (InputStream input = listener.getInputStream())
57   *     {
58   *         // Read the response content
59   *     }
60   * }
61   * </pre>
62   * <p />
63   * The {@link HttpClient} implementation (the producer) will feed the input stream
64   * asynchronously while the application (the consumer) is reading from it.
65   * Chunks of content are maintained in a queue, and it is possible to specify a
66   * maximum buffer size for the bytes held in the queue, by default 16384 bytes.
67   * <p />
68   * If the consumer is faster than the producer, then the consumer will block
69   * with the typical {@link InputStream#read()} semantic.
70   * If the consumer is slower than the producer, then the producer will block
71   * until the client consumes.
72   */
73  public class InputStreamResponseListener extends Listener.Adapter
74  {
75      private static final Logger LOG = Log.getLogger(InputStreamResponseListener.class);
76      private static final byte[] EOF = new byte[0];
77      private static final byte[] CLOSED = new byte[0];
78      private static final byte[] FAILURE = new byte[0];
79      private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
80      private final AtomicLong length = new AtomicLong();
81      private final CountDownLatch responseLatch = new CountDownLatch(1);
82      private final CountDownLatch resultLatch = new CountDownLatch(1);
83      private final AtomicReference<InputStream> stream = new AtomicReference<>();
84      private final long maxBufferSize;
85      private Response response;
86      private Result result;
87      private volatile Throwable failure;
88      private volatile boolean closed;
89  
90      public InputStreamResponseListener()
91      {
92          this(16 * 1024L);
93      }
94  
95      public InputStreamResponseListener(long maxBufferSize)
96      {
97          this.maxBufferSize = maxBufferSize;
98      }
99  
100     @Override
101     public void onHeaders(Response response)
102     {
103         this.response = response;
104         responseLatch.countDown();
105     }
106 
107     @Override
108     public void onContent(Response response, ByteBuffer content)
109     {
110         if (!closed)
111         {
112             int remaining = content.remaining();
113             if (remaining > 0)
114             {
115 
116                 byte[] bytes = new byte[remaining];
117                 content.get(bytes);
118                 if (LOG.isDebugEnabled())
119                     LOG.debug("Queuing {}/{} bytes", bytes, remaining);
120                 queue.offer(bytes);
121 
122                 long newLength = length.addAndGet(remaining);
123                 while (newLength >= maxBufferSize)
124                 {
125                     if (LOG.isDebugEnabled())
126                         LOG.debug("Queued bytes limit {}/{} exceeded, waiting", newLength, maxBufferSize);
127                     // Block to avoid infinite buffering
128                     if (!await())
129                         break;
130                     newLength = length.get();
131                     if (LOG.isDebugEnabled())
132                         LOG.debug("Queued bytes limit {}/{} exceeded, woken up", newLength, maxBufferSize);
133                 }
134             }
135             else
136             {
137                 if (LOG.isDebugEnabled())
138                     LOG.debug("Queuing skipped, empty content {}", content);
139             }
140         }
141         else
142         {
143             LOG.debug("Queuing skipped, stream already closed");
144         }
145     }
146 
147     @Override
148     public void onSuccess(Response response)
149     {
150         if (LOG.isDebugEnabled())
151             LOG.debug("Queuing end of content {}{}", EOF, "");
152         queue.offer(EOF);
153         signal();
154     }
155 
156     @Override
157     public void onFailure(Response response, Throwable failure)
158     {
159         fail(failure);
160         signal();
161     }
162 
163     @Override
164     public void onComplete(Result result)
165     {
166         if (result.isFailed() && failure == null)
167             fail(result.getFailure());
168         this.result = result;
169         resultLatch.countDown();
170         signal();
171     }
172 
173     private void fail(Throwable failure)
174     {
175         if (LOG.isDebugEnabled())
176             LOG.debug("Queuing failure {} {}", FAILURE, failure);
177         queue.offer(FAILURE);
178         this.failure = failure;
179         responseLatch.countDown();
180     }
181 
182     protected boolean await()
183     {
184         try
185         {
186             synchronized (this)
187             {
188                 while (length.get() >= maxBufferSize && failure == null && !closed)
189                     wait();
190                 // Re-read the values as they may have changed while waiting.
191                 return failure == null && !closed;
192             }
193         }
194         catch (InterruptedException x)
195         {
196             Thread.currentThread().interrupt();
197             return false;
198         }
199     }
200 
201     protected void signal()
202     {
203         synchronized (this)
204         {
205             notifyAll();
206         }
207     }
208 
209     /**
210      * Waits for the given timeout for the response to be available, then returns it.
211      * <p />
212      * The wait ends as soon as all the HTTP headers have been received, without waiting for the content.
213      * To wait for the whole content, see {@link #await(long, TimeUnit)}.
214      *
215      * @param timeout the time to wait
216      * @param unit the timeout unit
217      * @return the response
218      * @throws InterruptedException if the thread is interrupted
219      * @throws TimeoutException if the timeout expires
220      * @throws ExecutionException if a failure happened
221      */
222     public Response get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException
223     {
224         boolean expired = !responseLatch.await(timeout, unit);
225         if (expired)
226             throw new TimeoutException();
227         if (failure != null)
228             throw new ExecutionException(failure);
229         return response;
230     }
231 
232     /**
233      * Waits for the given timeout for the whole request/response cycle to be finished,
234      * then returns the corresponding result.
235      * <p />
236      *
237      * @param timeout the time to wait
238      * @param unit the timeout unit
239      * @return the result
240      * @throws InterruptedException if the thread is interrupted
241      * @throws TimeoutException if the timeout expires
242      * @see #get(long, TimeUnit)
243      */
244     public Result await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
245     {
246         boolean expired = !resultLatch.await(timeout, unit);
247         if (expired)
248             throw new TimeoutException();
249         return result;
250     }
251 
252     /**
253      * Returns an {@link InputStream} providing the response content bytes.
254      * <p />
255      * The method may be invoked only once; subsequent invocations will return a closed {@link InputStream}.
256      *
257      * @return an input stream providing the response content
258      */
259     public InputStream getInputStream()
260     {
261         InputStream result = new Input();
262         if (stream.compareAndSet(null, result))
263             return result;
264         return IO.getClosedStream();
265     }
266 
267     private class Input extends InputStream
268     {
269         private byte[] bytes;
270         private int index;
271 
272         @Override
273         public int read() throws IOException
274         {
275             while (true)
276             {
277                 if (bytes == EOF)
278                 {
279                     // Mark the fact that we saw -1,
280                     // so that in the close case we don't throw
281                     index = -1;
282                     return -1;
283                 }
284                 else if (bytes == FAILURE)
285                 {
286                     throw failure();
287                 }
288                 else if (bytes == CLOSED)
289                 {
290                     if (index < 0)
291                         return -1;
292                     throw new AsynchronousCloseException();
293                 }
294                 else if (bytes != null)
295                 {
296                     int result = bytes[index] & 0xFF;
297                     if (++index == bytes.length)
298                     {
299                         length.addAndGet(-index);
300                         bytes = null;
301                         index = 0;
302                         signal();
303                     }
304                     return result;
305                 }
306                 else
307                 {
308                     bytes = take();
309                     if (LOG.isDebugEnabled())
310                         LOG.debug("Dequeued {}/{} bytes", bytes, bytes.length);
311                 }
312             }
313         }
314 
315         private IOException failure()
316         {
317             if (failure instanceof IOException)
318                 return (IOException)failure;
319             else
320                 return new IOException(failure);
321         }
322 
323         private byte[] take() throws IOException
324         {
325             try
326             {
327                 return queue.take();
328             }
329             catch (InterruptedException x)
330             {
331                 throw new InterruptedIOException();
332             }
333         }
334 
335         @Override
336         public void close() throws IOException
337         {
338             if (!closed)
339             {
340                 super.close();
341                 if (LOG.isDebugEnabled())
342                     LOG.debug("Queuing close {}{}", CLOSED, "");
343                 queue.offer(CLOSED);
344                 closed = true;
345                 signal();
346             }
347         }
348     }
349 }