1 //
2 // ========================================================================
3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
8 //
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
11 //
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
14 //
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
17 //
18
19 package org.eclipse.jetty.client.util;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.nio.ByteBuffer;
25 import java.util.Iterator;
26 import java.util.NoSuchElementException;
27 import java.util.Queue;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import java.util.concurrent.atomic.AtomicReference;
30
31 import org.eclipse.jetty.client.AsyncContentProvider;
32 import org.eclipse.jetty.client.api.ContentProvider;
33 import org.eclipse.jetty.client.api.Request;
34 import org.eclipse.jetty.client.api.Response;
35 import org.eclipse.jetty.util.ArrayQueue;
36 import org.eclipse.jetty.util.Callback;
37
38 /**
39 * A {@link ContentProvider} that allows to add content after {@link Request#send(Response.CompleteListener)}
40 * has been called, therefore providing the request content at a later time.
41 * <p />
42 * {@link DeferredContentProvider} can only be used in conjunction with
43 * {@link Request#send(Response.CompleteListener)} (and not with its blocking counterpart {@link Request#send()})
44 * because it provides content asynchronously.
45 * <p />
46 * The deferred content is provided once and then fully consumed.
47 * Invocations to the {@link #iterator()} method after the first will return an "empty" iterator
48 * because the stream has been consumed on the first invocation.
49 * However, it is possible for subclasses to override {@link #offer(ByteBuffer)} and {@link #close()} to copy
50 * the content to another location (for example a file) and be able to support multiple invocations
51 * of of {@link #iterator()} returning the iterator provided by this
52 * class on the first invocation, and an iterator on the bytes copied to the other location
53 * for subsequent invocations.
54 * <p />
55 * Typical usage of {@link DeferredContentProvider} is in asynchronous proxies, where HTTP headers arrive
56 * separately from HTTP content chunks.
57 * <p />
58 * The deferred content must be provided through {@link #offer(ByteBuffer)}, which can be invoked multiple
59 * times, and when all content has been provided it must be signaled with a call to {@link #close()}.
60 * <p />
61 * Example usage:
62 * <pre>
63 * HttpClient httpClient = ...;
64 *
65 * // Use try-with-resources to autoclose DeferredContentProvider
66 * try (DeferredContentProvider content = new DeferredContentProvider())
67 * {
68 * httpClient.newRequest("localhost", 8080)
69 * .content(content)
70 * .send(new Response.CompleteListener()
71 * {
72 * @Override
73 * public void onComplete(Result result)
74 * {
75 * // Your logic here
76 * }
77 * });
78 *
79 * // At a later time...
80 * content.offer(ByteBuffer.wrap("some content".getBytes()));
81 * }
82 * </pre>
83 */
84 public class DeferredContentProvider implements AsyncContentProvider, Closeable
85 {
86 private static final ByteBuffer CLOSE = ByteBuffer.allocate(0);
87
88 private final Object lock = this;
89 private final Queue<ByteBuffer> chunks = new ArrayQueue<>(4, 64, lock);
90 private final AtomicReference<Listener> listener = new AtomicReference<>();
91 private final Iterator<ByteBuffer> iterator = new DeferredContentProviderIterator();
92 private final AtomicBoolean closed = new AtomicBoolean();
93 private int size;
94 private Throwable failure;
95
96 /**
97 * Creates a new {@link DeferredContentProvider} with the given initial content
98 *
99 * @param buffers the initial content
100 */
101 public DeferredContentProvider(ByteBuffer... buffers)
102 {
103 for (ByteBuffer buffer : buffers)
104 offer(buffer);
105 }
106
107 @Override
108 public void setListener(Listener listener)
109 {
110 if (!this.listener.compareAndSet(null, listener))
111 throw new IllegalStateException(String.format("The same %s instance cannot be used in multiple requests",
112 AsyncContentProvider.class.getName()));
113 }
114
115 @Override
116 public long getLength()
117 {
118 return -1;
119 }
120
121 /**
122 * Adds the given content buffer to this content provider
123 * and notifies the listener that content is available.
124 *
125 * @param buffer the content to add
126 * @return true if the content was added, false otherwise
127 */
128 public boolean offer(ByteBuffer buffer)
129 {
130 boolean result;
131 synchronized (lock)
132 {
133 result = chunks.offer(buffer);
134 if (result && buffer != CLOSE)
135 ++size;
136 }
137 if (result)
138 notifyListener();
139 return result;
140 }
141
142 public void flush() throws IOException
143 {
144 synchronized (lock)
145 {
146 try
147 {
148 while (true)
149 {
150 if (failure != null)
151 throw new IOException(failure);
152 if (size == 0)
153 break;
154 lock.wait();
155 }
156 }
157 catch (InterruptedException x)
158 {
159 throw new InterruptedIOException();
160 }
161 }
162 }
163
164 /**
165 * No more content will be added to this content provider
166 * and notifies the listener that no more content is available.
167 */
168 public void close()
169 {
170 if (closed.compareAndSet(false, true))
171 offer(CLOSE);
172 }
173
174 private void notifyListener()
175 {
176 Listener listener = this.listener.get();
177 if (listener != null)
178 listener.onContent();
179 }
180
181 @Override
182 public Iterator<ByteBuffer> iterator()
183 {
184 return iterator;
185 }
186
187 private class DeferredContentProviderIterator implements Iterator<ByteBuffer>, Callback
188 {
189 private ByteBuffer current;
190
191 @Override
192 public boolean hasNext()
193 {
194 synchronized (lock)
195 {
196 return chunks.peek() != CLOSE;
197 }
198 }
199
200 @Override
201 public ByteBuffer next()
202 {
203 synchronized (lock)
204 {
205 ByteBuffer element = current = chunks.poll();
206 if (element == CLOSE)
207 throw new NoSuchElementException();
208 return element;
209 }
210 }
211
212 @Override
213 public void remove()
214 {
215 throw new UnsupportedOperationException();
216 }
217
218 @Override
219 public void succeeded()
220 {
221 synchronized (lock)
222 {
223 if (current != null)
224 {
225 --size;
226 lock.notify();
227 }
228 }
229 }
230
231 @Override
232 public void failed(Throwable x)
233 {
234 synchronized (lock)
235 {
236 failure = x;
237 lock.notify();
238 }
239 }
240 }
241 }