View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client.util;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.InterruptedIOException;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.AsynchronousCloseException;
26  import java.util.ArrayDeque;
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.List;
30  import java.util.Queue;
31  import java.util.concurrent.CountDownLatch;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.TimeoutException;
35  import java.util.concurrent.atomic.AtomicReference;
36  
37  import org.eclipse.jetty.client.HttpClient;
38  import org.eclipse.jetty.client.api.Response;
39  import org.eclipse.jetty.client.api.Response.Listener;
40  import org.eclipse.jetty.client.api.Result;
41  import org.eclipse.jetty.util.BufferUtil;
42  import org.eclipse.jetty.util.Callback;
43  import org.eclipse.jetty.util.IO;
44  import org.eclipse.jetty.util.log.Log;
45  import org.eclipse.jetty.util.log.Logger;
46  
47  /**
48   * Implementation of {@link Listener} that produces an {@link InputStream}
49   * that allows applications to read the response content.
50   * <p>
51   * Typical usage is:
52   * <pre>
53   * InputStreamResponseListener listener = new InputStreamResponseListener();
54   * client.newRequest(...).send(listener);
55   *
56   * // Wait for the response headers to arrive
57   * Response response = listener.get(5, TimeUnit.SECONDS);
58   * if (response.getStatus() == 200)
59   * {
60   *     // Obtain the input stream on the response content
61   *     try (InputStream input = listener.getInputStream())
62   *     {
63   *         // Read the response content
64   *     }
65   * }
66   * </pre>
67   * <p>
68   * The {@link HttpClient} implementation (the producer) will feed the input stream
69   * asynchronously while the application (the consumer) is reading from it.
70   * <p>
71   * If the consumer is faster than the producer, then the consumer will block
72   * with the typical {@link InputStream#read()} semantic.
73   * If the consumer is slower than the producer, then the producer will block
74   * until the client consumes.
75   */
76  public class InputStreamResponseListener extends Listener.Adapter
77  {
78      private static final Logger LOG = Log.getLogger(InputStreamResponseListener.class);
79      private static final DeferredContentProvider.Chunk EOF = new DeferredContentProvider.Chunk(BufferUtil.EMPTY_BUFFER, Callback.NOOP);
80      private final Object lock = this;
81      private final CountDownLatch responseLatch = new CountDownLatch(1);
82      private final CountDownLatch resultLatch = new CountDownLatch(1);
83      private final AtomicReference<InputStream> stream = new AtomicReference<>();
84      private final Queue<DeferredContentProvider.Chunk> chunks = new ArrayDeque<>();
85      private Response response;
86      private Result result;
87      private Throwable failure;
88      private boolean closed;
89  
90      public InputStreamResponseListener()
91      {
92      }
93  
94      /**
95       * @deprecated response content is not buffered anymore, but handled asynchronously.
96       */
97      @Deprecated
98      public InputStreamResponseListener(long maxBufferSize)
99      {
100     }
101 
102     @Override
103     public void onHeaders(Response response)
104     {
105         synchronized (lock)
106         {
107             this.response = response;
108             responseLatch.countDown();
109         }
110     }
111 
112     @Override
113     public void onContent(Response response, ByteBuffer content, Callback callback)
114     {
115         if (content.remaining() == 0)
116         {
117             if (LOG.isDebugEnabled())
118                 LOG.debug("Skipped empty content {}", content);
119             callback.succeeded();
120             return;
121         }
122 
123         boolean closed;
124         synchronized (lock)
125         {
126             closed = this.closed;
127             if (!closed)
128             {
129                 if (LOG.isDebugEnabled())
130                     LOG.debug("Queueing content {}", content);
131                 chunks.add(new DeferredContentProvider.Chunk(content, callback));
132                 lock.notifyAll();
133             }
134         }
135 
136         if (closed)
137         {
138             if (LOG.isDebugEnabled())
139                 LOG.debug("InputStream closed, ignored content {}", content);
140             callback.failed(new AsynchronousCloseException());
141         }
142     }
143 
144     @Override
145     public void onSuccess(Response response)
146     {
147         synchronized (lock)
148         {
149             if (!closed)
150                 chunks.add(EOF);
151             lock.notifyAll();
152         }
153 
154         if (LOG.isDebugEnabled())
155             LOG.debug("End of content");
156     }
157 
158     @Override
159     public void onFailure(Response response, Throwable failure)
160     {
161         List<Callback> callbacks;
162         synchronized (lock)
163         {
164             if (this.failure != null)
165                 return;
166             this.failure = failure;
167             callbacks = drain();
168             lock.notifyAll();
169         }
170 
171         if (LOG.isDebugEnabled())
172             LOG.debug("Content failure", failure);
173 
174         callbacks.forEach(callback -> callback.failed(failure));
175     }
176 
177     @Override
178     public void onComplete(Result result)
179     {
180         Throwable failure = result.getFailure();
181         List<Callback> callbacks = Collections.emptyList();
182         synchronized (lock)
183         {
184             this.result = result;
185             if (result.isFailed() && this.failure == null)
186             {
187                 this.failure = failure;
188                 callbacks = drain();
189             }
190             // Notify the response latch in case of request failures.
191             responseLatch.countDown();
192             resultLatch.countDown();
193             lock.notifyAll();
194         }
195 
196         if (LOG.isDebugEnabled())
197         {
198             if (failure == null)
199                 LOG.debug("Result success");
200             else
201                 LOG.debug("Result failure", failure);
202         }
203 
204         callbacks.forEach(callback -> callback.failed(failure));
205     }
206 
207     /**
208      * Waits for the given timeout for the response to be available, then returns it.
209      * <p>
210      * The wait ends as soon as all the HTTP headers have been received, without waiting for the content.
211      * To wait for the whole content, see {@link #await(long, TimeUnit)}.
212      *
213      * @param timeout the time to wait
214      * @param unit the timeout unit
215      * @return the response
216      * @throws InterruptedException if the thread is interrupted
217      * @throws TimeoutException if the timeout expires
218      * @throws ExecutionException if a failure happened
219      */
220     public Response get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException
221     {
222         boolean expired = !responseLatch.await(timeout, unit);
223         if (expired)
224             throw new TimeoutException();
225         synchronized (lock)
226         {
227             // If the request failed there is no response.
228             if (response == null)
229                 throw new ExecutionException(failure);
230             return response;
231         }
232     }
233 
234     /**
235      * Waits for the given timeout for the whole request/response cycle to be finished,
236      * then returns the corresponding result.
237      * <p>
238      *
239      * @param timeout the time to wait
240      * @param unit the timeout unit
241      * @return the result
242      * @throws InterruptedException if the thread is interrupted
243      * @throws TimeoutException if the timeout expires
244      * @see #get(long, TimeUnit)
245      */
246     public Result await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
247     {
248         boolean expired = !resultLatch.await(timeout, unit);
249         if (expired)
250             throw new TimeoutException();
251         synchronized (lock)
252         {
253             return result;
254         }
255     }
256 
257     /**
258      * Returns an {@link InputStream} providing the response content bytes.
259      * <p>
260      * The method may be invoked only once; subsequent invocations will return a closed {@link InputStream}.
261      *
262      * @return an input stream providing the response content
263      */
264     public InputStream getInputStream()
265     {
266         InputStream result = new Input();
267         if (stream.compareAndSet(null, result))
268             return result;
269         return IO.getClosedStream();
270     }
271 
272     private List<Callback> drain()
273     {
274         List<Callback> callbacks = new ArrayList<>();
275         synchronized (lock)
276         {
277             while (true)
278             {
279                 DeferredContentProvider.Chunk chunk = chunks.peek();
280                 if (chunk == null || chunk == EOF)
281                     break;
282                 callbacks.add(chunk.callback);
283                 chunks.poll();
284             }
285         }
286         return callbacks;
287     }
288 
289     private class Input extends InputStream
290     {
291         @Override
292         public int read() throws IOException
293         {
294             byte[] tmp = new byte[1];
295             int read = read(tmp);
296             if (read < 0)
297                 return read;
298             return tmp[0] & 0xFF;
299         }
300 
301         @Override
302         public int read(byte[] b, int offset, int length) throws IOException
303         {
304             try
305             {
306                 int result;
307                 Callback callback = null;
308                 synchronized (lock)
309                 {
310                     DeferredContentProvider.Chunk chunk;
311                     while (true)
312                     {
313                         chunk = chunks.peek();
314                         if (chunk == EOF)
315                             return -1;
316 
317                         if (chunk != null)
318                             break;
319 
320                         if (failure != null)
321                             throw toIOException(failure);
322 
323                         if (closed)
324                             throw new AsynchronousCloseException();
325 
326                         lock.wait();
327                     }
328 
329                     ByteBuffer buffer = chunk.buffer;
330                     result = Math.min(buffer.remaining(), length);
331                     buffer.get(b, offset, result);
332                     if (!buffer.hasRemaining())
333                     {
334                         callback = chunk.callback;
335                         chunks.poll();
336                     }
337                 }
338                 if (callback != null)
339                     callback.succeeded();
340                 return result;
341             }
342             catch (InterruptedException x)
343             {
344                 throw new InterruptedIOException();
345             }
346         }
347 
348         private IOException toIOException(Throwable failure)
349         {
350             if (failure instanceof IOException)
351                 return (IOException)failure;
352             else
353                 return new IOException(failure);
354         }
355 
356         @Override
357         public void close() throws IOException
358         {
359             List<Callback> callbacks;
360             synchronized (lock)
361             {
362                 if (closed)
363                     return;
364                 closed = true;
365                 callbacks = drain();
366                 lock.notifyAll();
367             }
368 
369             if (LOG.isDebugEnabled())
370                 LOG.debug("InputStream close");
371 
372             Throwable failure = new AsynchronousCloseException();
373             callbacks.forEach(callback -> callback.failed(failure));
374 
375             super.close();
376         }
377     }
378 }