1 //
2 // ========================================================================
3 // Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
8 //
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
11 //
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
14 //
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
17 //
18
19 package org.eclipse.jetty.client.util;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.nio.ByteBuffer;
25 import java.util.Iterator;
26 import java.util.NoSuchElementException;
27
28 import org.eclipse.jetty.client.api.ContentProvider;
29 import org.eclipse.jetty.util.BufferUtil;
30 import org.eclipse.jetty.util.Callback;
31 import org.eclipse.jetty.util.log.Log;
32 import org.eclipse.jetty.util.log.Logger;
33
34 /**
35 * A {@link ContentProvider} for an {@link InputStream}.
36 * <p>
37 * The input stream is read once and therefore fully consumed.
38 * Invocations to the {@link #iterator()} method after the first will return an "empty" iterator
39 * because the stream has been consumed on the first invocation.
40 * <p>
41 * However, it is possible for subclasses to override {@link #onRead(byte[], int, int)} to copy
42 * the content read from the stream to another location (for example a file), and be able to
43 * support multiple invocations of {@link #iterator()}, returning the iterator provided by this
44 * class on the first invocation, and an iterator on the bytes copied to the other location
45 * for subsequent invocations.
46 * <p>
47 * It is possible to specify, at the constructor, a buffer size used to read content from the
48 * stream, by default 4096 bytes.
49 * <p>
50 * The {@link InputStream} passed to the constructor is by default closed when is it fully
51 * consumed (or when an exception is thrown while reading it), unless otherwise specified
52 * to the {@link #InputStreamContentProvider(java.io.InputStream, int, boolean) constructor}.
53 */
54 public class InputStreamContentProvider implements ContentProvider, Callback, Closeable
55 {
56 private static final Logger LOG = Log.getLogger(InputStreamContentProvider.class);
57
58 private final InputStreamContentProviderIterator iterator = new InputStreamContentProviderIterator();
59 private final InputStream stream;
60 private final int bufferSize;
61 private final boolean autoClose;
62
63 public InputStreamContentProvider(InputStream stream)
64 {
65 this(stream, 4096);
66 }
67
68 public InputStreamContentProvider(InputStream stream, int bufferSize)
69 {
70 this(stream, bufferSize, true);
71 }
72
73 public InputStreamContentProvider(InputStream stream, int bufferSize, boolean autoClose)
74 {
75 this.stream = stream;
76 this.bufferSize = bufferSize;
77 this.autoClose = autoClose;
78 }
79
80 @Override
81 public long getLength()
82 {
83 return -1;
84 }
85
86 /**
87 * Callback method invoked just after having read from the stream,
88 * but before returning the iteration element (a {@link ByteBuffer}
89 * to the caller.
90 * <p>
91 * Subclasses may override this method to copy the content read from
92 * the stream to another location (a file, or in memory if the content
93 * is known to fit).
94 *
95 * @param buffer the byte array containing the bytes read
96 * @param offset the offset from where bytes should be read
97 * @param length the length of the bytes read
98 * @return a {@link ByteBuffer} wrapping the byte array
99 */
100 protected ByteBuffer onRead(byte[] buffer, int offset, int length)
101 {
102 if (length <= 0)
103 return BufferUtil.EMPTY_BUFFER;
104 return ByteBuffer.wrap(buffer, offset, length);
105 }
106
107 /**
108 * Callback method invoked when an exception is thrown while reading
109 * from the stream.
110 *
111 * @param failure the exception thrown while reading from the stream.
112 */
113 protected void onReadFailure(Throwable failure)
114 {
115 }
116
117 @Override
118 public Iterator<ByteBuffer> iterator()
119 {
120 return iterator;
121 }
122
123 @Override
124 public void close()
125 {
126 if (autoClose)
127 {
128 try
129 {
130 stream.close();
131 }
132 catch (IOException x)
133 {
134 LOG.ignore(x);
135 }
136 }
137 }
138
139 @Override
140 public void succeeded()
141 {
142 }
143
144 @Override
145 public void failed(Throwable failure)
146 {
147 // TODO: forward the failure to the iterator.
148 close();
149 }
150
151 /**
152 * Iterating over an {@link InputStream} is tricky, because {@link #hasNext()} must return false
153 * if the stream reads -1. However, we don't know what to return until we read the stream, which
154 * means that stream reading must be performed by {@link #hasNext()}, which introduces a side-effect
155 * on what is supposed to be a simple query method (with respect to the Query Command Separation
156 * Principle).
157 * <p>
158 * Alternatively, we could return {@code true} from {@link #hasNext()} even if we don't know that
159 * we will read -1, but then when {@link #next()} reads -1 it must return an empty buffer.
160 * However this is problematic, since GETs with no content indication would become GET with chunked
161 * content, and not understood by servers.
162 * <p>
163 * Therefore we need to make sure that {@link #hasNext()} does not perform any side effect (so that
164 * it can be called multiple times) until {@link #next()} is called.
165 */
166 private class InputStreamContentProviderIterator implements Iterator<ByteBuffer>, Closeable
167 {
168 private Throwable failure;
169 private ByteBuffer buffer;
170 private Boolean hasNext;
171
172 @Override
173 public boolean hasNext()
174 {
175 try
176 {
177 if (hasNext != null)
178 return hasNext;
179
180 byte[] bytes = new byte[bufferSize];
181 int read = stream.read(bytes);
182 if (LOG.isDebugEnabled())
183 LOG.debug("Read {} bytes from {}", read, stream);
184 if (read > 0)
185 {
186 hasNext = Boolean.TRUE;
187 buffer = onRead(bytes, 0, read);
188 return true;
189 }
190 else if (read < 0)
191 {
192 hasNext = Boolean.FALSE;
193 buffer = null;
194 close();
195 return false;
196 }
197 else
198 {
199 hasNext = Boolean.TRUE;
200 buffer = BufferUtil.EMPTY_BUFFER;
201 return true;
202 }
203 }
204 catch (Throwable x)
205 {
206 if (LOG.isDebugEnabled())
207 LOG.debug(x);
208 if (failure == null)
209 {
210 failure = x;
211 onReadFailure(x);
212 // Signal we have more content to cause a call to
213 // next() which will throw NoSuchElementException.
214 hasNext = Boolean.TRUE;
215 buffer = null;
216 close();
217 return true;
218 }
219 throw new IllegalStateException();
220 }
221 }
222
223 @Override
224 public ByteBuffer next()
225 {
226 if (failure != null)
227 {
228 // Consume the failure so that calls to hasNext() will return false.
229 hasNext = Boolean.FALSE;
230 buffer = null;
231 throw (NoSuchElementException)new NoSuchElementException().initCause(failure);
232 }
233 if (!hasNext())
234 throw new NoSuchElementException();
235
236 ByteBuffer result = buffer;
237 if (result == null)
238 {
239 hasNext = Boolean.FALSE;
240 buffer = null;
241 throw new NoSuchElementException();
242 }
243 else
244 {
245 hasNext = null;
246 buffer = null;
247 return result;
248 }
249 }
250
251 @Override
252 public void remove()
253 {
254 throw new UnsupportedOperationException();
255 }
256
257 @Override
258 public void close()
259 {
260 InputStreamContentProvider.this.close();
261 }
262 }
263 }