View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.util.Objects;
23  import javax.servlet.ReadListener;
24  import javax.servlet.ServletInputStream;
25  
26  import org.eclipse.jetty.io.EofException;
27  import org.eclipse.jetty.io.RuntimeIOException;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  
31  /**
32   * {@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.
33   * <p/>
34   * Content may arrive in patterns such as [content(), content(), messageComplete()] so that this class
35   * maintains two states: the content state that tells whether there is content to consume and the EOF
36   * state that tells whether an EOF has arrived.
37   * Only once the content has been consumed the content state is moved to the EOF state.
38   */
39  public abstract class HttpInput<T> extends ServletInputStream implements Runnable
40  {
41      private final static Logger LOG = Log.getLogger(HttpInput.class);
42  
43      private final byte[] _oneByteBuffer = new byte[1];
44      private final Object _lock;
45      private HttpChannelState _channelState;
46      private ReadListener _listener;
47      private Throwable _onError;
48      private boolean _notReady;
49      private State _contentState = STREAM;
50      private State _eofState;
51      private long _contentRead;
52  
53      protected HttpInput()
54      {
55          this(null);
56      }
57  
58      protected HttpInput(Object lock)
59      {
60          _lock = lock == null ? this : lock;
61      }
62  
63      public void init(HttpChannelState state)
64      {
65          synchronized (lock())
66          {
67              _channelState = state;
68          }
69      }
70  
71      public final Object lock()
72      {
73          return _lock;
74      }
75  
76      public void recycle()
77      {
78          synchronized (lock())
79          {
80              _listener = null;
81              _onError = null;
82              _notReady = false;
83              _contentState = STREAM;
84              _eofState = null;
85              _contentRead = 0;
86          }
87      }
88  
89      @Override
90      public int available()
91      {
92          try
93          {
94              synchronized (lock())
95              {
96                  T item = getNextContent();
97                  return item == null ? 0 : remaining(item);
98              }
99          }
100         catch (IOException e)
101         {
102             throw new RuntimeIOException(e);
103         }
104     }
105 
106     @Override
107     public int read() throws IOException
108     {
109         int read = read(_oneByteBuffer, 0, 1);
110         return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
111     }
112 
113     @Override
114     public int read(byte[] b, int off, int len) throws IOException
115     {
116         synchronized (lock())
117         {
118             T item = getNextContent();
119             if (item == null)
120             {
121                 _contentState.waitForContent(this);
122                 item = getNextContent();
123                 if (item == null)
124                     return _contentState.noContent();
125             }
126             int l = get(item, b, off, len);
127             _contentRead += l;
128             return l;
129         }
130     }
131 
132     /**
133      * A convenience method to call nextContent and to check the return value, which if null then the
134      * a check is made for EOF and the state changed accordingly.
135      *
136      * @return Content or null if none available.
137      * @throws IOException
138      * @see #nextContent()
139      */
140     protected T getNextContent() throws IOException
141     {
142         T content = nextContent();
143         if (content == null)
144         {
145             synchronized (lock())
146             {
147                 if (_eofState != null)
148                 {
149                     if (LOG.isDebugEnabled())
150                         LOG.debug("{} eof {}", this, _eofState);
151                     _contentState = _eofState;
152                 }
153             }
154         }
155         return content;
156     }
157 
158     /**
159      * Access the next content to be consumed from.   Returning the next item does not consume it
160      * and it may be returned multiple times until it is consumed.
161      * <p/>
162      * Calls to {@link #get(Object, byte[], int, int)}
163      * or {@link #consume(Object, int)} are required to consume data from the content.
164      *
165      * @return the content or null if none available.
166      * @throws IOException if retrieving the content fails
167      */
168     protected abstract T nextContent() throws IOException;
169 
170     /**
171      * @param item the content
172      * @return how many bytes remain in the given content
173      */
174     protected abstract int remaining(T item);
175 
176     /**
177      * Copies the given content into the given byte buffer.
178      *
179      * @param item   the content to copy from
180      * @param buffer the buffer to copy into
181      * @param offset the buffer offset to start copying from
182      * @param length the space available in the buffer
183      * @return the number of bytes actually copied
184      */
185     protected abstract int get(T item, byte[] buffer, int offset, int length);
186 
187     /**
188      * Consumes the given content.
189      *
190      * @param item   the content to consume
191      * @param length the number of bytes to consume
192      */
193     protected abstract void consume(T item, int length);
194 
195     /**
196      * Blocks until some content or some end-of-file event arrives.
197      *
198      * @throws IOException if the wait is interrupted
199      */
200     protected abstract void blockForContent() throws IOException;
201 
202     /**
203      * Adds some content to this input stream.
204      *
205      * @param item the content to add
206      */
207     public abstract void content(T item);
208 
209     protected boolean onAsyncRead()
210     {
211         synchronized (lock())
212         {
213             if (_listener == null)
214                 return false;
215         }
216         _channelState.onReadPossible();
217         return true;
218     }
219 
220     public long getContentRead()
221     {
222         synchronized (lock())
223         {
224             return _contentRead;
225         }
226     }
227 
228     /**
229      * This method should be called to signal that an EOF has been
230      * detected before all the expected content arrived.
231      * <p/>
232      * Typically this will result in an EOFException being thrown
233      * from a subsequent read rather than a -1 return.
234      */
235     public void earlyEOF()
236     {
237         synchronized (lock())
238         {
239             if (!isEOF())
240             {
241                 if (LOG.isDebugEnabled())
242                     LOG.debug("{} early EOF", this);
243                 _eofState = EARLY_EOF;
244                 if (_listener == null)
245                     return;
246             }
247         }
248         _channelState.onReadPossible();
249     }
250 
251 
252     public boolean isEarlyEOF()
253     {
254         synchronized (lock())
255         {
256             return _contentState==EARLY_EOF;
257         }
258     }
259     
260     /**
261      * This method should be called to signal that all the expected
262      * content arrived.
263      */
264     public void messageComplete()
265     {
266         synchronized (lock())
267         {
268             if (!isEOF())
269             {
270                 if (LOG.isDebugEnabled())
271                     LOG.debug("{} EOF", this);
272                 _eofState = EOF;
273                 if (_listener == null)
274                     return;
275             }
276         }
277         _channelState.onReadPossible();
278     }
279 
280     public void consumeAll()
281     {
282         synchronized (lock())
283         {
284             try
285             {
286                 while (!isFinished())
287                 {
288                     T item = getNextContent();
289                     if (item == null)
290                         _contentState.waitForContent(this);
291                     else
292                         consume(item, remaining(item));
293                 }
294             }
295             catch (IOException e)
296             {
297                 LOG.debug(e);
298             }
299         }
300     }
301 
302     public boolean isAsync()
303     {
304         synchronized (lock())
305         {
306             return _contentState==ASYNC;
307         }
308     }
309     
310     /**
311      * @return whether an EOF has been detected, even though there may be content to consume.
312      */
313     public boolean isEOF()
314     {
315         synchronized (lock())
316         {
317             return _eofState != null && _eofState.isEOF();
318         }
319     }
320 
321     @Override
322     public boolean isFinished()
323     {
324         synchronized (lock())
325         {
326             return _contentState.isEOF();
327         }
328     }
329     
330 
331     @Override
332     public boolean isReady()
333     {
334         boolean finished;
335         synchronized (lock())
336         {
337             if (_contentState.isEOF())
338                 return true;
339             if (_listener == null )
340                 return true;
341             if (available() > 0)
342                 return true;
343             if (_notReady)
344                 return false;
345             _notReady = true;
346             finished = isFinished();
347         }
348         if (finished)
349             _channelState.onReadPossible();
350         else
351             unready();
352         return false;
353     }
354 
355     protected void unready()
356     {
357     }
358 
359     @Override
360     public void setReadListener(ReadListener readListener)
361     {
362         readListener = Objects.requireNonNull(readListener);
363         synchronized (lock())
364         {
365             if (_contentState != STREAM)
366                 throw new IllegalStateException("state=" + _contentState);
367             _contentState = ASYNC;
368             _listener = readListener;
369             _notReady = true;
370         }
371         _channelState.onReadPossible();
372     }
373 
374     public void failed(Throwable x)
375     {
376         synchronized (lock())
377         {
378             if (_onError != null)
379                 LOG.warn(x);
380             else
381                 _onError = x;
382         }
383     }
384 
385     @Override
386     public void run()
387     {
388         final Throwable error;
389         final ReadListener listener;
390         boolean available = false;
391         final boolean eof;
392 
393         synchronized (lock())
394         {
395             if (!_notReady || _listener == null)
396                 return;
397 
398             error = _onError;
399             listener = _listener;
400 
401             try
402             {
403                 T item = getNextContent();
404                 available = item != null && remaining(item) > 0;
405             }
406             catch (Exception e)
407             {
408                 failed(e);
409             }
410 
411             eof = !available && isFinished();
412             _notReady = !available && !eof;
413         }
414 
415         try
416         {
417             if (error != null)
418                 listener.onError(error);
419             else if (available)
420                 listener.onDataAvailable();
421             else if (eof)
422                 listener.onAllDataRead();
423             else
424                 unready();
425         }
426         catch (Throwable e)
427         {
428             LOG.warn(e.toString());
429             LOG.debug(e);
430             listener.onError(e);
431         }
432     }
433 
434     protected static abstract class State
435     {
436         public void waitForContent(HttpInput<?> in) throws IOException
437         {
438         }
439 
440         public int noContent() throws IOException
441         {
442             return -1;
443         }
444 
445         public boolean isEOF()
446         {
447             return false;
448         }
449     }
450 
451     protected static final State STREAM = new State()
452     {
453         @Override
454         public void waitForContent(HttpInput<?> input) throws IOException
455         {
456             input.blockForContent();
457         }
458 
459         @Override
460         public String toString()
461         {
462             return "STREAM";
463         }
464     };
465 
466     protected static final State ASYNC = new State()
467     {
468         @Override
469         public int noContent() throws IOException
470         {
471             return 0;
472         }
473 
474         @Override
475         public String toString()
476         {
477             return "ASYNC";
478         }
479     };
480 
481     protected static final State EARLY_EOF = new State()
482     {
483         @Override
484         public int noContent() throws IOException
485         {
486             throw new EofException("Early EOF");
487         }
488 
489         @Override
490         public boolean isEOF()
491         {
492             return true;
493         }
494 
495         @Override
496         public String toString()
497         {
498             return "EARLY_EOF";
499         }
500     };
501 
502     protected static final State EOF = new State()
503     {
504         @Override
505         public boolean isEOF()
506         {
507             return true;
508         }
509 
510         @Override
511         public String toString()
512         {
513             return "EOF";
514         }
515     };
516 }