View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client.util;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.nio.ByteBuffer;
25  import java.util.ArrayList;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.NoSuchElementException;
29  import java.util.Objects;
30  import java.util.concurrent.atomic.AtomicBoolean;
31  import java.util.concurrent.atomic.AtomicReference;
32  
33  import org.eclipse.jetty.client.AsyncContentProvider;
34  import org.eclipse.jetty.client.Synchronizable;
35  import org.eclipse.jetty.client.api.ContentProvider;
36  import org.eclipse.jetty.client.api.Request;
37  import org.eclipse.jetty.client.api.Response;
38  import org.eclipse.jetty.util.ArrayQueue;
39  import org.eclipse.jetty.util.BufferUtil;
40  import org.eclipse.jetty.util.Callback;
41  
42  /**
43   * A {@link ContentProvider} that allows to add content after {@link Request#send(Response.CompleteListener)}
44   * has been called, therefore providing the request content at a later time.
45   * <p>
46   * {@link DeferredContentProvider} can only be used in conjunction with
47   * {@link Request#send(Response.CompleteListener)} (and not with its blocking counterpart {@link Request#send()})
48   * because it provides content asynchronously.
49   * <p>
50   * The deferred content is provided once and then fully consumed.
51   * Invocations to the {@link #iterator()} method after the first will return an "empty" iterator
52   * because the stream has been consumed on the first invocation.
53   * However, it is possible for subclasses to override {@link #offer(ByteBuffer)} and {@link #close()} to copy
54   * the content to another location (for example a file) and be able to support multiple invocations
55   * of of {@link #iterator()} returning the iterator provided by this
56    * class on the first invocation, and an iterator on the bytes copied to the other location
57    * for subsequent invocations.
58   * <p>
59   * Typical usage of {@link DeferredContentProvider} is in asynchronous proxies, where HTTP headers arrive
60   * separately from HTTP content chunks.
61   * <p>
62   * The deferred content must be provided through {@link #offer(ByteBuffer)}, which can be invoked multiple
63   * times, and when all content has been provided it must be signaled with a call to {@link #close()}.
64   * <p>
65   * Example usage:
66   * <pre>
67   * HttpClient httpClient = ...;
68   *
69   * // Use try-with-resources to autoclose DeferredContentProvider
70   * try (DeferredContentProvider content = new DeferredContentProvider())
71   * {
72   *     httpClient.newRequest("localhost", 8080)
73   *             .content(content)
74   *             .send(new Response.CompleteListener()
75   *             {
76   *                 &#64;Override
77   *                 public void onComplete(Result result)
78   *                 {
79   *                     // Your logic here
80   *                 }
81   *             });
82   *
83   *     // At a later time...
84   *     content.offer(ByteBuffer.wrap("some content".getBytes()));
85   * }
86   * </pre>
87   */
88  public class DeferredContentProvider implements AsyncContentProvider, Callback, Closeable
89  {
90      private static final Chunk CLOSE = new Chunk(BufferUtil.EMPTY_BUFFER, Callback.NOOP);
91  
92      private final Object lock = this;
93      private final ArrayQueue<Chunk> chunks = new ArrayQueue<>(4, 64, lock);
94      private final AtomicReference<Listener> listener = new AtomicReference<>();
95      private final DeferredContentProviderIterator iterator = new DeferredContentProviderIterator();
96      private final AtomicBoolean closed = new AtomicBoolean();
97      private long length = -1;
98      private int size;
99      private Throwable failure;
100 
101     /**
102      * Creates a new {@link DeferredContentProvider} with the given initial content
103      *
104      * @param buffers the initial content
105      */
106     public DeferredContentProvider(ByteBuffer... buffers)
107     {
108         for (ByteBuffer buffer : buffers)
109             offer(buffer);
110     }
111 
112     @Override
113     public void setListener(Listener listener)
114     {
115         if (!this.listener.compareAndSet(null, listener))
116             throw new IllegalStateException(String.format("The same %s instance cannot be used in multiple requests",
117                     AsyncContentProvider.class.getName()));
118 
119         if (isClosed())
120         {
121             synchronized (lock)
122             {
123                 long total = 0;
124                 for (Chunk chunk : chunks)
125                     total += chunk.buffer.remaining();
126                 length = total;
127             }
128         }
129     }
130 
131     @Override
132     public long getLength()
133     {
134         return length;
135     }
136 
137     /**
138      * Adds the given content buffer to this content provider
139      * and notifies the listener that content is available.
140      *
141      * @param buffer the content to add
142      * @return true if the content was added, false otherwise
143      */
144     public boolean offer(ByteBuffer buffer)
145     {
146         return offer(buffer, Callback.NOOP);
147     }
148 
149     public boolean offer(ByteBuffer buffer, Callback callback)
150     {
151         return offer(new Chunk(buffer, callback));
152     }
153 
154     private boolean offer(Chunk chunk)
155     {
156         Throwable failure;
157         boolean result = false;
158         synchronized (lock)
159         {
160             failure = this.failure;
161             if (failure == null)
162             {
163                 result = chunks.offer(chunk);
164                 if (result && chunk != CLOSE)
165                     ++size;
166             }
167         }
168         if (failure != null)
169             chunk.callback.failed(failure);
170         else if (result)
171             notifyListener();
172         return result;
173     }
174 
175     private void clear()
176     {
177         synchronized (lock)
178         {
179             chunks.clear();
180         }
181     }
182 
183     public void flush() throws IOException
184     {
185         synchronized (lock)
186         {
187             try
188             {
189                 while (true)
190                 {
191                     if (failure != null)
192                         throw new IOException(failure);
193                     if (size == 0)
194                         break;
195                     lock.wait();
196                 }
197             }
198             catch (InterruptedException x)
199             {
200                 throw new InterruptedIOException();
201             }
202         }
203     }
204 
205     /**
206      * No more content will be added to this content provider
207      * and notifies the listener that no more content is available.
208      */
209     public void close()
210     {
211         if (closed.compareAndSet(false, true))
212             offer(CLOSE);
213     }
214 
215     public boolean isClosed()
216     {
217         return closed.get();
218     }
219 
220     @Override
221     public void succeeded()
222     {
223     }
224 
225     @Override
226     public void failed(Throwable failure)
227     {
228         iterator.failed(failure);
229     }
230 
231     private void notifyListener()
232     {
233         Listener listener = this.listener.get();
234         if (listener != null)
235             listener.onContent();
236     }
237 
238     @Override
239     public Iterator<ByteBuffer> iterator()
240     {
241         return iterator;
242     }
243 
244     private class DeferredContentProviderIterator implements Iterator<ByteBuffer>, Callback, Synchronizable
245     {
246         private Chunk current;
247 
248         @Override
249         public boolean hasNext()
250         {
251             synchronized (lock)
252             {
253                 return chunks.peek() != CLOSE;
254             }
255         }
256 
257         @Override
258         public ByteBuffer next()
259         {
260             synchronized (lock)
261             {
262                 Chunk chunk = current = chunks.poll();
263                 if (chunk == CLOSE)
264                 {
265                     // Slow path: reinsert the CLOSE chunk
266                     // so that hasNext() works correctly.
267                     chunks.add(0, CLOSE);
268                     throw new NoSuchElementException();
269                 }
270                 return chunk == null ? null : chunk.buffer;
271             }
272         }
273 
274         @Override
275         public void remove()
276         {
277             throw new UnsupportedOperationException();
278         }
279 
280         @Override
281         public void succeeded()
282         {
283             Chunk chunk;
284             synchronized (lock)
285             {
286                 chunk = current;
287                 if (chunk != null)
288                 {
289                     --size;
290                     lock.notify();
291                 }
292             }
293             if (chunk != null)
294                 chunk.callback.succeeded();
295         }
296 
297         @Override
298         public void failed(Throwable x)
299         {
300             List<Chunk> chunks = new ArrayList<>();
301             synchronized (lock)
302             {
303                 failure = x;
304                 // Transfer all chunks to fail them all.
305                 Chunk chunk = current;
306                 current = null;
307                 if (chunk != null)
308                     chunks.add(chunk);
309                 chunks.addAll(DeferredContentProvider.this.chunks);
310                 clear();
311                 lock.notify();
312             }
313             for (Chunk chunk : chunks)
314                 chunk.callback.failed(x);
315         }
316 
317         @Override
318         public Object getLock()
319         {
320             return lock;
321         }
322     }
323 
324     public static class Chunk
325     {
326         public final ByteBuffer buffer;
327         public final Callback callback;
328 
329         public Chunk(ByteBuffer buffer, Callback callback)
330         {
331             this.buffer = Objects.requireNonNull(buffer);
332             this.callback = Objects.requireNonNull(callback);
333         }
334 
335         @Override
336         public String toString()
337         {
338             return String.format("%s@%x", getClass().getSimpleName(), hashCode());
339         }
340     }
341 }