View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client.util;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.nio.ByteBuffer;
25  import java.util.Iterator;
26  import java.util.NoSuchElementException;
27  import java.util.Queue;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  import java.util.concurrent.atomic.AtomicReference;
30  
31  import org.eclipse.jetty.client.AsyncContentProvider;
32  import org.eclipse.jetty.client.api.ContentProvider;
33  import org.eclipse.jetty.client.api.Request;
34  import org.eclipse.jetty.client.api.Response;
35  import org.eclipse.jetty.util.ArrayQueue;
36  import org.eclipse.jetty.util.Callback;
37  
38  /**
39   * A {@link ContentProvider} that allows to add content after {@link Request#send(Response.CompleteListener)}
40   * has been called, therefore providing the request content at a later time.
41   * <p />
42   * {@link DeferredContentProvider} can only be used in conjunction with
43   * {@link Request#send(Response.CompleteListener)} (and not with its blocking counterpart {@link Request#send()})
44   * because it provides content asynchronously.
45   * <p />
46   * The deferred content is provided once and then fully consumed.
47   * Invocations to the {@link #iterator()} method after the first will return an "empty" iterator
48   * because the stream has been consumed on the first invocation.
49   * However, it is possible for subclasses to override {@link #offer(ByteBuffer)} and {@link #close()} to copy
50   * the content to another location (for example a file) and be able to support multiple invocations
51   * of {@link #iterator()} returning the iterator provided by this
52    * class on the first invocation, and an iterator on the bytes copied to the other location
53    * for subsequent invocations.
54   * <p />
55   * Typical usage of {@link DeferredContentProvider} is in asynchronous proxies, where HTTP headers arrive
56   * separately from HTTP content chunks.
57   * <p />
58   * The deferred content must be provided through {@link #offer(ByteBuffer)}, which can be invoked multiple
59   * times, and when all content has been provided it must be signaled with a call to {@link #close()}.
60   * <p />
61   * Example usage:
62   * <pre>
63   * HttpClient httpClient = ...;
64   *
65   * // Use try-with-resources to autoclose DeferredContentProvider
66   * try (DeferredContentProvider content = new DeferredContentProvider())
67   * {
68   *     httpClient.newRequest("localhost", 8080)
69   *             .content(content)
70   *             .send(new Response.CompleteListener()
71   *             {
72   *                 &#64;Override
73   *                 public void onComplete(Result result)
74   *                 {
75   *                     // Your logic here
76   *                 }
77   *             });
78   *
79   *     // At a later time...
80   *     content.offer(ByteBuffer.wrap("some content".getBytes()));
81   * }
82   * </pre>
83   */
84  public class DeferredContentProvider implements AsyncContentProvider, Closeable
85  {
86      private static final ByteBuffer CLOSE = ByteBuffer.allocate(0);
87  
88      private final Object lock = this;
89      private final Queue<ByteBuffer> chunks = new ArrayQueue<>(4, 64, lock);
90      private final AtomicReference<Listener> listener = new AtomicReference<>();
91      private final Iterator<ByteBuffer> iterator = new DeferredContentProviderIterator();
92      private final AtomicBoolean closed = new AtomicBoolean();
93      private int size;
94      private Throwable failure;
95  
96      /**
97       * Creates a new {@link DeferredContentProvider} with the given initial content
98       *
99       * @param buffers the initial content
100      */
101     public DeferredContentProvider(ByteBuffer... buffers)
102     {
103         for (ByteBuffer buffer : buffers)
104             offer(buffer);
105     }
106 
107     @Override
108     public void setListener(Listener listener)
109     {
110         if (!this.listener.compareAndSet(null, listener))
111             throw new IllegalStateException(String.format("The same %s instance cannot be used in multiple requests",
112                     AsyncContentProvider.class.getName()));
113     }
114 
115     @Override
116     public long getLength()
117     {
118         return -1;
119     }
120 
121     /**
122      * Adds the given content buffer to this content provider
123      * and notifies the listener that content is available.
124      *
125      * @param buffer the content to add
126      * @return true if the content was added, false otherwise
127      */
128     public boolean offer(ByteBuffer buffer)
129     {
130         boolean result;
131         synchronized (lock)
132         {
133             result = chunks.offer(buffer);
134             if (result && buffer != CLOSE)
135                 ++size;
136         }
137         if (result)
138             notifyListener();
139         return result;
140     }
141 
142     public void flush() throws IOException
143     {
144         synchronized (lock)
145         {
146             try
147             {
148                 while (true)
149                 {
150                     if (failure != null)
151                         throw new IOException(failure);
152                     if (size == 0)
153                         break;
154                     lock.wait();
155                 }
156             }
157             catch (InterruptedException x)
158             {
159                 throw new InterruptedIOException();
160             }
161         }
162     }
163 
164     /**
165      * No more content will be added to this content provider
166      * and notifies the listener that no more content is available.
167      */
168     public void close()
169     {
170         if (closed.compareAndSet(false, true))
171             offer(CLOSE);
172     }
173 
174     private void notifyListener()
175     {
176         Listener listener = this.listener.get();
177         if (listener != null)
178             listener.onContent();
179     }
180 
181     @Override
182     public Iterator<ByteBuffer> iterator()
183     {
184         return iterator;
185     }
186 
187     private class DeferredContentProviderIterator implements Iterator<ByteBuffer>, Callback
188     {
189         private ByteBuffer current;
190 
191         @Override
192         public boolean hasNext()
193         {
194             synchronized (lock)
195             {
196                 return chunks.peek() != CLOSE;
197             }
198         }
199 
200         @Override
201         public ByteBuffer next()
202         {
203             synchronized (lock)
204             {
205                 ByteBuffer element = current = chunks.poll();
206                 if (element == CLOSE)
207                     throw new NoSuchElementException();
208                 return element;
209             }
210         }
211 
212         @Override
213         public void remove()
214         {
215             throw new UnsupportedOperationException();
216         }
217 
218         @Override
219         public void succeeded()
220         {
221             synchronized (lock)
222             {
223                 if (current != null)
224                 {
225                     --size;
226                     lock.notify();
227                 }
228             }
229         }
230 
231         @Override
232         public void failed(Throwable x)
233         {
234             synchronized (lock)
235             {
236                 failure = x;
237                 lock.notify();
238             }
239         }
240     }
241 }