View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.util.Objects;
23  import javax.servlet.ReadListener;
24  import javax.servlet.ServletInputStream;
25  
26  import org.eclipse.jetty.io.EofException;
27  import org.eclipse.jetty.io.RuntimeIOException;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  
31  /**
32   * {@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.
33   * <p/>
34   * Content may arrive in patterns such as [content(), content(), messageComplete()] so that this class
35   * maintains two states: the content state that tells whether there is content to consume and the EOF
36   * state that tells whether an EOF has arrived.
37   * Only once the content has been consumed the content state is moved to the EOF state.
38   */
39  public abstract class HttpInput<T> extends ServletInputStream implements Runnable
40  {
41      private final static Logger LOG = Log.getLogger(HttpInput.class);
42  
43      private final byte[] _oneByteBuffer = new byte[1];
44      private final Object _lock;
45      private HttpChannelState _channelState;
46      private ReadListener _listener;
47      private Throwable _onError;
48      private boolean _notReady;
49      private State _contentState = STREAM;
50      private State _eofState;
51      private long _contentRead;
52  
53      protected HttpInput()
54      {
55          this(null);
56      }
57  
58      protected HttpInput(Object lock)
59      {
60          _lock = lock == null ? this : lock;
61      }
62  
63      public void init(HttpChannelState state)
64      {
65          synchronized (lock())
66          {
67              _channelState = state;
68          }
69      }
70  
71      public final Object lock()
72      {
73          return _lock;
74      }
75  
76      public void recycle()
77      {
78          synchronized (lock())
79          {
80              _listener = null;
81              _onError = null;
82              _notReady = false;
83              _contentState = STREAM;
84              _eofState = null;
85              _contentRead = 0;
86          }
87      }
88  
89      @Override
90      public int available()
91      {
92          try
93          {
94              synchronized (lock())
95              {
96                  T item = getNextContent();
97                  return item == null ? 0 : remaining(item);
98              }
99          }
100         catch (IOException e)
101         {
102             throw new RuntimeIOException(e);
103         }
104     }
105 
106     @Override
107     public int read() throws IOException
108     {
109         int read = read(_oneByteBuffer, 0, 1);
110         return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
111     }
112 
113     @Override
114     public int read(byte[] b, int off, int len) throws IOException
115     {
116         synchronized (lock())
117         {
118             T item = getNextContent();
119             if (item == null)
120             {
121                 _contentState.waitForContent(this);
122                 item = getNextContent();
123                 if (item == null)
124                     return _contentState.noContent();
125             }
126             int l = get(item, b, off, len);
127             _contentRead += l;
128             return l;
129         }
130     }
131 
132     /**
133      * A convenience method to call nextContent and to check the return value, which if null then the
134      * a check is made for EOF and the state changed accordingly.
135      *
136      * @return Content or null if none available.
137      * @throws IOException
138      * @see #nextContent()
139      */
140     protected T getNextContent() throws IOException
141     {
142         T content = nextContent();
143         if (content == null)
144         {
145             synchronized (lock())
146             {
147                 if (_eofState != null)
148                 {
149                     LOG.debug("{} eof {}", this, _eofState);
150                     _contentState = _eofState;
151                 }
152             }
153         }
154         return content;
155     }
156 
157     /**
158      * Access the next content to be consumed from.   Returning the next item does not consume it
159      * and it may be returned multiple times until it is consumed.
160      * <p/>
161      * Calls to {@link #get(Object, byte[], int, int)}
162      * or {@link #consume(Object, int)} are required to consume data from the content.
163      *
164      * @return the content or null if none available.
165      * @throws IOException if retrieving the content fails
166      */
167     protected abstract T nextContent() throws IOException;
168 
169     /**
170      * @param item the content
171      * @return how many bytes remain in the given content
172      */
173     protected abstract int remaining(T item);
174 
175     /**
176      * Copies the given content into the given byte buffer.
177      *
178      * @param item   the content to copy from
179      * @param buffer the buffer to copy into
180      * @param offset the buffer offset to start copying from
181      * @param length the space available in the buffer
182      * @return the number of bytes actually copied
183      */
184     protected abstract int get(T item, byte[] buffer, int offset, int length);
185 
186     /**
187      * Consumes the given content.
188      *
189      * @param item   the content to consume
190      * @param length the number of bytes to consume
191      */
192     protected abstract void consume(T item, int length);
193 
194     /**
195      * Blocks until some content or some end-of-file event arrives.
196      *
197      * @throws IOException if the wait is interrupted
198      */
199     protected abstract void blockForContent() throws IOException;
200 
201     /**
202      * Adds some content to this input stream.
203      *
204      * @param item the content to add
205      */
206     public abstract void content(T item);
207 
208     protected boolean onAsyncRead()
209     {
210         synchronized (lock())
211         {
212             if (_listener == null)
213                 return false;
214         }
215         _channelState.onReadPossible();
216         return true;
217     }
218 
219     public long getContentRead()
220     {
221         synchronized (lock())
222         {
223             return _contentRead;
224         }
225     }
226 
227     /**
228      * This method should be called to signal that an EOF has been
229      * detected before all the expected content arrived.
230      * <p/>
231      * Typically this will result in an EOFException being thrown
232      * from a subsequent read rather than a -1 return.
233      */
234     public void earlyEOF()
235     {
236         synchronized (lock())
237         {
238             if (!isEOF())
239             {
240                 LOG.debug("{} early EOF", this);
241                 _eofState = EARLY_EOF;
242                 if (_listener == null)
243                     return;
244             }
245         }
246         _channelState.onReadPossible();
247     }
248 
249     /**
250      * This method should be called to signal that all the expected
251      * content arrived.
252      */
253     public void messageComplete()
254     {
255         synchronized (lock())
256         {
257             if (!isEOF())
258             {
259                 LOG.debug("{} EOF", this);
260                 _eofState = EOF;
261                 if (_listener == null)
262                     return;
263             }
264         }
265         _channelState.onReadPossible();
266     }
267 
268     public void consumeAll()
269     {
270         synchronized (lock())
271         {
272             try
273             {
274                 while (!isFinished())
275                 {
276                     T item = getNextContent();
277                     if (item == null)
278                         _contentState.waitForContent(this);
279                     else
280                         consume(item, remaining(item));
281                 }
282             }
283             catch (IOException e)
284             {
285                 LOG.debug(e);
286             }
287         }
288     }
289 
290     /**
291      * @return whether an EOF has been detected, even though there may be content to consume.
292      */
293     public boolean isEOF()
294     {
295         synchronized (lock())
296         {
297             return _eofState != null && _eofState.isEOF();
298         }
299     }
300 
301     @Override
302     public boolean isFinished()
303     {
304         synchronized (lock())
305         {
306             return _contentState.isEOF();
307         }
308     }
309 
310     @Override
311     public boolean isReady()
312     {
313         boolean finished;
314         synchronized (lock())
315         {
316             if (_contentState.isEOF())
317                 return true;
318             if (_listener == null )
319                 return true;
320             if (available() > 0)
321                 return true;
322             if (_notReady)
323                 return false;
324             _notReady = true;
325             finished = isFinished();
326         }
327         if (finished)
328             _channelState.onReadPossible();
329         else
330             unready();
331         return false;
332     }
333 
334     protected void unready()
335     {
336     }
337 
338     @Override
339     public void setReadListener(ReadListener readListener)
340     {
341         readListener = Objects.requireNonNull(readListener);
342         synchronized (lock())
343         {
344             if (_contentState != STREAM)
345                 throw new IllegalStateException("state=" + _contentState);
346             _contentState = ASYNC;
347             _listener = readListener;
348             _notReady = true;
349         }
350         _channelState.onReadPossible();
351     }
352 
353     public void failed(Throwable x)
354     {
355         synchronized (lock())
356         {
357             if (_onError != null)
358                 LOG.warn(x);
359             else
360                 _onError = x;
361         }
362     }
363 
364     @Override
365     public void run()
366     {
367         final Throwable error;
368         final ReadListener listener;
369         boolean available = false;
370         final boolean eof;
371 
372         synchronized (lock())
373         {
374             if (!_notReady || _listener == null)
375                 return;
376 
377             error = _onError;
378             listener = _listener;
379 
380             try
381             {
382                 T item = getNextContent();
383                 available = item != null && remaining(item) > 0;
384             }
385             catch (Exception e)
386             {
387                 failed(e);
388             }
389 
390             eof = !available && isFinished();
391             _notReady = !available && !eof;
392         }
393 
394         try
395         {
396             if (error != null)
397                 listener.onError(error);
398             else if (available)
399                 listener.onDataAvailable();
400             else if (eof)
401                 listener.onAllDataRead();
402             else
403                 unready();
404         }
405         catch (Throwable e)
406         {
407             LOG.warn(e.toString());
408             LOG.debug(e);
409             listener.onError(e);
410         }
411     }
412 
413     protected static abstract class State
414     {
415         public void waitForContent(HttpInput<?> in) throws IOException
416         {
417         }
418 
419         public int noContent() throws IOException
420         {
421             return -1;
422         }
423 
424         public boolean isEOF()
425         {
426             return false;
427         }
428     }
429 
430     protected static final State STREAM = new State()
431     {
432         @Override
433         public void waitForContent(HttpInput<?> input) throws IOException
434         {
435             input.blockForContent();
436         }
437 
438         public String toString()
439         {
440             return "STREAM";
441         }
442     };
443 
444     protected static final State ASYNC = new State()
445     {
446         @Override
447         public int noContent() throws IOException
448         {
449             return 0;
450         }
451 
452         @Override
453         public String toString()
454         {
455             return "ASYNC";
456         }
457     };
458 
459     protected static final State EARLY_EOF = new State()
460     {
461         @Override
462         public int noContent() throws IOException
463         {
464             throw new EofException();
465         }
466 
467         @Override
468         public boolean isEOF()
469         {
470             return true;
471         }
472 
473         public String toString()
474         {
475             return "EARLY_EOF";
476         }
477     };
478 
479     protected static final State EOF = new State()
480     {
481         @Override
482         public boolean isEOF()
483         {
484             return true;
485         }
486 
487         public String toString()
488         {
489             return "EOF";
490         }
491     };
492 }