View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client.util;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.nio.ByteBuffer;
25  import java.util.Iterator;
26  import java.util.NoSuchElementException;
27  import java.util.Objects;
28  import java.util.Queue;
29  import java.util.concurrent.atomic.AtomicBoolean;
30  import java.util.concurrent.atomic.AtomicReference;
31  
32  import org.eclipse.jetty.client.AsyncContentProvider;
33  import org.eclipse.jetty.client.api.ContentProvider;
34  import org.eclipse.jetty.client.api.Request;
35  import org.eclipse.jetty.client.api.Response;
36  import org.eclipse.jetty.util.ArrayQueue;
37  import org.eclipse.jetty.util.BufferUtil;
38  import org.eclipse.jetty.util.Callback;
39  
40  /**
41   * A {@link ContentProvider} that allows to add content after {@link Request#send(Response.CompleteListener)}
42   * has been called, therefore providing the request content at a later time.
43   * <p />
44   * {@link DeferredContentProvider} can only be used in conjunction with
45   * {@link Request#send(Response.CompleteListener)} (and not with its blocking counterpart {@link Request#send()})
46   * because it provides content asynchronously.
47   * <p />
48   * The deferred content is provided once and then fully consumed.
49   * Invocations to the {@link #iterator()} method after the first will return an "empty" iterator
50   * because the stream has been consumed on the first invocation.
51   * However, it is possible for subclasses to override {@link #offer(ByteBuffer)} and {@link #close()} to copy
52   * the content to another location (for example a file) and be able to support multiple invocations
53   * of of {@link #iterator()} returning the iterator provided by this
54    * class on the first invocation, and an iterator on the bytes copied to the other location
55    * for subsequent invocations.
56   * <p />
57   * Typical usage of {@link DeferredContentProvider} is in asynchronous proxies, where HTTP headers arrive
58   * separately from HTTP content chunks.
59   * <p />
60   * The deferred content must be provided through {@link #offer(ByteBuffer)}, which can be invoked multiple
61   * times, and when all content has been provided it must be signaled with a call to {@link #close()}.
62   * <p />
63   * Example usage:
64   * <pre>
65   * HttpClient httpClient = ...;
66   *
67   * // Use try-with-resources to autoclose DeferredContentProvider
68   * try (DeferredContentProvider content = new DeferredContentProvider())
69   * {
70   *     httpClient.newRequest("localhost", 8080)
71   *             .content(content)
72   *             .send(new Response.CompleteListener()
73   *             {
74   *                 &#64Override
75   *                 public void onComplete(Result result)
76   *                 {
77   *                     // Your logic here
78   *                 }
79   *             });
80   *
81   *     // At a later time...
82   *     content.offer(ByteBuffer.wrap("some content".getBytes()));
83   * }
84   * </pre>
85   */
86  public class DeferredContentProvider implements AsyncContentProvider, Closeable
87  {
88      private static final Callback EMPTY_CALLBACK = new Callback.Adapter();
89      private static final AsyncChunk CLOSE = new AsyncChunk(BufferUtil.EMPTY_BUFFER, EMPTY_CALLBACK);
90  
91      private final Object lock = this;
92      private final Queue<AsyncChunk> chunks = new ArrayQueue<>(4, 64, lock);
93      private final AtomicReference<Listener> listener = new AtomicReference<>();
94      private final Iterator<ByteBuffer> iterator = new DeferredContentProviderIterator();
95      private final AtomicBoolean closed = new AtomicBoolean();
96      private int size;
97      private Throwable failure;
98  
99      /**
100      * Creates a new {@link DeferredContentProvider} with the given initial content
101      *
102      * @param buffers the initial content
103      */
104     public DeferredContentProvider(ByteBuffer... buffers)
105     {
106         for (ByteBuffer buffer : buffers)
107             offer(buffer);
108     }
109 
110     @Override
111     public void setListener(Listener listener)
112     {
113         if (!this.listener.compareAndSet(null, listener))
114             throw new IllegalStateException(String.format("The same %s instance cannot be used in multiple requests",
115                     AsyncContentProvider.class.getName()));
116     }
117 
118     @Override
119     public long getLength()
120     {
121         return -1;
122     }
123 
124     /**
125      * Adds the given content buffer to this content provider
126      * and notifies the listener that content is available.
127      *
128      * @param buffer the content to add
129      * @return true if the content was added, false otherwise
130      */
131     public boolean offer(ByteBuffer buffer)
132     {
133         return offer(buffer, EMPTY_CALLBACK);
134     }
135 
136     public boolean offer(ByteBuffer buffer, Callback callback)
137     {
138         return offer(new AsyncChunk(buffer, callback));
139     }
140 
141     private boolean offer(AsyncChunk chunk)
142     {
143         boolean result;
144         synchronized (lock)
145         {
146             result = chunks.offer(chunk);
147             if (result && chunk != CLOSE)
148                 ++size;
149         }
150         if (result)
151             notifyListener();
152         return result;
153     }
154 
155     public void flush() throws IOException
156     {
157         synchronized (lock)
158         {
159             try
160             {
161                 while (true)
162                 {
163                     if (failure != null)
164                         throw new IOException(failure);
165                     if (size == 0)
166                         break;
167                     lock.wait();
168                 }
169             }
170             catch (InterruptedException x)
171             {
172                 throw new InterruptedIOException();
173             }
174         }
175     }
176 
177     /**
178      * No more content will be added to this content provider
179      * and notifies the listener that no more content is available.
180      */
181     public void close()
182     {
183         if (closed.compareAndSet(false, true))
184             offer(CLOSE);
185     }
186 
187     private void notifyListener()
188     {
189         Listener listener = this.listener.get();
190         if (listener != null)
191             listener.onContent();
192     }
193 
194     @Override
195     public Iterator<ByteBuffer> iterator()
196     {
197         return iterator;
198     }
199 
200     private class DeferredContentProviderIterator implements Iterator<ByteBuffer>, Callback
201     {
202         private AsyncChunk current;
203 
204         @Override
205         public boolean hasNext()
206         {
207             synchronized (lock)
208             {
209                 return chunks.peek() != CLOSE;
210             }
211         }
212 
213         @Override
214         public ByteBuffer next()
215         {
216             synchronized (lock)
217             {
218                 AsyncChunk chunk = current = chunks.poll();
219                 if (chunk == CLOSE)
220                     throw new NoSuchElementException();
221                 return chunk == null ? null : chunk.buffer;
222             }
223         }
224 
225         @Override
226         public void remove()
227         {
228             throw new UnsupportedOperationException();
229         }
230 
231         @Override
232         public void succeeded()
233         {
234             AsyncChunk chunk;
235             synchronized (lock)
236             {
237                 chunk = current;
238                 if (chunk != null)
239                 {
240                     --size;
241                     lock.notify();
242                 }
243             }
244             if (chunk != null)
245                 chunk.callback.succeeded();
246         }
247 
248         @Override
249         public void failed(Throwable x)
250         {
251             AsyncChunk chunk;
252             synchronized (lock)
253             {
254                 chunk = current;
255                 failure = x;
256                 lock.notify();
257             }
258             if (chunk != null)
259                 chunk.callback.failed(x);
260         }
261     }
262 
263     private static class AsyncChunk
264     {
265         private final ByteBuffer buffer;
266         private final Callback callback;
267 
268         private AsyncChunk(ByteBuffer buffer, Callback callback)
269         {
270             this.buffer = Objects.requireNonNull(buffer);
271             this.callback = Objects.requireNonNull(callback);
272         }
273     }
274 }