View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client.util;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.nio.ByteBuffer;
25  import java.util.ArrayList;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.NoSuchElementException;
29  import java.util.Objects;
30  import java.util.Queue;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  import java.util.concurrent.atomic.AtomicReference;
33  
34  import org.eclipse.jetty.client.AsyncContentProvider;
35  import org.eclipse.jetty.client.api.ContentProvider;
36  import org.eclipse.jetty.client.api.Request;
37  import org.eclipse.jetty.client.api.Response;
38  import org.eclipse.jetty.util.ArrayQueue;
39  import org.eclipse.jetty.util.BufferUtil;
40  import org.eclipse.jetty.util.Callback;
41  
42  /**
43   * A {@link ContentProvider} that allows to add content after {@link Request#send(Response.CompleteListener)}
44   * has been called, therefore providing the request content at a later time.
45   * <p />
46   * {@link DeferredContentProvider} can only be used in conjunction with
47   * {@link Request#send(Response.CompleteListener)} (and not with its blocking counterpart {@link Request#send()})
48   * because it provides content asynchronously.
49   * <p />
50   * The deferred content is provided once and then fully consumed.
51   * Invocations to the {@link #iterator()} method after the first will return an "empty" iterator
52   * because the stream has been consumed on the first invocation.
53   * However, it is possible for subclasses to override {@link #offer(ByteBuffer)} and {@link #close()} to copy
54   * the content to another location (for example a file) and be able to support multiple invocations
55   * of of {@link #iterator()} returning the iterator provided by this
56    * class on the first invocation, and an iterator on the bytes copied to the other location
57    * for subsequent invocations.
58   * <p />
59   * Typical usage of {@link DeferredContentProvider} is in asynchronous proxies, where HTTP headers arrive
60   * separately from HTTP content chunks.
61   * <p />
62   * The deferred content must be provided through {@link #offer(ByteBuffer)}, which can be invoked multiple
63   * times, and when all content has been provided it must be signaled with a call to {@link #close()}.
64   * <p />
65   * Example usage:
66   * <pre>
67   * HttpClient httpClient = ...;
68   *
69   * // Use try-with-resources to autoclose DeferredContentProvider
70   * try (DeferredContentProvider content = new DeferredContentProvider())
71   * {
72   *     httpClient.newRequest("localhost", 8080)
73   *             .content(content)
74   *             .send(new Response.CompleteListener()
75   *             {
76   *                 &#64Override
77   *                 public void onComplete(Result result)
78   *                 {
79   *                     // Your logic here
80   *                 }
81   *             });
82   *
83   *     // At a later time...
84   *     content.offer(ByteBuffer.wrap("some content".getBytes()));
85   * }
86   * </pre>
87   */
88  public class DeferredContentProvider implements AsyncContentProvider, Callback, Closeable
89  {
90      private static final AsyncChunk CLOSE = new AsyncChunk(BufferUtil.EMPTY_BUFFER, Callback.Adapter.INSTANCE);
91  
92      private final Object lock = this;
93      private final Queue<AsyncChunk> chunks = new ArrayQueue<>(4, 64, lock);
94      private final AtomicReference<Listener> listener = new AtomicReference<>();
95      private final DeferredContentProviderIterator iterator = new DeferredContentProviderIterator();
96      private final AtomicBoolean closed = new AtomicBoolean();
97      private int size;
98      private Throwable failure;
99  
100     /**
101      * Creates a new {@link DeferredContentProvider} with the given initial content
102      *
103      * @param buffers the initial content
104      */
105     public DeferredContentProvider(ByteBuffer... buffers)
106     {
107         for (ByteBuffer buffer : buffers)
108             offer(buffer);
109     }
110 
111     @Override
112     public void setListener(Listener listener)
113     {
114         if (!this.listener.compareAndSet(null, listener))
115             throw new IllegalStateException(String.format("The same %s instance cannot be used in multiple requests",
116                     AsyncContentProvider.class.getName()));
117     }
118 
119     @Override
120     public long getLength()
121     {
122         return -1;
123     }
124 
125     /**
126      * Adds the given content buffer to this content provider
127      * and notifies the listener that content is available.
128      *
129      * @param buffer the content to add
130      * @return true if the content was added, false otherwise
131      */
132     public boolean offer(ByteBuffer buffer)
133     {
134         return offer(buffer, Callback.Adapter.INSTANCE);
135     }
136 
137     public boolean offer(ByteBuffer buffer, Callback callback)
138     {
139         return offer(new AsyncChunk(buffer, callback));
140     }
141 
142     private boolean offer(AsyncChunk chunk)
143     {
144         Throwable failure;
145         boolean result = false;
146         synchronized (lock)
147         {
148             failure = this.failure;
149             if (failure == null)
150             {
151                 result = chunks.offer(chunk);
152                 if (result && chunk != CLOSE)
153                     ++size;
154             }
155         }
156         if (failure != null)
157             chunk.callback.failed(failure);
158         else if (result)
159             notifyListener();
160         return result;
161     }
162 
163     private void clear()
164     {
165         synchronized (lock)
166         {
167             chunks.clear();
168         }
169     }
170 
171     public void flush() throws IOException
172     {
173         synchronized (lock)
174         {
175             try
176             {
177                 while (true)
178                 {
179                     if (failure != null)
180                         throw new IOException(failure);
181                     if (size == 0)
182                         break;
183                     lock.wait();
184                 }
185             }
186             catch (InterruptedException x)
187             {
188                 throw new InterruptedIOException();
189             }
190         }
191     }
192 
193     /**
194      * No more content will be added to this content provider
195      * and notifies the listener that no more content is available.
196      */
197     public void close()
198     {
199         if (closed.compareAndSet(false, true))
200             offer(CLOSE);
201     }
202 
203     @Override
204     public void succeeded()
205     {
206     }
207 
208     @Override
209     public void failed(Throwable failure)
210     {
211         iterator.failed(failure);
212     }
213 
214     private void notifyListener()
215     {
216         Listener listener = this.listener.get();
217         if (listener != null)
218             listener.onContent();
219     }
220 
221     @Override
222     public Iterator<ByteBuffer> iterator()
223     {
224         return iterator;
225     }
226 
227     private class DeferredContentProviderIterator implements Iterator<ByteBuffer>, Callback
228     {
229         private AsyncChunk current;
230 
231         @Override
232         public boolean hasNext()
233         {
234             synchronized (lock)
235             {
236                 return chunks.peek() != CLOSE;
237             }
238         }
239 
240         @Override
241         public ByteBuffer next()
242         {
243             synchronized (lock)
244             {
245                 AsyncChunk chunk = current = chunks.poll();
246                 if (chunk == CLOSE)
247                     throw new NoSuchElementException();
248                 return chunk == null ? null : chunk.buffer;
249             }
250         }
251 
252         @Override
253         public void remove()
254         {
255             throw new UnsupportedOperationException();
256         }
257 
258         @Override
259         public void succeeded()
260         {
261             AsyncChunk chunk;
262             synchronized (lock)
263             {
264                 chunk = current;
265                 if (chunk != null)
266                 {
267                     --size;
268                     lock.notify();
269                 }
270             }
271             if (chunk != null)
272                 chunk.callback.succeeded();
273         }
274 
275         @Override
276         public void failed(Throwable x)
277         {
278             List<AsyncChunk> chunks = new ArrayList<>();
279             synchronized (lock)
280             {
281                 failure = x;
282                 // Transfer all chunks to fail them all.
283                 chunks.addAll(DeferredContentProvider.this.chunks);
284                 clear();
285                 current = null;
286                 lock.notify();
287             }
288             for (AsyncChunk chunk : chunks)
289                 chunk.callback.failed(x);
290         }
291     }
292 
293     private static class AsyncChunk
294     {
295         private final ByteBuffer buffer;
296         private final Callback callback;
297 
298         private AsyncChunk(ByteBuffer buffer, Callback callback)
299         {
300             this.buffer = Objects.requireNonNull(buffer);
301             this.callback = Objects.requireNonNull(callback);
302         }
303     }
304 }