View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.util.Objects;
23  import javax.servlet.ReadListener;
24  import javax.servlet.ServletInputStream;
25  
26  import org.eclipse.jetty.io.EofException;
27  import org.eclipse.jetty.io.RuntimeIOException;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  
31  /**
32   * {@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.
33   * <p/>
34   * Content may arrive in patterns such as [content(), content(), messageComplete()] so that this class
35   * maintains two states: the content state that tells whether there is content to consume and the EOF
36   * state that tells whether an EOF has arrived.
37   * Only once the content has been consumed the content state is moved to the EOF state.
38   */
39  public abstract class HttpInput<T> extends ServletInputStream implements Runnable
40  {
41      private final static Logger LOG = Log.getLogger(HttpInput.class);
42  
43      private final byte[] _oneByteBuffer = new byte[1];
44      private final Object _lock;
45      private HttpChannelState _channelState;
46      private ReadListener _listener;
47      private Throwable _onError;
48      private boolean _notReady;
49      private State _contentState = STREAM;
50      private State _eofState;
51      private long _contentRead;
52  
53      protected HttpInput()
54      {
55          this(null);
56      }
57  
58      protected HttpInput(Object lock)
59      {
60          _lock = lock == null ? this : lock;
61      }
62  
63      public void init(HttpChannelState state)
64      {
65          synchronized (lock())
66          {
67              _channelState = state;
68          }
69      }
70  
71      public final Object lock()
72      {
73          return _lock;
74      }
75  
76      public void recycle()
77      {
78          synchronized (lock())
79          {
80              _listener = null;
81              _onError = null;
82              _notReady = false;
83              _contentState = STREAM;
84              _eofState = null;
85              _contentRead = 0;
86          }
87      }
88  
89      @Override
90      public int available()
91      {
92          try
93          {
94              synchronized (lock())
95              {
96                  T item = getNextContent();
97                  return item == null ? 0 : remaining(item);
98              }
99          }
100         catch (IOException e)
101         {
102             throw new RuntimeIOException(e);
103         }
104     }
105 
106     @Override
107     public int read() throws IOException
108     {
109         int read = read(_oneByteBuffer, 0, 1);
110         return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
111     }
112 
113     @Override
114     public int read(byte[] b, int off, int len) throws IOException
115     {
116         synchronized (lock())
117         {
118             T item = getNextContent();
119             if (item == null)
120             {
121                 _contentState.waitForContent(this);
122                 item = getNextContent();
123                 if (item == null)
124                     return _contentState.noContent();
125             }
126             int l = get(item, b, off, len);
127             _contentRead += l;
128             return l;
129         }
130     }
131 
132     /**
133      * A convenience method to call nextContent and to check the return value, which if null then the
134      * a check is made for EOF and the state changed accordingly.
135      *
136      * @return Content or null if none available.
137      * @throws IOException
138      * @see #nextContent()
139      */
140     protected T getNextContent() throws IOException
141     {
142         T content = nextContent();
143         if (content == null)
144         {
145             synchronized (lock())
146             {
147                 if (_eofState != null)
148                 {
149                     if (LOG.isDebugEnabled())
150                         LOG.debug("{} eof {}", this, _eofState);
151                     _contentState = _eofState;
152                 }
153             }
154         }
155         return content;
156     }
157 
158     /**
159      * Access the next content to be consumed from.   Returning the next item does not consume it
160      * and it may be returned multiple times until it is consumed.
161      * <p/>
162      * Calls to {@link #get(Object, byte[], int, int)}
163      * or {@link #consume(Object, int)} are required to consume data from the content.
164      *
165      * @return the content or null if none available.
166      * @throws IOException if retrieving the content fails
167      */
168     protected abstract T nextContent() throws IOException;
169 
170     /**
171      * @param item the content
172      * @return how many bytes remain in the given content
173      */
174     protected abstract int remaining(T item);
175 
176     /**
177      * Copies the given content into the given byte buffer.
178      *
179      * @param item   the content to copy from
180      * @param buffer the buffer to copy into
181      * @param offset the buffer offset to start copying from
182      * @param length the space available in the buffer
183      * @return the number of bytes actually copied
184      */
185     protected abstract int get(T item, byte[] buffer, int offset, int length);
186 
187     /**
188      * Consumes the given content.
189      *
190      * @param item   the content to consume
191      * @param length the number of bytes to consume
192      */
193     protected abstract void consume(T item, int length);
194 
195     /**
196      * Blocks until some content or some end-of-file event arrives.
197      *
198      * @throws IOException if the wait is interrupted
199      */
200     protected abstract void blockForContent() throws IOException;
201 
202     /**
203      * Adds some content to this input stream.
204      *
205      * @param item the content to add
206      */
207     public abstract void content(T item);
208 
209     protected boolean onAsyncRead()
210     {
211         synchronized (lock())
212         {
213             if (_listener == null)
214                 return false;
215         }
216         _channelState.onReadPossible();
217         return true;
218     }
219 
220     public long getContentRead()
221     {
222         synchronized (lock())
223         {
224             return _contentRead;
225         }
226     }
227 
228     /**
229      * This method should be called to signal that an EOF has been
230      * detected before all the expected content arrived.
231      * <p/>
232      * Typically this will result in an EOFException being thrown
233      * from a subsequent read rather than a -1 return.
234      */
235     public void earlyEOF()
236     {
237         synchronized (lock())
238         {
239             if (!isEOF())
240             {
241                 if (LOG.isDebugEnabled())
242                     LOG.debug("{} early EOF", this);
243                 _eofState = EARLY_EOF;
244                 if (_listener == null)
245                     return;
246             }
247         }
248         _channelState.onReadPossible();
249     }
250 
251 
252     public boolean isEarlyEOF()
253     {
254         synchronized (lock())
255         {
256             return _contentState==EARLY_EOF;
257         }
258     }
259     
260     /**
261      * This method should be called to signal that all the expected
262      * content arrived.
263      */
264     public void messageComplete()
265     {
266         synchronized (lock())
267         {
268             if (!isEOF())
269             {
270                 if (LOG.isDebugEnabled())
271                     LOG.debug("{} EOF", this);
272                 _eofState = EOF;
273                 if (_listener == null)
274                     return;
275             }
276         }
277         _channelState.onReadPossible();
278     }
279 
280     public boolean consumeAll()
281     {
282         synchronized (lock())
283         {
284             // Don't bother reading if we already know there was an error.
285             if (_onError != null)
286                 return false;
287 
288             try
289             {
290                 while (!isFinished())
291                 {
292                     T item = getNextContent();
293                     if (item == null)
294                         _contentState.waitForContent(this);
295                     else
296                         consume(item, remaining(item));
297                 }
298                 return true;
299             }
300             catch (IOException e)
301             {
302                 LOG.debug(e);
303                 return false;
304             }
305         }
306     }
307 
308     public boolean isAsync()
309     {
310         synchronized (lock())
311         {
312             return _contentState==ASYNC;
313         }
314     }
315     
316     /**
317      * @return whether an EOF has been detected, even though there may be content to consume.
318      */
319     public boolean isEOF()
320     {
321         synchronized (lock())
322         {
323             return _eofState != null && _eofState.isEOF();
324         }
325     }
326 
327     @Override
328     public boolean isFinished()
329     {
330         synchronized (lock())
331         {
332             return _contentState.isEOF();
333         }
334     }
335     
336 
337     @Override
338     public boolean isReady()
339     {
340         boolean finished;
341         synchronized (lock())
342         {
343             if (_contentState.isEOF())
344                 return true;
345             if (_listener == null )
346                 return true;
347             if (available() > 0)
348                 return true;
349             if (_notReady)
350                 return false;
351             _notReady = true;
352             finished = isFinished();
353         }
354         if (finished)
355             _channelState.onReadPossible();
356         else
357             unready();
358         return false;
359     }
360 
361     protected void unready()
362     {
363     }
364 
365     @Override
366     public void setReadListener(ReadListener readListener)
367     {
368         readListener = Objects.requireNonNull(readListener);
369         synchronized (lock())
370         {
371             if (_contentState != STREAM)
372                 throw new IllegalStateException("state=" + _contentState);
373             _contentState = ASYNC;
374             _listener = readListener;
375             _notReady = true;
376         }
377         _channelState.onReadPossible();
378     }
379 
380     public void failed(Throwable x)
381     {
382         synchronized (lock())
383         {
384             if (_onError != null)
385                 LOG.warn(x);
386             else
387                 _onError = x;
388         }
389     }
390 
391     @Override
392     public void run()
393     {
394         final Throwable error;
395         final ReadListener listener;
396         boolean available = false;
397         final boolean eof;
398 
399         synchronized (lock())
400         {
401             if (!_notReady || _listener == null)
402                 return;
403 
404             error = _onError;
405             listener = _listener;
406 
407             try
408             {
409                 T item = getNextContent();
410                 available = item != null && remaining(item) > 0;
411             }
412             catch (Exception e)
413             {
414                 failed(e);
415             }
416 
417             eof = !available && isFinished();
418             _notReady = !available && !eof;
419         }
420 
421         try
422         {
423             if (error != null)
424                 listener.onError(error);
425             else if (available)
426                 listener.onDataAvailable();
427             else if (eof)
428                 listener.onAllDataRead();
429             else
430                 unready();
431         }
432         catch (Throwable e)
433         {
434             LOG.warn(e.toString());
435             LOG.debug(e);
436             listener.onError(e);
437         }
438     }
439 
440     protected static abstract class State
441     {
442         public void waitForContent(HttpInput<?> in) throws IOException
443         {
444         }
445 
446         public int noContent() throws IOException
447         {
448             return -1;
449         }
450 
451         public boolean isEOF()
452         {
453             return false;
454         }
455     }
456 
457     protected static final State STREAM = new State()
458     {
459         @Override
460         public void waitForContent(HttpInput<?> input) throws IOException
461         {
462             input.blockForContent();
463         }
464 
465         @Override
466         public String toString()
467         {
468             return "STREAM";
469         }
470     };
471 
472     protected static final State ASYNC = new State()
473     {
474         @Override
475         public int noContent() throws IOException
476         {
477             return 0;
478         }
479 
480         @Override
481         public String toString()
482         {
483             return "ASYNC";
484         }
485     };
486 
487     protected static final State EARLY_EOF = new State()
488     {
489         @Override
490         public int noContent() throws IOException
491         {
492             throw new EofException("Early EOF");
493         }
494 
495         @Override
496         public boolean isEOF()
497         {
498             return true;
499         }
500 
501         @Override
502         public String toString()
503         {
504             return "EARLY_EOF";
505         }
506     };
507 
508     protected static final State EOF = new State()
509     {
510         @Override
511         public boolean isEOF()
512         {
513             return true;
514         }
515 
516         @Override
517         public String toString()
518         {
519             return "EOF";
520         }
521     };
522 }