View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client.util;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.nio.ByteBuffer;
25  import java.util.ArrayDeque;
26  import java.util.ArrayList;
27  import java.util.Deque;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.NoSuchElementException;
31  import java.util.Objects;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  import java.util.concurrent.atomic.AtomicReference;
34  
35  import org.eclipse.jetty.client.AsyncContentProvider;
36  import org.eclipse.jetty.client.Synchronizable;
37  import org.eclipse.jetty.client.api.ContentProvider;
38  import org.eclipse.jetty.client.api.Request;
39  import org.eclipse.jetty.client.api.Response;
40  import org.eclipse.jetty.util.BufferUtil;
41  import org.eclipse.jetty.util.Callback;
42  
43  /**
44   * A {@link ContentProvider} that allows to add content after {@link Request#send(Response.CompleteListener)}
45   * has been called, therefore providing the request content at a later time.
46   * <p>
47   * {@link DeferredContentProvider} can only be used in conjunction with
48   * {@link Request#send(Response.CompleteListener)} (and not with its blocking counterpart {@link Request#send()})
49   * because it provides content asynchronously.
50   * <p>
51   * The deferred content is provided once and then fully consumed.
52   * Invocations to the {@link #iterator()} method after the first will return an "empty" iterator
53   * because the stream has been consumed on the first invocation.
54   * However, it is possible for subclasses to override {@link #offer(ByteBuffer)} and {@link #close()} to copy
55   * the content to another location (for example a file) and be able to support multiple invocations
56   * of of {@link #iterator()} returning the iterator provided by this
57    * class on the first invocation, and an iterator on the bytes copied to the other location
58    * for subsequent invocations.
59   * <p>
60   * Typical usage of {@link DeferredContentProvider} is in asynchronous proxies, where HTTP headers arrive
61   * separately from HTTP content chunks.
62   * <p>
63   * The deferred content must be provided through {@link #offer(ByteBuffer)}, which can be invoked multiple
64   * times, and when all content has been provided it must be signaled with a call to {@link #close()}.
65   * <p>
66   * Example usage:
67   * <pre>
68   * HttpClient httpClient = ...;
69   *
70   * // Use try-with-resources to autoclose DeferredContentProvider
71   * try (DeferredContentProvider content = new DeferredContentProvider())
72   * {
73   *     httpClient.newRequest("localhost", 8080)
74   *             .content(content)
75   *             .send(new Response.CompleteListener()
76   *             {
77   *                 &#64;Override
78   *                 public void onComplete(Result result)
79   *                 {
80   *                     // Your logic here
81   *                 }
82   *             });
83   *
84   *     // At a later time...
85   *     content.offer(ByteBuffer.wrap("some content".getBytes()));
86   * }
87   * </pre>
88   */
89  public class DeferredContentProvider implements AsyncContentProvider, Callback, Closeable
90  {
91      private static final Chunk CLOSE = new Chunk(BufferUtil.EMPTY_BUFFER, Callback.NOOP);
92  
93      private final Object lock = this;
94      private final Deque<Chunk> chunks = new ArrayDeque<>();
95      private final AtomicReference<Listener> listener = new AtomicReference<>();
96      private final DeferredContentProviderIterator iterator = new DeferredContentProviderIterator();
97      private final AtomicBoolean closed = new AtomicBoolean();
98      private long length = -1;
99      private int size;
100     private Throwable failure;
101 
102     /**
103      * Creates a new {@link DeferredContentProvider} with the given initial content
104      *
105      * @param buffers the initial content
106      */
107     public DeferredContentProvider(ByteBuffer... buffers)
108     {
109         for (ByteBuffer buffer : buffers)
110             offer(buffer);
111     }
112 
113     @Override
114     public void setListener(Listener listener)
115     {
116         if (!this.listener.compareAndSet(null, listener))
117             throw new IllegalStateException(String.format("The same %s instance cannot be used in multiple requests",
118                     AsyncContentProvider.class.getName()));
119 
120         if (isClosed())
121         {
122             synchronized (lock)
123             {
124                 long total = 0;
125                 for (Chunk chunk : chunks)
126                     total += chunk.buffer.remaining();
127                 length = total;
128             }
129         }
130     }
131 
132     @Override
133     public long getLength()
134     {
135         return length;
136     }
137 
138     /**
139      * Adds the given content buffer to this content provider
140      * and notifies the listener that content is available.
141      *
142      * @param buffer the content to add
143      * @return true if the content was added, false otherwise
144      */
145     public boolean offer(ByteBuffer buffer)
146     {
147         return offer(buffer, Callback.NOOP);
148     }
149 
150     public boolean offer(ByteBuffer buffer, Callback callback)
151     {
152         return offer(new Chunk(buffer, callback));
153     }
154 
155     private boolean offer(Chunk chunk)
156     {
157         Throwable failure;
158         boolean result = false;
159         synchronized (lock)
160         {
161             failure = this.failure;
162             if (failure == null)
163             {
164                 result = chunks.offer(chunk);
165                 if (result && chunk != CLOSE)
166                     ++size;
167             }
168         }
169         if (failure != null)
170             chunk.callback.failed(failure);
171         else if (result)
172             notifyListener();
173         return result;
174     }
175 
176     private void clear()
177     {
178         synchronized (lock)
179         {
180             chunks.clear();
181         }
182     }
183 
184     public void flush() throws IOException
185     {
186         synchronized (lock)
187         {
188             try
189             {
190                 while (true)
191                 {
192                     if (failure != null)
193                         throw new IOException(failure);
194                     if (size == 0)
195                         break;
196                     lock.wait();
197                 }
198             }
199             catch (InterruptedException x)
200             {
201                 throw new InterruptedIOException();
202             }
203         }
204     }
205 
206     /**
207      * No more content will be added to this content provider
208      * and notifies the listener that no more content is available.
209      */
210     public void close()
211     {
212         if (closed.compareAndSet(false, true))
213             offer(CLOSE);
214     }
215 
216     public boolean isClosed()
217     {
218         return closed.get();
219     }
220 
221     @Override
222     public void failed(Throwable failure)
223     {
224         iterator.failed(failure);
225     }
226 
227     private void notifyListener()
228     {
229         Listener listener = this.listener.get();
230         if (listener != null)
231             listener.onContent();
232     }
233 
234     @Override
235     public Iterator<ByteBuffer> iterator()
236     {
237         return iterator;
238     }
239 
240     private class DeferredContentProviderIterator implements Iterator<ByteBuffer>, Callback, Synchronizable
241     {
242         private Chunk current;
243 
244         @Override
245         public boolean hasNext()
246         {
247             synchronized (lock)
248             {
249                 return chunks.peek() != CLOSE;
250             }
251         }
252 
253         @Override
254         public ByteBuffer next()
255         {
256             synchronized (lock)
257             {
258                 Chunk chunk = current = chunks.poll();
259                 if (chunk == CLOSE)
260                 {
261                     // Slow path: reinsert the CLOSE chunk
262                     // so that hasNext() works correctly.
263                     chunks.offerFirst(CLOSE);
264                     throw new NoSuchElementException();
265                 }
266                 return chunk == null ? null : chunk.buffer;
267             }
268         }
269 
270         @Override
271         public void remove()
272         {
273             throw new UnsupportedOperationException();
274         }
275 
276         @Override
277         public void succeeded()
278         {
279             Chunk chunk;
280             synchronized (lock)
281             {
282                 chunk = current;
283                 if (chunk != null)
284                 {
285                     --size;
286                     lock.notify();
287                 }
288             }
289             if (chunk != null)
290                 chunk.callback.succeeded();
291         }
292 
293         @Override
294         public void failed(Throwable x)
295         {
296             List<Chunk> chunks = new ArrayList<>();
297             synchronized (lock)
298             {
299                 failure = x;
300                 // Transfer all chunks to fail them all.
301                 Chunk chunk = current;
302                 current = null;
303                 if (chunk != null)
304                     chunks.add(chunk);
305                 chunks.addAll(DeferredContentProvider.this.chunks);
306                 clear();
307                 lock.notify();
308             }
309             for (Chunk chunk : chunks)
310                 chunk.callback.failed(x);
311         }
312 
313         @Override
314         public Object getLock()
315         {
316             return lock;
317         }
318     }
319 
320     public static class Chunk
321     {
322         public final ByteBuffer buffer;
323         public final Callback callback;
324 
325         public Chunk(ByteBuffer buffer, Callback callback)
326         {
327             this.buffer = Objects.requireNonNull(buffer);
328             this.callback = Objects.requireNonNull(callback);
329         }
330 
331         @Override
332         public String toString()
333         {
334             return String.format("%s@%x", getClass().getSimpleName(), hashCode());
335         }
336     }
337 }