View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client.util;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.InterruptedIOException;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.AsynchronousCloseException;
26  import java.util.concurrent.BlockingQueue;
27  import java.util.concurrent.CountDownLatch;
28  import java.util.concurrent.ExecutionException;
29  import java.util.concurrent.LinkedBlockingQueue;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.TimeoutException;
32  import java.util.concurrent.atomic.AtomicLong;
33  
34  import org.eclipse.jetty.client.HttpClient;
35  import org.eclipse.jetty.client.api.Response;
36  import org.eclipse.jetty.client.api.Result;
37  import org.eclipse.jetty.util.log.Log;
38  import org.eclipse.jetty.util.log.Logger;
39  
40  /**
41   * Implementation of {@link Response.Listener} that produces an {@link InputStream}
42   * that allows applications to read the response content.
43   * <p />
44   * Typical usage is:
45   * <pre>
46   * InputStreamResponseListener listener = new InputStreamResponseListener();
47   * client.newRequest(...).send(listener);
48   *
49   * // Wait for the response headers to arrive
50   * Response response = listener.get(5, TimeUnit.SECONDS);
51   * if (response.getStatus() == 200)
52   * {
53   *     // Obtain the input stream on the response content
54   *     try (InputStream input = listener.getInputStream())
55   *     {
56   *         // Read the response content
57   *     }
58   * }
59   * </pre>
60   * <p />
61   * The {@link HttpClient} implementation (the producer) will feed the input stream
62   * asynchronously while the application (the consumer) is reading from it.
63   * Chunks of content are maintained in a queue, and it is possible to specify a
64   * maximum buffer size for the bytes held in the queue, by default 16384 bytes.
65   * <p />
66   * If the consumer is faster than the producer, then the consumer will block
67   * with the typical {@link InputStream#read()} semantic.
68   * If the consumer is slower than the producer, then the producer will block
69   * until the client consumes.
70   */
71  public class InputStreamResponseListener extends Response.Listener.Empty
72  {
73      private static final Logger LOG = Log.getLogger(InputStreamResponseListener.class);
74      private static final byte[] EOF = new byte[0];
75      private static final byte[] CLOSE = new byte[0];
76      private static final byte[] FAILURE = new byte[0];
77      private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
78      private final AtomicLong length = new AtomicLong();
79      private final CountDownLatch responseLatch = new CountDownLatch(1);
80      private final CountDownLatch resultLatch = new CountDownLatch(1);
81      private final long maxBufferSize;
82      private Response response;
83      private Result result;
84      private volatile Throwable failure;
85  
86      public InputStreamResponseListener()
87      {
88          this(16 * 1024L);
89      }
90  
91      public InputStreamResponseListener(long maxBufferSize)
92      {
93          this.maxBufferSize = maxBufferSize;
94      }
95  
96      @Override
97      public void onHeaders(Response response)
98      {
99          this.response = response;
100         responseLatch.countDown();
101     }
102 
103     @Override
104     public void onContent(Response response, ByteBuffer content)
105     {
106         int remaining = content.remaining();
107         byte[] bytes = new byte[remaining];
108         content.get(bytes);
109         LOG.debug("Queuing {}/{} bytes", bytes, bytes.length);
110         queue.offer(bytes);
111 
112         long newLength = length.addAndGet(remaining);
113         while (newLength >= maxBufferSize)
114         {
115             LOG.debug("Queued bytes limit {}/{} exceeded, waiting", newLength, maxBufferSize);
116             if (!await())
117                 break;
118             newLength = length.get();
119             LOG.debug("Queued bytes limit {}/{} exceeded, woken up", newLength, maxBufferSize);
120         }
121     }
122 
123     @Override
124     public void onFailure(Response response, Throwable failure)
125     {
126         this.failure = failure;
127         LOG.debug("Queuing failure {} {}", FAILURE, failure);
128         queue.offer(FAILURE);
129         responseLatch.countDown();
130     }
131 
132     @Override
133     public void onSuccess(Response response)
134     {
135         LOG.debug("Queuing end of content {}{}", EOF, "");
136         queue.offer(EOF);
137     }
138 
139     @Override
140     public void onComplete(Result result)
141     {
142         this.result = result;
143         resultLatch.countDown();
144     }
145 
146     private boolean await()
147     {
148         try
149         {
150             synchronized (this)
151             {
152                 wait();
153             }
154             return true;
155         }
156         catch (InterruptedException x)
157         {
158             return false;
159         }
160     }
161 
162     private void signal()
163     {
164         synchronized (this)
165         {
166             notify();
167         }
168     }
169 
170     public Response get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException
171     {
172         boolean expired = !responseLatch.await(timeout, unit);
173         if (expired)
174             throw new TimeoutException();
175         if (failure != null)
176             throw new ExecutionException(failure);
177         return response;
178     }
179 
180     public Result await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
181     {
182         boolean expired = !resultLatch.await(timeout, unit);
183         if (expired)
184             throw new TimeoutException();
185         return result;
186     }
187 
188     public InputStream getInputStream()
189     {
190         return new Input();
191     }
192 
193     private class Input extends InputStream
194     {
195         private byte[] bytes;
196         private int index;
197 
198         @Override
199         public int read() throws IOException
200         {
201             while (true)
202             {
203                 if (bytes == EOF)
204                 {
205                     // Mark the fact that we saw -1,
206                     // so that in the close case we don't throw
207                     index = -1;
208                     return -1;
209                 }
210                 else if (bytes == FAILURE)
211                 {
212                     throw failure();
213                 }
214                 else if (bytes == CLOSE)
215                 {
216                     if (index < 0)
217                         return -1;
218                     throw new AsynchronousCloseException();
219                 }
220                 else if (bytes != null)
221                 {
222                     if (index < bytes.length)
223                         return bytes[index++];
224                     length.addAndGet(-index);
225                     bytes = null;
226                     index = 0;
227                 }
228                 else
229                 {
230                     bytes = take();
231                     LOG.debug("Dequeued {}/{} bytes", bytes, bytes.length);
232                     signal();
233                 }
234             }
235         }
236 
237         private IOException failure()
238         {
239             if (failure instanceof IOException)
240                 return (IOException)failure;
241             else
242                 return new IOException(failure);
243         }
244 
245         private byte[] take() throws IOException
246         {
247             try
248             {
249                 return queue.take();
250             }
251             catch (InterruptedException x)
252             {
253                 throw new InterruptedIOException();
254             }
255         }
256 
257         @Override
258         public void close() throws IOException
259         {
260             LOG.debug("Queuing close {}{}", CLOSE, "");
261             queue.offer(CLOSE);
262             super.close();
263         }
264     }
265 }