View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.nio.ByteBuffer;
24  import java.util.ArrayDeque;
25  import java.util.Objects;
26  import java.util.Queue;
27  import java.util.concurrent.TimeoutException;
28  
29  import javax.servlet.ReadListener;
30  import javax.servlet.ServletInputStream;
31  
32  import org.eclipse.jetty.io.EofException;
33  import org.eclipse.jetty.io.RuntimeIOException;
34  import org.eclipse.jetty.util.BufferUtil;
35  import org.eclipse.jetty.util.Callback;
36  import org.eclipse.jetty.util.log.Log;
37  import org.eclipse.jetty.util.log.Logger;
38  
39  /**
40   * {@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.
41   * <p>
42   * Content may arrive in patterns such as [content(), content(), messageComplete()] so that this class
43   * maintains two states: the content state that tells whether there is content to consume and the EOF
44   * state that tells whether an EOF has arrived.
45   * Only once the content has been consumed the content state is moved to the EOF state.
46   */
47  public class HttpInput extends ServletInputStream implements Runnable
48  {
49      private final static Logger LOG = Log.getLogger(HttpInput.class);
50      private final static Content EOF_CONTENT = new EofContent("EOF");
51      private final static Content EARLY_EOF_CONTENT = new EofContent("EARLY_EOF");
52  
53      private final byte[] _oneByteBuffer = new byte[1];
54      private final Queue<Content> _inputQ = new ArrayDeque<>();
55      private final HttpChannelState _channelState;
56      private ReadListener _listener;
57      private State _state = STREAM;
58      private long _contentConsumed;
59      private long _blockingTimeoutAt = -1;
60  
61      public HttpInput(HttpChannelState state)
62      {
63          _channelState=state;
64          if (_channelState.getHttpChannel().getHttpConfiguration().getBlockingTimeout()>0)
65              _blockingTimeoutAt=0;
66      }
67  
68      protected HttpChannelState getHttpChannelState()
69      {
70          return _channelState;
71      }
72  
73      public void recycle()
74      {
75          synchronized (_inputQ)
76          {
77              Content item = _inputQ.poll();
78              while (item != null)
79              {
80                  item.failed(null);
81                  item = _inputQ.poll();
82              }
83              _listener = null;
84              _state = STREAM;
85              _contentConsumed = 0;
86          }
87      }
88  
89      @Override
90      public int available()
91      {
92          int available=0;
93          boolean woken=false;
94          synchronized (_inputQ)
95          {
96              Content content = _inputQ.peek();
97              if (content==null)
98              {
99                  try
100                 {
101                     produceContent();
102                 }
103                 catch(IOException e)
104                 {
105                     woken=failed(e);
106                 }
107                 content = _inputQ.peek();
108             }
109 
110             if (content!=null)
111                 available= remaining(content);
112         }
113 
114         if (woken)
115             wake();
116         return available;
117     }
118 
119     private void wake()
120     {
121         _channelState.getHttpChannel().getConnector().getExecutor().execute(_channelState.getHttpChannel());
122     }
123 
124 
125     @Override
126     public int read() throws IOException
127     {
128         int read = read(_oneByteBuffer, 0, 1);
129         if (read==0)
130             throw new IllegalStateException("unready read=0");
131         return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
132     }
133 
134     @Override
135     public int read(byte[] b, int off, int len) throws IOException
136     {
137         synchronized (_inputQ)
138         {
139             if (_blockingTimeoutAt>=0 && !isAsync())
140                 _blockingTimeoutAt=System.currentTimeMillis()+getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout();
141 
142             while(true)
143             {
144                 Content item = nextContent();
145                 if (item!=null)
146                 {
147                     if (LOG.isDebugEnabled())
148                         LOG.debug("{} read {} from {}",this,len,item);
149                     int l = get(item, b, off, len);
150 
151                     consumeNonContent();
152 
153                     return l;
154                 }
155 
156                 if (!_state.blockForContent(this))
157                     return _state.noContent();
158             }
159         }
160     }
161 
162     /**
163      * Called when derived implementations should attempt to
164      * produce more Content and add it via {@link #addContent(Content)}.
165      * For protocols that are constantly producing (eg HTTP2) this can
166      * be left as a noop;
167      * @throws IOException if unable to produce content
168      */
169     protected void produceContent() throws IOException
170     {
171     }
172 
173     /**
174      * Get the next content from the inputQ, calling {@link #produceContent()}
175      * if need be.  EOF is processed and state changed.
176      *
177      * @return the content or null if none available.
178      * @throws IOException if retrieving the content fails
179      */
180     protected Content nextContent() throws IOException
181     {
182         Content content = pollContent();
183         if (content==null && !isFinished())
184         {
185             produceContent();
186             content = pollContent();
187         }
188         return content;
189     }
190 
191     /** Poll the inputQ for Content.
192      * Consumed buffers and {@link PoisonPillContent}s are removed and
193      * EOF state updated if need be.
194      * @return Content or null
195      */
196     protected Content pollContent()
197     {
198         // Items are removed only when they are fully consumed.
199         Content content = _inputQ.peek();
200         // Skip consumed items at the head of the queue.
201         while (content != null && remaining(content) == 0)
202         {
203             _inputQ.poll();
204             content.succeeded();
205             if (LOG.isDebugEnabled())
206                 LOG.debug("{} consumed {}", this, content);
207 
208             if (content==EOF_CONTENT)
209             {
210                 if (_listener==null)
211                     _state=EOF;
212                 else
213                 {
214                     _state=AEOF;
215                     boolean woken = _channelState.onReadReady(); // force callback?
216                     if (woken)
217                         wake();
218                 }
219             }
220             else if (content==EARLY_EOF_CONTENT)
221                 _state=EARLY_EOF;
222 
223             content = _inputQ.peek();
224         }
225 
226         return content;
227     }
228 
229     /**
230      */
231     protected void consumeNonContent()
232     {
233         // Items are removed only when they are fully consumed.
234         Content content = _inputQ.peek();
235         // Skip consumed items at the head of the queue.
236         while (content != null && remaining(content) == 0)
237         {
238             // Defer EOF until read
239             if (content instanceof EofContent)
240                 break;
241 
242             // Consume all other empty content
243             _inputQ.poll();
244             content.succeeded();
245             if (LOG.isDebugEnabled())
246                 LOG.debug("{} consumed {}", this, content);
247             content = _inputQ.peek();
248         }
249     }
250 
251     /**
252      * Get the next readable from the inputQ, calling {@link #produceContent()}
253      * if need be. EOF is NOT processed and state is not changed.
254      *
255      * @return the content or EOF or null if none available.
256      * @throws IOException if retrieving the content fails
257      */
258     protected Content nextReadable() throws IOException
259     {
260         Content content = pollReadable();
261         if (content==null && !isFinished())
262         {
263             produceContent();
264             content = pollReadable();
265         }
266         return content;
267     }
268 
269     /** Poll the inputQ for Content or EOF.
270      * Consumed buffers and non EOF {@link PoisonPillContent}s are removed.
271      * EOF state is not updated.
272      * @return Content, EOF or null
273      */
274     protected Content pollReadable()
275     {
276         // Items are removed only when they are fully consumed.
277         Content content = _inputQ.peek();
278 
279         // Skip consumed items at the head of the queue except EOF
280         while (content != null)
281         {
282             if (content==EOF_CONTENT || content==EARLY_EOF_CONTENT || remaining(content)>0)
283                 return content;
284 
285             _inputQ.poll();
286             content.succeeded();
287             if (LOG.isDebugEnabled())
288                 LOG.debug("{} consumed {}", this, content);
289             content = _inputQ.peek();
290         }
291 
292         return null;
293     }
294 
295     /**
296      * @param item the content
297      * @return how many bytes remain in the given content
298      */
299     protected int remaining(Content item)
300     {
301         return item.remaining();
302     }
303 
304     /**
305      * Copies the given content into the given byte buffer.
306      *
307      * @param content   the content to copy from
308      * @param buffer the buffer to copy into
309      * @param offset the buffer offset to start copying from
310      * @param length the space available in the buffer
311      * @return the number of bytes actually copied
312      */
313     protected int get(Content content, byte[] buffer, int offset, int length)
314     {
315         int l = Math.min(content.remaining(), length);
316         content.getContent().get(buffer, offset, l);
317         _contentConsumed+=l;
318         return l;
319     }
320 
321     /**
322      * Consumes the given content.
323      * Calls the content succeeded if all content consumed.
324      *
325      * @param content   the content to consume
326      * @param length the number of bytes to consume
327      */
328     protected void skip(Content content, int length)
329     {
330         int l = Math.min(content.remaining(), length);
331         ByteBuffer buffer = content.getContent();
332         buffer.position(buffer.position()+l);
333         _contentConsumed+=l;
334         if (l>0 && !content.hasContent())
335             pollContent(); // hungry succeed
336 
337     }
338 
339     /**
340      * Blocks until some content or some end-of-file event arrives.
341      *
342      * @throws IOException if the wait is interrupted
343      */
344     protected void blockForContent() throws IOException
345     {
346         try
347         {
348             long timeout=0;
349             if (_blockingTimeoutAt>=0)
350             {
351                 timeout=_blockingTimeoutAt-System.currentTimeMillis();
352                 if (timeout<=0)
353                     throw new TimeoutException();
354             }
355 
356             if (LOG.isDebugEnabled())
357                 LOG.debug("{} blocking for content timeout={} ...", this,timeout);
358             if (timeout>0)
359                 _inputQ.wait(timeout);
360             else
361                 _inputQ.wait();
362 
363             if (_blockingTimeoutAt>0 && System.currentTimeMillis()>=_blockingTimeoutAt)
364                 throw new TimeoutException();
365         }
366         catch (Throwable e)
367         {
368             throw (IOException)new InterruptedIOException().initCause(e);
369         }
370     }
371 
372     /**
373      * Adds some content to this input stream.
374      *
375      * @param item the content to add
376      * @return true if content channel woken for read
377      */
378     public boolean addContent(Content item)
379     {
380         boolean woken=false;
381         synchronized (_inputQ)
382         {
383             _inputQ.offer(item);
384             if (LOG.isDebugEnabled())
385                 LOG.debug("{} addContent {}", this, item);
386 
387             if (_listener==null)
388                 _inputQ.notify();
389             else
390                 woken=_channelState.onReadPossible();
391         }
392 
393         return woken;
394     }
395 
396     public boolean hasContent()
397     {
398         synchronized (_inputQ)
399         {
400             return _inputQ.size()>0;
401         }
402     }
403 
404     public void unblock()
405     {
406         synchronized (_inputQ)
407         {
408             _inputQ.notify();
409         }
410     }
411 
412     public long getContentConsumed()
413     {
414         synchronized (_inputQ)
415         {
416             return _contentConsumed;
417         }
418     }
419 
420     /**
421      * This method should be called to signal that an EOF has been
422      * detected before all the expected content arrived.
423      * <p>
424      * Typically this will result in an EOFException being thrown
425      * from a subsequent read rather than a -1 return.
426      * @return true if content channel woken for read
427      */
428     public boolean earlyEOF()
429     {
430         return addContent(EARLY_EOF_CONTENT);
431     }
432 
433     /**
434      * This method should be called to signal that all the expected
435      * content arrived.
436      * @return true if content channel woken for read
437      */
438     public boolean eof()
439     {
440        return addContent(EOF_CONTENT);
441     }
442 
443     public boolean consumeAll()
444     {
445         synchronized (_inputQ)
446         {
447             try
448             {
449                 while (!isFinished())
450                 {
451                     Content item = nextContent();
452                     if (item == null)
453                         break; // Let's not bother blocking
454 
455                     skip(item, remaining(item));
456                 }
457                 return isFinished() && !isError();
458             }
459             catch (IOException e)
460             {
461                 LOG.debug(e);
462                 return false;
463             }
464         }
465     }
466 
467     public boolean isError()
468     {
469         synchronized (_inputQ)
470         {
471             return _state instanceof ErrorState;
472         }
473     }
474 
475     public boolean isAsync()
476     {
477         synchronized (_inputQ)
478         {
479             return _state==ASYNC;
480         }
481     }
482 
483     @Override
484     public boolean isFinished()
485     {
486         synchronized (_inputQ)
487         {
488             return _state instanceof EOFState;
489         }
490     }
491 
492 
493     @Override
494     public boolean isReady()
495     {
496         try
497         {
498             synchronized (_inputQ)
499             {
500                 if (_listener == null )
501                     return true;
502                 if (_state instanceof EOFState)
503                     return true;
504                 if (nextReadable()!=null)
505                     return true;
506 
507                 _channelState.onReadUnready();
508             }
509             return false;
510         }
511         catch(IOException e)
512         {
513             LOG.ignore(e);
514             return true;
515         }
516     }
517 
518     @Override
519     public void setReadListener(ReadListener readListener)
520     {
521         readListener = Objects.requireNonNull(readListener);
522         boolean woken=false;
523         try
524         {
525             synchronized (_inputQ)
526             {
527                 if (_listener != null)
528                     throw new IllegalStateException("ReadListener already set");
529                 if (_state != STREAM)
530                     throw new IllegalStateException("State "+STREAM+" != " + _state);
531 
532                 _state = ASYNC;
533                 _listener = readListener;
534                 boolean content=nextContent()!=null;
535 
536                 if (content)
537                     woken = _channelState.onReadReady();
538                 else
539                     _channelState.onReadUnready();
540             }
541         }
542         catch(IOException e)
543         {
544             throw new RuntimeIOException(e);
545         }
546 
547         if (woken)
548             wake();
549     }
550 
551     public boolean failed(Throwable x)
552     {
553         boolean woken=false;
554         synchronized (_inputQ)
555         {
556             if (_state instanceof ErrorState)
557                 LOG.warn(x);
558             else
559                 _state = new ErrorState(x);
560 
561             if (_listener==null)
562                 _inputQ.notify();
563             else
564                 woken=_channelState.onReadPossible();
565         }
566 
567         return woken;
568     }
569 
570     /* ------------------------------------------------------------ */
571     /*
572      * <p>
573      * While this class is-a Runnable, it should never be dispatched in it's own thread. It is a
574      * runnable only so that the calling thread can use {@link ContextHandler#handle(Runnable)}
575      * to setup classloaders etc.
576      * </p>
577      */
578     @Override
579     public void run()
580     {
581         final Throwable error;
582         final ReadListener listener;
583         boolean aeof=false;
584 
585         synchronized (_inputQ)
586         {
587             if (_state==EOF)
588                 return;
589 
590             if (_state==AEOF)
591             {
592                 _state=EOF;
593                 aeof=true;
594             }
595 
596             listener = _listener;
597             error = _state instanceof ErrorState?((ErrorState)_state).getError():null;
598         }
599 
600         try
601         {
602             if (error!=null)
603             {
604                 _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
605                 listener.onError(error);
606             }
607             else if (aeof)
608             {
609                 listener.onAllDataRead();
610             }
611             else
612             {
613                 listener.onDataAvailable();
614             }
615         }
616         catch (Throwable e)
617         {
618             LOG.warn(e.toString());
619             LOG.debug(e);
620             try
621             {
622                 if (aeof || error==null)
623                 {
624                     _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
625                     listener.onError(e);
626                 }
627             }
628             catch (Throwable e2)
629             {
630                 LOG.warn(e2.toString());
631                 LOG.debug(e2);
632                 throw new RuntimeIOException(e2);
633             }
634         }
635     }
636 
637     @Override
638     public String toString()
639     {
640         return String.format("%s@%x[c=%d,s=%s]",
641                 getClass().getSimpleName(),
642                 hashCode(),
643                 _contentConsumed,
644                 _state);
645     }
646 
647     public static class PoisonPillContent extends Content
648     {
649         private final String _name;
650         public PoisonPillContent(String name)
651         {
652             super(BufferUtil.EMPTY_BUFFER);
653             _name=name;
654         }
655 
656         @Override
657         public String toString()
658         {
659             return _name;
660         }
661     }
662 
663     public static class EofContent extends PoisonPillContent
664     {
665         EofContent(String name)
666         {
667             super(name);
668         }
669     }
670 
671     public static class Content implements Callback
672     {
673         private final ByteBuffer _content;
674 
675         public Content(ByteBuffer content)
676         {
677             _content=content;
678         }
679 
680         @Override
681         public boolean isNonBlocking()
682         {
683             return true;
684         }
685 
686 
687         public ByteBuffer getContent()
688         {
689             return _content;
690         }
691 
692         public boolean hasContent()
693         {
694             return _content.hasRemaining();
695         }
696 
697         public int remaining()
698         {
699             return _content.remaining();
700         }
701 
702         @Override
703         public String toString()
704         {
705             return String.format("Content@%x{%s}",hashCode(),BufferUtil.toDetailString(_content));
706         }
707     }
708 
709 
710     protected static abstract class State
711     {
712         public boolean blockForContent(HttpInput in) throws IOException
713         {
714             return false;
715         }
716 
717         public int noContent() throws IOException
718         {
719             return -1;
720         }
721     }
722 
723     protected static class EOFState extends State
724     {
725     }
726 
727     protected class ErrorState extends EOFState
728     {
729         final Throwable _error;
730         ErrorState(Throwable error)
731         {
732             _error=error;
733         }
734 
735         public Throwable getError()
736         {
737             return _error;
738         }
739 
740         @Override
741         public int noContent() throws IOException
742         {
743             if (_error instanceof IOException)
744                 throw (IOException)_error;
745             throw new IOException(_error);
746         }
747 
748         @Override
749         public String toString()
750         {
751             return "ERROR:"+_error;
752         }
753     }
754 
755     protected static final State STREAM = new State()
756     {
757         @Override
758         public boolean blockForContent(HttpInput input) throws IOException
759         {
760             input.blockForContent();
761             return true;
762         }
763 
764         @Override
765         public String toString()
766         {
767             return "STREAM";
768         }
769     };
770 
771     protected static final State ASYNC = new State()
772     {
773         @Override
774         public int noContent() throws IOException
775         {
776             return 0;
777         }
778 
779         @Override
780         public String toString()
781         {
782             return "ASYNC";
783         }
784     };
785 
786     protected static final State EARLY_EOF = new EOFState()
787     {
788         @Override
789         public int noContent() throws IOException
790         {
791             throw new EofException("Early EOF");
792         }
793 
794         @Override
795         public String toString()
796         {
797             return "EARLY_EOF";
798         }
799     };
800 
801     protected static final State EOF = new EOFState()
802     {
803         @Override
804         public String toString()
805         {
806             return "EOF";
807         }
808     };
809 
810     protected static final State AEOF = new EOFState()
811     {
812         @Override
813         public String toString()
814         {
815             return "AEOF";
816         }
817     };
818 
819 }