View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client.util;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.nio.ByteBuffer;
25  import java.util.Iterator;
26  import java.util.NoSuchElementException;
27  
28  import org.eclipse.jetty.client.api.ContentProvider;
29  import org.eclipse.jetty.util.BufferUtil;
30  import org.eclipse.jetty.util.Callback;
31  import org.eclipse.jetty.util.log.Log;
32  import org.eclipse.jetty.util.log.Logger;
33  
34  /**
35   * A {@link ContentProvider} for an {@link InputStream}.
36   * <p>
37   * The input stream is read once and therefore fully consumed.
38   * Invocations to the {@link #iterator()} method after the first will return an "empty" iterator
39   * because the stream has been consumed on the first invocation.
40   * <p>
41   * However, it is possible for subclasses to override {@link #onRead(byte[], int, int)} to copy
42   * the content read from the stream to another location (for example a file), and be able to
43   * support multiple invocations of {@link #iterator()}, returning the iterator provided by this
44   * class on the first invocation, and an iterator on the bytes copied to the other location
45   * for subsequent invocations.
46   * <p>
47   * It is possible to specify, at the constructor, a buffer size used to read content from the
48   * stream, by default 4096 bytes.
49   * <p>
50   * The {@link InputStream} passed to the constructor is by default closed when is it fully
51   * consumed (or when an exception is thrown while reading it), unless otherwise specified
52   * to the {@link #InputStreamContentProvider(java.io.InputStream, int, boolean) constructor}.
53   */
54  public class InputStreamContentProvider implements ContentProvider, Callback, Closeable
55  {
56      private static final Logger LOG = Log.getLogger(InputStreamContentProvider.class);
57  
58      private final InputStreamContentProviderIterator iterator = new InputStreamContentProviderIterator();
59      private final InputStream stream;
60      private final int bufferSize;
61      private final boolean autoClose;
62  
63      public InputStreamContentProvider(InputStream stream)
64      {
65          this(stream, 4096);
66      }
67  
68      public InputStreamContentProvider(InputStream stream, int bufferSize)
69      {
70          this(stream, bufferSize, true);
71      }
72  
73      public InputStreamContentProvider(InputStream stream, int bufferSize, boolean autoClose)
74      {
75          this.stream = stream;
76          this.bufferSize = bufferSize;
77          this.autoClose = autoClose;
78      }
79  
80      @Override
81      public long getLength()
82      {
83          return -1;
84      }
85  
86      /**
87       * Callback method invoked just after having read from the stream,
88       * but before returning the iteration element (a {@link ByteBuffer}
89       * to the caller.
90       * <p>
91       * Subclasses may override this method to copy the content read from
92       * the stream to another location (a file, or in memory if the content
93       * is known to fit).
94       *
95       * @param buffer the byte array containing the bytes read
96       * @param offset the offset from where bytes should be read
97       * @param length the length of the bytes read
98       * @return a {@link ByteBuffer} wrapping the byte array
99       */
100     protected ByteBuffer onRead(byte[] buffer, int offset, int length)
101     {
102         if (length <= 0)
103             return BufferUtil.EMPTY_BUFFER;
104         return ByteBuffer.wrap(buffer, offset, length);
105     }
106 
107     /**
108      * Callback method invoked when an exception is thrown while reading
109      * from the stream.
110      *
111      * @param failure the exception thrown while reading from the stream.
112      */
113     protected void onReadFailure(Throwable failure)
114     {
115     }
116 
117     @Override
118     public Iterator<ByteBuffer> iterator()
119     {
120         return iterator;
121     }
122 
123     @Override
124     public void close()
125     {
126         if (autoClose)
127         {
128             try
129             {
130                 stream.close();
131             }
132             catch (IOException x)
133             {
134                 LOG.ignore(x);
135             }
136         }
137     }
138 
139     @Override
140     public void succeeded()
141     {
142     }
143 
144     @Override
145     public void failed(Throwable failure)
146     {
147         // TODO: forward the failure to the iterator.
148         close();
149     }
150 
151     /**
152      * Iterating over an {@link InputStream} is tricky, because {@link #hasNext()} must return false
153      * if the stream reads -1. However, we don't know what to return until we read the stream, which
154      * means that stream reading must be performed by {@link #hasNext()}, which introduces a side-effect
155      * on what is supposed to be a simple query method (with respect to the Query Command Separation
156      * Principle).
157      * <p>
158      * Alternatively, we could return {@code true} from {@link #hasNext()} even if we don't know that
159      * we will read -1, but then when {@link #next()} reads -1 it must return an empty buffer.
160      * However this is problematic, since GETs with no content indication would become GET with chunked
161      * content, and not understood by servers.
162      * <p>
163      * Therefore we need to make sure that {@link #hasNext()} does not perform any side effect (so that
164      * it can be called multiple times) until {@link #next()} is called.
165      */
166     private class InputStreamContentProviderIterator implements Iterator<ByteBuffer>, Closeable
167     {
168         private Throwable failure;
169         private ByteBuffer buffer;
170         private Boolean hasNext;
171 
172         @Override
173         public boolean hasNext()
174         {
175             try
176             {
177                 if (hasNext != null)
178                     return hasNext;
179 
180                 byte[] bytes = new byte[bufferSize];
181                 int read = stream.read(bytes);
182                 if (LOG.isDebugEnabled())
183                     LOG.debug("Read {} bytes from {}", read, stream);
184                 if (read > 0)
185                 {
186                     hasNext = Boolean.TRUE;
187                     buffer = onRead(bytes, 0, read);
188                     return true;
189                 }
190                 else if (read < 0)
191                 {
192                     hasNext = Boolean.FALSE;
193                     buffer = null;
194                     close();
195                     return false;
196                 }
197                 else
198                 {
199                     hasNext = Boolean.TRUE;
200                     buffer = BufferUtil.EMPTY_BUFFER;
201                     return true;
202                 }
203             }
204             catch (Throwable x)
205             {
206                 if (LOG.isDebugEnabled())
207                     LOG.debug(x);
208                 if (failure == null)
209                 {
210                     failure = x;
211                     onReadFailure(x);
212                     // Signal we have more content to cause a call to
213                     // next() which will throw NoSuchElementException.
214                     hasNext = Boolean.TRUE;
215                     buffer = null;
216                     close();
217                     return true;
218                 }
219                 throw new IllegalStateException();
220             }
221         }
222 
223         @Override
224         public ByteBuffer next()
225         {
226             if (failure != null)
227             {
228                 // Consume the failure so that calls to hasNext() will return false.
229                 hasNext = Boolean.FALSE;
230                 buffer = null;
231                 throw (NoSuchElementException)new NoSuchElementException().initCause(failure);
232             }
233             if (!hasNext())
234                 throw new NoSuchElementException();
235 
236             ByteBuffer result = buffer;
237             if (result == null)
238             {
239                 hasNext = Boolean.FALSE;
240                 buffer = null;
241                 throw new NoSuchElementException();
242             }
243             else
244             {
245                 hasNext = null;
246                 buffer = null;
247                 return result;
248             }
249         }
250 
251         @Override
252         public void remove()
253         {
254             throw new UnsupportedOperationException();
255         }
256 
257         @Override
258         public void close()
259         {
260             InputStreamContentProvider.this.close();
261         }
262     }
263 }