1 //
2 // ========================================================================
3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
8 //
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
11 //
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
14 //
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
17 //
18
19 package org.eclipse.jetty.client.util;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.nio.ByteBuffer;
25 import java.util.Iterator;
26 import java.util.NoSuchElementException;
27 import java.util.Objects;
28 import java.util.Queue;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.concurrent.atomic.AtomicReference;
31
32 import org.eclipse.jetty.client.AsyncContentProvider;
33 import org.eclipse.jetty.client.api.ContentProvider;
34 import org.eclipse.jetty.client.api.Request;
35 import org.eclipse.jetty.client.api.Response;
36 import org.eclipse.jetty.util.ArrayQueue;
37 import org.eclipse.jetty.util.BufferUtil;
38 import org.eclipse.jetty.util.Callback;
39
40 /**
41 * A {@link ContentProvider} that allows to add content after {@link Request#send(Response.CompleteListener)}
42 * has been called, therefore providing the request content at a later time.
43 * <p />
44 * {@link DeferredContentProvider} can only be used in conjunction with
45 * {@link Request#send(Response.CompleteListener)} (and not with its blocking counterpart {@link Request#send()})
46 * because it provides content asynchronously.
47 * <p />
48 * The deferred content is provided once and then fully consumed.
49 * Invocations to the {@link #iterator()} method after the first will return an "empty" iterator
50 * because the stream has been consumed on the first invocation.
51 * However, it is possible for subclasses to override {@link #offer(ByteBuffer)} and {@link #close()} to copy
52 * the content to another location (for example a file) and be able to support multiple invocations
53 * of of {@link #iterator()} returning the iterator provided by this
54 * class on the first invocation, and an iterator on the bytes copied to the other location
55 * for subsequent invocations.
56 * <p />
57 * Typical usage of {@link DeferredContentProvider} is in asynchronous proxies, where HTTP headers arrive
58 * separately from HTTP content chunks.
59 * <p />
60 * The deferred content must be provided through {@link #offer(ByteBuffer)}, which can be invoked multiple
61 * times, and when all content has been provided it must be signaled with a call to {@link #close()}.
62 * <p />
63 * Example usage:
64 * <pre>
65 * HttpClient httpClient = ...;
66 *
67 * // Use try-with-resources to autoclose DeferredContentProvider
68 * try (DeferredContentProvider content = new DeferredContentProvider())
69 * {
70 * httpClient.newRequest("localhost", 8080)
71 * .content(content)
72 * .send(new Response.CompleteListener()
73 * {
74 * @Override
75 * public void onComplete(Result result)
76 * {
77 * // Your logic here
78 * }
79 * });
80 *
81 * // At a later time...
82 * content.offer(ByteBuffer.wrap("some content".getBytes()));
83 * }
84 * </pre>
85 */
86 public class DeferredContentProvider implements AsyncContentProvider, Closeable
87 {
88 private static final Callback EMPTY_CALLBACK = new Callback.Adapter();
89 private static final AsyncChunk CLOSE = new AsyncChunk(BufferUtil.EMPTY_BUFFER, EMPTY_CALLBACK);
90
91 private final Object lock = this;
92 private final Queue<AsyncChunk> chunks = new ArrayQueue<>(4, 64, lock);
93 private final AtomicReference<Listener> listener = new AtomicReference<>();
94 private final Iterator<ByteBuffer> iterator = new DeferredContentProviderIterator();
95 private final AtomicBoolean closed = new AtomicBoolean();
96 private int size;
97 private Throwable failure;
98
99 /**
100 * Creates a new {@link DeferredContentProvider} with the given initial content
101 *
102 * @param buffers the initial content
103 */
104 public DeferredContentProvider(ByteBuffer... buffers)
105 {
106 for (ByteBuffer buffer : buffers)
107 offer(buffer);
108 }
109
110 @Override
111 public void setListener(Listener listener)
112 {
113 if (!this.listener.compareAndSet(null, listener))
114 throw new IllegalStateException(String.format("The same %s instance cannot be used in multiple requests",
115 AsyncContentProvider.class.getName()));
116 }
117
118 @Override
119 public long getLength()
120 {
121 return -1;
122 }
123
124 /**
125 * Adds the given content buffer to this content provider
126 * and notifies the listener that content is available.
127 *
128 * @param buffer the content to add
129 * @return true if the content was added, false otherwise
130 */
131 public boolean offer(ByteBuffer buffer)
132 {
133 return offer(buffer, EMPTY_CALLBACK);
134 }
135
136 public boolean offer(ByteBuffer buffer, Callback callback)
137 {
138 return offer(new AsyncChunk(buffer, callback));
139 }
140
141 private boolean offer(AsyncChunk chunk)
142 {
143 boolean result;
144 synchronized (lock)
145 {
146 result = chunks.offer(chunk);
147 if (result && chunk != CLOSE)
148 ++size;
149 }
150 if (result)
151 notifyListener();
152 return result;
153 }
154
155 public void flush() throws IOException
156 {
157 synchronized (lock)
158 {
159 try
160 {
161 while (true)
162 {
163 if (failure != null)
164 throw new IOException(failure);
165 if (size == 0)
166 break;
167 lock.wait();
168 }
169 }
170 catch (InterruptedException x)
171 {
172 throw new InterruptedIOException();
173 }
174 }
175 }
176
177 /**
178 * No more content will be added to this content provider
179 * and notifies the listener that no more content is available.
180 */
181 public void close()
182 {
183 if (closed.compareAndSet(false, true))
184 offer(CLOSE);
185 }
186
187 private void notifyListener()
188 {
189 Listener listener = this.listener.get();
190 if (listener != null)
191 listener.onContent();
192 }
193
194 @Override
195 public Iterator<ByteBuffer> iterator()
196 {
197 return iterator;
198 }
199
200 private class DeferredContentProviderIterator implements Iterator<ByteBuffer>, Callback
201 {
202 private AsyncChunk current;
203
204 @Override
205 public boolean hasNext()
206 {
207 synchronized (lock)
208 {
209 return chunks.peek() != CLOSE;
210 }
211 }
212
213 @Override
214 public ByteBuffer next()
215 {
216 synchronized (lock)
217 {
218 AsyncChunk chunk = current = chunks.poll();
219 if (chunk == CLOSE)
220 throw new NoSuchElementException();
221 return chunk == null ? null : chunk.buffer;
222 }
223 }
224
225 @Override
226 public void remove()
227 {
228 throw new UnsupportedOperationException();
229 }
230
231 @Override
232 public void succeeded()
233 {
234 AsyncChunk chunk;
235 synchronized (lock)
236 {
237 chunk = current;
238 if (chunk != null)
239 {
240 --size;
241 lock.notify();
242 }
243 }
244 if (chunk != null)
245 chunk.callback.succeeded();
246 }
247
248 @Override
249 public void failed(Throwable x)
250 {
251 AsyncChunk chunk;
252 synchronized (lock)
253 {
254 chunk = current;
255 failure = x;
256 lock.notify();
257 }
258 if (chunk != null)
259 chunk.callback.failed(x);
260 }
261 }
262
263 private static class AsyncChunk
264 {
265 private final ByteBuffer buffer;
266 private final Callback callback;
267
268 private AsyncChunk(ByteBuffer buffer, Callback callback)
269 {
270 this.buffer = Objects.requireNonNull(buffer);
271 this.callback = Objects.requireNonNull(callback);
272 }
273 }
274 }