View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.nio.ByteBuffer;
24  import java.util.ArrayDeque;
25  import java.util.Deque;
26  import java.util.Objects;
27  import java.util.Queue;
28  import java.util.concurrent.TimeoutException;
29  
30  import javax.servlet.ReadListener;
31  import javax.servlet.ServletInputStream;
32  
33  import org.eclipse.jetty.io.EofException;
34  import org.eclipse.jetty.io.RuntimeIOException;
35  import org.eclipse.jetty.util.BufferUtil;
36  import org.eclipse.jetty.util.Callback;
37  import org.eclipse.jetty.util.log.Log;
38  import org.eclipse.jetty.util.log.Logger;
39  
40  /**
41   * {@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.
42   * <p>
43   * Content may arrive in patterns such as [content(), content(), messageComplete()] so that this class
44   * maintains two states: the content state that tells whether there is content to consume and the EOF
45   * state that tells whether an EOF has arrived.
46   * Only once the content has been consumed the content state is moved to the EOF state.
47   */
48  public class HttpInput extends ServletInputStream implements Runnable
49  {
50      private final static Logger LOG = Log.getLogger(HttpInput.class);
51      private final static Content EOF_CONTENT = new EofContent("EOF");
52      private final static Content EARLY_EOF_CONTENT = new EofContent("EARLY_EOF");
53  
54      private final byte[] _oneByteBuffer = new byte[1];
55      private final Deque<Content> _inputQ = new ArrayDeque<>();
56      private final HttpChannelState _channelState;
57      private ReadListener _listener;
58      private State _state = STREAM;
59      private long _contentConsumed;
60      private long _blockingTimeoutAt = -1;
61  
62      public HttpInput(HttpChannelState state)
63      {
64          _channelState=state;
65          if (_channelState.getHttpChannel().getHttpConfiguration().getBlockingTimeout()>0)
66              _blockingTimeoutAt=0;
67      }
68  
69      protected HttpChannelState getHttpChannelState()
70      {
71          return _channelState;
72      }
73  
74      public void recycle()
75      {
76          synchronized (_inputQ)
77          {
78              Content item = _inputQ.poll();
79              while (item != null)
80              {
81                  item.failed(null);
82                  item = _inputQ.poll();
83              }
84              _listener = null;
85              _state = STREAM;
86              _contentConsumed = 0;
87          }
88      }
89  
90      @Override
91      public int available()
92      {
93          int available=0;
94          boolean woken=false;
95          synchronized (_inputQ)
96          {
97              Content content = _inputQ.peek();
98              if (content==null)
99              {
100                 try
101                 {
102                     produceContent();
103                 }
104                 catch(IOException e)
105                 {
106                     woken=failed(e);
107                 }
108                 content = _inputQ.peek();
109             }
110 
111             if (content!=null)
112                 available= remaining(content);
113         }
114 
115         if (woken)
116             wake();
117         return available;
118     }
119 
120     private void wake()
121     {
122         _channelState.getHttpChannel().getConnector().getExecutor().execute(_channelState.getHttpChannel());
123     }
124 
125 
126     @Override
127     public int read() throws IOException
128     {
129         int read = read(_oneByteBuffer, 0, 1);
130         if (read==0)
131             throw new IllegalStateException("unready read=0");
132         return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
133     }
134 
135     @Override
136     public int read(byte[] b, int off, int len) throws IOException
137     {
138         synchronized (_inputQ)
139         {
140             if (_blockingTimeoutAt>=0 && !isAsync())
141                 _blockingTimeoutAt=System.currentTimeMillis()+getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout();
142 
143             while(true)
144             {
145                 Content item = nextContent();
146                 if (item!=null)
147                 {
148                     if (LOG.isDebugEnabled())
149                         LOG.debug("{} read {} from {}",this,len,item);
150                     int l = get(item, b, off, len);
151 
152                     consumeNonContent();
153 
154                     return l;
155                 }
156 
157                 if (!_state.blockForContent(this))
158                     return _state.noContent();
159             }
160         }
161     }
162 
163     /**
164      * Called when derived implementations should attempt to
165      * produce more Content and add it via {@link #addContent(Content)}.
166      * For protocols that are constantly producing (eg HTTP2) this can
167      * be left as a noop;
168      * @throws IOException if unable to produce content
169      */
170     protected void produceContent() throws IOException
171     {
172     }
173 
174     /**
175      * Get the next content from the inputQ, calling {@link #produceContent()}
176      * if need be.  EOF is processed and state changed.
177      *
178      * @return the content or null if none available.
179      * @throws IOException if retrieving the content fails
180      */
181     protected Content nextContent() throws IOException
182     {
183         Content content = pollContent();
184         if (content==null && !isFinished())
185         {
186             produceContent();
187             content = pollContent();
188         }
189         return content;
190     }
191 
192     /** Poll the inputQ for Content.
193      * Consumed buffers and {@link PoisonPillContent}s are removed and
194      * EOF state updated if need be.
195      * @return Content or null
196      */
197     protected Content pollContent()
198     {
199         // Items are removed only when they are fully consumed.
200         Content content = _inputQ.peek();
201         // Skip consumed items at the head of the queue.
202         while (content != null && remaining(content) == 0)
203         {
204             _inputQ.poll();
205             content.succeeded();
206             if (LOG.isDebugEnabled())
207                 LOG.debug("{} consumed {}", this, content);
208 
209             if (content==EOF_CONTENT)
210             {
211                 if (_listener==null)
212                     _state=EOF;
213                 else
214                 {
215                     _state=AEOF;
216                     boolean woken = _channelState.onReadReady(); // force callback?
217                     if (woken)
218                         wake();
219                 }
220             }
221             else if (content==EARLY_EOF_CONTENT)
222                 _state=EARLY_EOF;
223 
224             content = _inputQ.peek();
225         }
226 
227         return content;
228     }
229 
230     /**
231      */
232     protected void consumeNonContent()
233     {
234         // Items are removed only when they are fully consumed.
235         Content content = _inputQ.peek();
236         // Skip consumed items at the head of the queue.
237         while (content != null && remaining(content) == 0)
238         {
239             // Defer EOF until read
240             if (content instanceof EofContent)
241                 break;
242 
243             // Consume all other empty content
244             _inputQ.poll();
245             content.succeeded();
246             if (LOG.isDebugEnabled())
247                 LOG.debug("{} consumed {}", this, content);
248             content = _inputQ.peek();
249         }
250     }
251 
252     /**
253      * Get the next readable from the inputQ, calling {@link #produceContent()}
254      * if need be. EOF is NOT processed and state is not changed.
255      *
256      * @return the content or EOF or null if none available.
257      * @throws IOException if retrieving the content fails
258      */
259     protected Content nextReadable() throws IOException
260     {
261         Content content = pollReadable();
262         if (content==null && !isFinished())
263         {
264             produceContent();
265             content = pollReadable();
266         }
267         return content;
268     }
269 
270     /** Poll the inputQ for Content or EOF.
271      * Consumed buffers and non EOF {@link PoisonPillContent}s are removed.
272      * EOF state is not updated.
273      * @return Content, EOF or null
274      */
275     protected Content pollReadable()
276     {
277         // Items are removed only when they are fully consumed.
278         Content content = _inputQ.peek();
279 
280         // Skip consumed items at the head of the queue except EOF
281         while (content != null)
282         {
283             if (content==EOF_CONTENT || content==EARLY_EOF_CONTENT || remaining(content)>0)
284                 return content;
285 
286             _inputQ.poll();
287             content.succeeded();
288             if (LOG.isDebugEnabled())
289                 LOG.debug("{} consumed {}", this, content);
290             content = _inputQ.peek();
291         }
292 
293         return null;
294     }
295 
296     /**
297      * @param item the content
298      * @return how many bytes remain in the given content
299      */
300     protected int remaining(Content item)
301     {
302         return item.remaining();
303     }
304 
305     /**
306      * Copies the given content into the given byte buffer.
307      *
308      * @param content   the content to copy from
309      * @param buffer the buffer to copy into
310      * @param offset the buffer offset to start copying from
311      * @param length the space available in the buffer
312      * @return the number of bytes actually copied
313      */
314     protected int get(Content content, byte[] buffer, int offset, int length)
315     {
316         int l = Math.min(content.remaining(), length);
317         content.getContent().get(buffer, offset, l);
318         _contentConsumed+=l;
319         return l;
320     }
321 
322     /**
323      * Consumes the given content.
324      * Calls the content succeeded if all content consumed.
325      *
326      * @param content   the content to consume
327      * @param length the number of bytes to consume
328      */
329     protected void skip(Content content, int length)
330     {
331         int l = Math.min(content.remaining(), length);
332         ByteBuffer buffer = content.getContent();
333         buffer.position(buffer.position()+l);
334         _contentConsumed+=l;
335         if (l>0 && !content.hasContent())
336             pollContent(); // hungry succeed
337 
338     }
339 
340     /**
341      * Blocks until some content or some end-of-file event arrives.
342      *
343      * @throws IOException if the wait is interrupted
344      */
345     protected void blockForContent() throws IOException
346     {
347         try
348         {
349             long timeout=0;
350             if (_blockingTimeoutAt>=0)
351             {
352                 timeout=_blockingTimeoutAt-System.currentTimeMillis();
353                 if (timeout<=0)
354                     throw new TimeoutException();
355             }
356 
357             if (LOG.isDebugEnabled())
358                 LOG.debug("{} blocking for content timeout={} ...", this,timeout);
359             if (timeout>0)
360                 _inputQ.wait(timeout);
361             else
362                 _inputQ.wait();
363 
364             if (_blockingTimeoutAt>0 && System.currentTimeMillis()>=_blockingTimeoutAt)
365                 throw new TimeoutException();
366         }
367         catch (Throwable e)
368         {
369             throw (IOException)new InterruptedIOException().initCause(e);
370         }
371     }
372 
373     /**
374      * Adds some content to the start of this input stream.
375      * <p>Typically used to push back content that has
376      * been read, perhaps mutated.  The bytes prepended are
377      * deducted for the contentConsumed total</p>
378      * @param item the content to add
379      * @return true if content channel woken for read
380      */
381     public boolean prependContent(Content item)
382     {
383         boolean woken=false;
384         synchronized (_inputQ)
385         {
386             _inputQ.push(item);
387             _contentConsumed-=item.remaining();
388             if (LOG.isDebugEnabled())
389                 LOG.debug("{} prependContent {}", this, item);
390 
391             if (_listener==null)
392                 _inputQ.notify();
393             else
394                 woken=_channelState.onReadPossible();
395         }
396 
397         return woken;
398     }
399     
400     /**
401      * Adds some content to this input stream.
402      *
403      * @param item the content to add
404      * @return true if content channel woken for read
405      */
406     public boolean addContent(Content item)
407     {
408         boolean woken=false;
409         synchronized (_inputQ)
410         {
411             _inputQ.offer(item);
412             if (LOG.isDebugEnabled())
413                 LOG.debug("{} addContent {}", this, item);
414 
415             if (_listener==null)
416                 _inputQ.notify();
417             else
418                 woken=_channelState.onReadPossible();
419         }
420 
421         return woken;
422     }
423 
424     public boolean hasContent()
425     {
426         synchronized (_inputQ)
427         {
428             return _inputQ.size()>0;
429         }
430     }
431 
432     public void unblock()
433     {
434         synchronized (_inputQ)
435         {
436             _inputQ.notify();
437         }
438     }
439 
440     public long getContentConsumed()
441     {
442         synchronized (_inputQ)
443         {
444             return _contentConsumed;
445         }
446     }
447 
448     /**
449      * This method should be called to signal that an EOF has been
450      * detected before all the expected content arrived.
451      * <p>
452      * Typically this will result in an EOFException being thrown
453      * from a subsequent read rather than a -1 return.
454      * @return true if content channel woken for read
455      */
456     public boolean earlyEOF()
457     {
458         return addContent(EARLY_EOF_CONTENT);
459     }
460 
461     /**
462      * This method should be called to signal that all the expected
463      * content arrived.
464      * @return true if content channel woken for read
465      */
466     public boolean eof()
467     {
468        return addContent(EOF_CONTENT);
469     }
470 
471     public boolean consumeAll()
472     {
473         synchronized (_inputQ)
474         {
475             try
476             {
477                 while (!isFinished())
478                 {
479                     Content item = nextContent();
480                     if (item == null)
481                         break; // Let's not bother blocking
482 
483                     skip(item, remaining(item));
484                 }
485                 return isFinished() && !isError();
486             }
487             catch (IOException e)
488             {
489                 LOG.debug(e);
490                 return false;
491             }
492         }
493     }
494 
495     public boolean isError()
496     {
497         synchronized (_inputQ)
498         {
499             return _state instanceof ErrorState;
500         }
501     }
502 
503     public boolean isAsync()
504     {
505         synchronized (_inputQ)
506         {
507             return _state==ASYNC;
508         }
509     }
510 
511     @Override
512     public boolean isFinished()
513     {
514         synchronized (_inputQ)
515         {
516             return _state instanceof EOFState;
517         }
518     }
519 
520 
521     @Override
522     public boolean isReady()
523     {
524         try
525         {
526             synchronized (_inputQ)
527             {
528                 if (_listener == null )
529                     return true;
530                 if (_state instanceof EOFState)
531                     return true;
532                 if (nextReadable()!=null)
533                     return true;
534 
535                 _channelState.onReadUnready();
536             }
537             return false;
538         }
539         catch(IOException e)
540         {
541             LOG.ignore(e);
542             return true;
543         }
544     }
545 
546     @Override
547     public void setReadListener(ReadListener readListener)
548     {
549         readListener = Objects.requireNonNull(readListener);
550         boolean woken=false;
551         try
552         {
553             synchronized (_inputQ)
554             {
555                 if (_listener != null)
556                     throw new IllegalStateException("ReadListener already set");
557                 if (_state != STREAM)
558                     throw new IllegalStateException("State "+STREAM+" != " + _state);
559 
560                 _state = ASYNC;
561                 _listener = readListener;
562                 boolean content=nextContent()!=null;
563 
564                 if (content)
565                     woken = _channelState.onReadReady();
566                 else
567                     _channelState.onReadUnready();
568             }
569         }
570         catch(IOException e)
571         {
572             throw new RuntimeIOException(e);
573         }
574 
575         if (woken)
576             wake();
577     }
578 
579     public boolean failed(Throwable x)
580     {
581         boolean woken=false;
582         synchronized (_inputQ)
583         {
584             if (_state instanceof ErrorState)
585                 LOG.warn(x);
586             else
587                 _state = new ErrorState(x);
588 
589             if (_listener==null)
590                 _inputQ.notify();
591             else
592                 woken=_channelState.onReadPossible();
593         }
594 
595         return woken;
596     }
597 
598     /* ------------------------------------------------------------ */
599     /*
600      * <p>
601      * While this class is-a Runnable, it should never be dispatched in it's own thread. It is a
602      * runnable only so that the calling thread can use {@link ContextHandler#handle(Runnable)}
603      * to setup classloaders etc.
604      * </p>
605      */
606     @Override
607     public void run()
608     {
609         final Throwable error;
610         final ReadListener listener;
611         boolean aeof=false;
612 
613         synchronized (_inputQ)
614         {
615             if (_state==EOF)
616                 return;
617 
618             if (_state==AEOF)
619             {
620                 _state=EOF;
621                 aeof=true;
622             }
623 
624             listener = _listener;
625             error = _state instanceof ErrorState?((ErrorState)_state).getError():null;
626         }
627 
628         try
629         {
630             if (error!=null)
631             {
632                 _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
633                 listener.onError(error);
634             }
635             else if (aeof)
636             {
637                 listener.onAllDataRead();
638             }
639             else
640             {
641                 listener.onDataAvailable();
642             }
643         }
644         catch (Throwable e)
645         {
646             LOG.warn(e.toString());
647             LOG.debug(e);
648             try
649             {
650                 if (aeof || error==null)
651                 {
652                     _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
653                     listener.onError(e);
654                 }
655             }
656             catch (Throwable e2)
657             {
658                 LOG.warn(e2.toString());
659                 LOG.debug(e2);
660                 throw new RuntimeIOException(e2);
661             }
662         }
663     }
664 
665     @Override
666     public String toString()
667     {
668         return String.format("%s@%x[c=%d,s=%s]",
669                 getClass().getSimpleName(),
670                 hashCode(),
671                 _contentConsumed,
672                 _state);
673     }
674 
675     public static class PoisonPillContent extends Content
676     {
677         private final String _name;
678         public PoisonPillContent(String name)
679         {
680             super(BufferUtil.EMPTY_BUFFER);
681             _name=name;
682         }
683 
684         @Override
685         public String toString()
686         {
687             return _name;
688         }
689     }
690 
691     public static class EofContent extends PoisonPillContent
692     {
693         EofContent(String name)
694         {
695             super(name);
696         }
697     }
698 
699     public static class Content implements Callback
700     {
701         private final ByteBuffer _content;
702 
703         public Content(ByteBuffer content)
704         {
705             _content=content;
706         }
707 
708         @Override
709         public boolean isNonBlocking()
710         {
711             return true;
712         }
713 
714 
715         public ByteBuffer getContent()
716         {
717             return _content;
718         }
719 
720         public boolean hasContent()
721         {
722             return _content.hasRemaining();
723         }
724 
725         public int remaining()
726         {
727             return _content.remaining();
728         }
729 
730         @Override
731         public String toString()
732         {
733             return String.format("Content@%x{%s}",hashCode(),BufferUtil.toDetailString(_content));
734         }
735     }
736 
737 
738     protected static abstract class State
739     {
740         public boolean blockForContent(HttpInput in) throws IOException
741         {
742             return false;
743         }
744 
745         public int noContent() throws IOException
746         {
747             return -1;
748         }
749     }
750 
751     protected static class EOFState extends State
752     {
753     }
754 
755     protected class ErrorState extends EOFState
756     {
757         final Throwable _error;
758         ErrorState(Throwable error)
759         {
760             _error=error;
761         }
762 
763         public Throwable getError()
764         {
765             return _error;
766         }
767 
768         @Override
769         public int noContent() throws IOException
770         {
771             if (_error instanceof IOException)
772                 throw (IOException)_error;
773             throw new IOException(_error);
774         }
775 
776         @Override
777         public String toString()
778         {
779             return "ERROR:"+_error;
780         }
781     }
782 
783     protected static final State STREAM = new State()
784     {
785         @Override
786         public boolean blockForContent(HttpInput input) throws IOException
787         {
788             input.blockForContent();
789             return true;
790         }
791 
792         @Override
793         public String toString()
794         {
795             return "STREAM";
796         }
797     };
798 
799     protected static final State ASYNC = new State()
800     {
801         @Override
802         public int noContent() throws IOException
803         {
804             return 0;
805         }
806 
807         @Override
808         public String toString()
809         {
810             return "ASYNC";
811         }
812     };
813 
814     protected static final State EARLY_EOF = new EOFState()
815     {
816         @Override
817         public int noContent() throws IOException
818         {
819             throw new EofException("Early EOF");
820         }
821 
822         @Override
823         public String toString()
824         {
825             return "EARLY_EOF";
826         }
827     };
828 
829     protected static final State EOF = new EOFState()
830     {
831         @Override
832         public String toString()
833         {
834             return "EOF";
835         }
836     };
837 
838     protected static final State AEOF = new EOFState()
839     {
840         @Override
841         public String toString()
842         {
843             return "AEOF";
844         }
845     };
846 
847 }