1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.nio.ByteBuffer;
24 import java.util.ArrayDeque;
25 import java.util.Deque;
26 import java.util.Objects;
27 import java.util.Queue;
28 import java.util.concurrent.TimeoutException;
29
30 import javax.servlet.ReadListener;
31 import javax.servlet.ServletInputStream;
32
33 import org.eclipse.jetty.io.EofException;
34 import org.eclipse.jetty.io.RuntimeIOException;
35 import org.eclipse.jetty.util.BufferUtil;
36 import org.eclipse.jetty.util.Callback;
37 import org.eclipse.jetty.util.log.Log;
38 import org.eclipse.jetty.util.log.Logger;
39
40
41
42
43
44
45
46
47
48 public class HttpInput extends ServletInputStream implements Runnable
49 {
50 private final static Logger LOG = Log.getLogger(HttpInput.class);
51 private final static Content EOF_CONTENT = new EofContent("EOF");
52 private final static Content EARLY_EOF_CONTENT = new EofContent("EARLY_EOF");
53
54 private final byte[] _oneByteBuffer = new byte[1];
55 private final Deque<Content> _inputQ = new ArrayDeque<>();
56 private final HttpChannelState _channelState;
57 private ReadListener _listener;
58 private State _state = STREAM;
59 private long _contentConsumed;
60 private long _blockingTimeoutAt = -1;
61
62 public HttpInput(HttpChannelState state)
63 {
64 _channelState=state;
65 if (_channelState.getHttpChannel().getHttpConfiguration().getBlockingTimeout()>0)
66 _blockingTimeoutAt=0;
67 }
68
69 protected HttpChannelState getHttpChannelState()
70 {
71 return _channelState;
72 }
73
74 public void recycle()
75 {
76 synchronized (_inputQ)
77 {
78 Content item = _inputQ.poll();
79 while (item != null)
80 {
81 item.failed(null);
82 item = _inputQ.poll();
83 }
84 _listener = null;
85 _state = STREAM;
86 _contentConsumed = 0;
87 }
88 }
89
90 @Override
91 public int available()
92 {
93 int available=0;
94 boolean woken=false;
95 synchronized (_inputQ)
96 {
97 Content content = _inputQ.peek();
98 if (content==null)
99 {
100 try
101 {
102 produceContent();
103 }
104 catch(IOException e)
105 {
106 woken=failed(e);
107 }
108 content = _inputQ.peek();
109 }
110
111 if (content!=null)
112 available= remaining(content);
113 }
114
115 if (woken)
116 wake();
117 return available;
118 }
119
120 private void wake()
121 {
122 _channelState.getHttpChannel().getConnector().getExecutor().execute(_channelState.getHttpChannel());
123 }
124
125
126 @Override
127 public int read() throws IOException
128 {
129 int read = read(_oneByteBuffer, 0, 1);
130 if (read==0)
131 throw new IllegalStateException("unready read=0");
132 return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
133 }
134
135 @Override
136 public int read(byte[] b, int off, int len) throws IOException
137 {
138 synchronized (_inputQ)
139 {
140 if (_blockingTimeoutAt>=0 && !isAsync())
141 _blockingTimeoutAt=System.currentTimeMillis()+getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout();
142
143 while(true)
144 {
145 Content item = nextContent();
146 if (item!=null)
147 {
148 if (LOG.isDebugEnabled())
149 LOG.debug("{} read {} from {}",this,len,item);
150 int l = get(item, b, off, len);
151
152 consumeNonContent();
153
154 return l;
155 }
156
157 if (!_state.blockForContent(this))
158 return _state.noContent();
159 }
160 }
161 }
162
163
164
165
166
167
168
169
170 protected void produceContent() throws IOException
171 {
172 }
173
174
175
176
177
178
179
180
181 protected Content nextContent() throws IOException
182 {
183 Content content = pollContent();
184 if (content==null && !isFinished())
185 {
186 produceContent();
187 content = pollContent();
188 }
189 return content;
190 }
191
192
193
194
195
196
197 protected Content pollContent()
198 {
199
200 Content content = _inputQ.peek();
201
202 while (content != null && remaining(content) == 0)
203 {
204 _inputQ.poll();
205 content.succeeded();
206 if (LOG.isDebugEnabled())
207 LOG.debug("{} consumed {}", this, content);
208
209 if (content==EOF_CONTENT)
210 {
211 if (_listener==null)
212 _state=EOF;
213 else
214 {
215 _state=AEOF;
216 boolean woken = _channelState.onReadReady();
217 if (woken)
218 wake();
219 }
220 }
221 else if (content==EARLY_EOF_CONTENT)
222 _state=EARLY_EOF;
223
224 content = _inputQ.peek();
225 }
226
227 return content;
228 }
229
230
231
232 protected void consumeNonContent()
233 {
234
235 Content content = _inputQ.peek();
236
237 while (content != null && remaining(content) == 0)
238 {
239
240 if (content instanceof EofContent)
241 break;
242
243
244 _inputQ.poll();
245 content.succeeded();
246 if (LOG.isDebugEnabled())
247 LOG.debug("{} consumed {}", this, content);
248 content = _inputQ.peek();
249 }
250 }
251
252
253
254
255
256
257
258
259 protected Content nextReadable() throws IOException
260 {
261 Content content = pollReadable();
262 if (content==null && !isFinished())
263 {
264 produceContent();
265 content = pollReadable();
266 }
267 return content;
268 }
269
270
271
272
273
274
275 protected Content pollReadable()
276 {
277
278 Content content = _inputQ.peek();
279
280
281 while (content != null)
282 {
283 if (content==EOF_CONTENT || content==EARLY_EOF_CONTENT || remaining(content)>0)
284 return content;
285
286 _inputQ.poll();
287 content.succeeded();
288 if (LOG.isDebugEnabled())
289 LOG.debug("{} consumed {}", this, content);
290 content = _inputQ.peek();
291 }
292
293 return null;
294 }
295
296
297
298
299
300 protected int remaining(Content item)
301 {
302 return item.remaining();
303 }
304
305
306
307
308
309
310
311
312
313
314 protected int get(Content content, byte[] buffer, int offset, int length)
315 {
316 int l = Math.min(content.remaining(), length);
317 content.getContent().get(buffer, offset, l);
318 _contentConsumed+=l;
319 return l;
320 }
321
322
323
324
325
326
327
328
329 protected void skip(Content content, int length)
330 {
331 int l = Math.min(content.remaining(), length);
332 ByteBuffer buffer = content.getContent();
333 buffer.position(buffer.position()+l);
334 _contentConsumed+=l;
335 if (l>0 && !content.hasContent())
336 pollContent();
337
338 }
339
340
341
342
343
344
345 protected void blockForContent() throws IOException
346 {
347 try
348 {
349 long timeout=0;
350 if (_blockingTimeoutAt>=0)
351 {
352 timeout=_blockingTimeoutAt-System.currentTimeMillis();
353 if (timeout<=0)
354 throw new TimeoutException();
355 }
356
357 if (LOG.isDebugEnabled())
358 LOG.debug("{} blocking for content timeout={} ...", this,timeout);
359 if (timeout>0)
360 _inputQ.wait(timeout);
361 else
362 _inputQ.wait();
363
364 if (_blockingTimeoutAt>0 && System.currentTimeMillis()>=_blockingTimeoutAt)
365 throw new TimeoutException();
366 }
367 catch (Throwable e)
368 {
369 throw (IOException)new InterruptedIOException().initCause(e);
370 }
371 }
372
373
374
375
376
377
378
379
380
381 public boolean prependContent(Content item)
382 {
383 boolean woken=false;
384 synchronized (_inputQ)
385 {
386 _inputQ.push(item);
387 _contentConsumed-=item.remaining();
388 if (LOG.isDebugEnabled())
389 LOG.debug("{} prependContent {}", this, item);
390
391 if (_listener==null)
392 _inputQ.notify();
393 else
394 woken=_channelState.onReadPossible();
395 }
396
397 return woken;
398 }
399
400
401
402
403
404
405
406 public boolean addContent(Content item)
407 {
408 boolean woken=false;
409 synchronized (_inputQ)
410 {
411 _inputQ.offer(item);
412 if (LOG.isDebugEnabled())
413 LOG.debug("{} addContent {}", this, item);
414
415 if (_listener==null)
416 _inputQ.notify();
417 else
418 woken=_channelState.onReadPossible();
419 }
420
421 return woken;
422 }
423
424 public boolean hasContent()
425 {
426 synchronized (_inputQ)
427 {
428 return _inputQ.size()>0;
429 }
430 }
431
432 public void unblock()
433 {
434 synchronized (_inputQ)
435 {
436 _inputQ.notify();
437 }
438 }
439
440 public long getContentConsumed()
441 {
442 synchronized (_inputQ)
443 {
444 return _contentConsumed;
445 }
446 }
447
448
449
450
451
452
453
454
455
456 public boolean earlyEOF()
457 {
458 return addContent(EARLY_EOF_CONTENT);
459 }
460
461
462
463
464
465
466 public boolean eof()
467 {
468 return addContent(EOF_CONTENT);
469 }
470
471 public boolean consumeAll()
472 {
473 synchronized (_inputQ)
474 {
475 try
476 {
477 while (!isFinished())
478 {
479 Content item = nextContent();
480 if (item == null)
481 break;
482
483 skip(item, remaining(item));
484 }
485 return isFinished() && !isError();
486 }
487 catch (IOException e)
488 {
489 LOG.debug(e);
490 return false;
491 }
492 }
493 }
494
495 public boolean isError()
496 {
497 synchronized (_inputQ)
498 {
499 return _state instanceof ErrorState;
500 }
501 }
502
503 public boolean isAsync()
504 {
505 synchronized (_inputQ)
506 {
507 return _state==ASYNC;
508 }
509 }
510
511 @Override
512 public boolean isFinished()
513 {
514 synchronized (_inputQ)
515 {
516 return _state instanceof EOFState;
517 }
518 }
519
520
521 @Override
522 public boolean isReady()
523 {
524 try
525 {
526 synchronized (_inputQ)
527 {
528 if (_listener == null )
529 return true;
530 if (_state instanceof EOFState)
531 return true;
532 if (nextReadable()!=null)
533 return true;
534
535 _channelState.onReadUnready();
536 }
537 return false;
538 }
539 catch(IOException e)
540 {
541 LOG.ignore(e);
542 return true;
543 }
544 }
545
546 @Override
547 public void setReadListener(ReadListener readListener)
548 {
549 readListener = Objects.requireNonNull(readListener);
550 boolean woken=false;
551 try
552 {
553 synchronized (_inputQ)
554 {
555 if (_listener != null)
556 throw new IllegalStateException("ReadListener already set");
557 if (_state != STREAM)
558 throw new IllegalStateException("State "+STREAM+" != " + _state);
559
560 _state = ASYNC;
561 _listener = readListener;
562 boolean content=nextContent()!=null;
563
564 if (content)
565 woken = _channelState.onReadReady();
566 else
567 _channelState.onReadUnready();
568 }
569 }
570 catch(IOException e)
571 {
572 throw new RuntimeIOException(e);
573 }
574
575 if (woken)
576 wake();
577 }
578
579 public boolean failed(Throwable x)
580 {
581 boolean woken=false;
582 synchronized (_inputQ)
583 {
584 if (_state instanceof ErrorState)
585 LOG.warn(x);
586 else
587 _state = new ErrorState(x);
588
589 if (_listener==null)
590 _inputQ.notify();
591 else
592 woken=_channelState.onReadPossible();
593 }
594
595 return woken;
596 }
597
598
599
600
601
602
603
604
605
606 @Override
607 public void run()
608 {
609 final Throwable error;
610 final ReadListener listener;
611 boolean aeof=false;
612
613 synchronized (_inputQ)
614 {
615 if (_state==EOF)
616 return;
617
618 if (_state==AEOF)
619 {
620 _state=EOF;
621 aeof=true;
622 }
623
624 listener = _listener;
625 error = _state instanceof ErrorState?((ErrorState)_state).getError():null;
626 }
627
628 try
629 {
630 if (error!=null)
631 {
632 _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
633 listener.onError(error);
634 }
635 else if (aeof)
636 {
637 listener.onAllDataRead();
638 }
639 else
640 {
641 listener.onDataAvailable();
642 }
643 }
644 catch (Throwable e)
645 {
646 LOG.warn(e.toString());
647 LOG.debug(e);
648 try
649 {
650 if (aeof || error==null)
651 {
652 _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
653 listener.onError(e);
654 }
655 }
656 catch (Throwable e2)
657 {
658 LOG.warn(e2.toString());
659 LOG.debug(e2);
660 throw new RuntimeIOException(e2);
661 }
662 }
663 }
664
665 @Override
666 public String toString()
667 {
668 return String.format("%s@%x[c=%d,s=%s]",
669 getClass().getSimpleName(),
670 hashCode(),
671 _contentConsumed,
672 _state);
673 }
674
675 public static class PoisonPillContent extends Content
676 {
677 private final String _name;
678 public PoisonPillContent(String name)
679 {
680 super(BufferUtil.EMPTY_BUFFER);
681 _name=name;
682 }
683
684 @Override
685 public String toString()
686 {
687 return _name;
688 }
689 }
690
691 public static class EofContent extends PoisonPillContent
692 {
693 EofContent(String name)
694 {
695 super(name);
696 }
697 }
698
699 public static class Content implements Callback
700 {
701 private final ByteBuffer _content;
702
703 public Content(ByteBuffer content)
704 {
705 _content=content;
706 }
707
708 @Override
709 public boolean isNonBlocking()
710 {
711 return true;
712 }
713
714
715 public ByteBuffer getContent()
716 {
717 return _content;
718 }
719
720 public boolean hasContent()
721 {
722 return _content.hasRemaining();
723 }
724
725 public int remaining()
726 {
727 return _content.remaining();
728 }
729
730 @Override
731 public String toString()
732 {
733 return String.format("Content@%x{%s}",hashCode(),BufferUtil.toDetailString(_content));
734 }
735 }
736
737
738 protected static abstract class State
739 {
740 public boolean blockForContent(HttpInput in) throws IOException
741 {
742 return false;
743 }
744
745 public int noContent() throws IOException
746 {
747 return -1;
748 }
749 }
750
751 protected static class EOFState extends State
752 {
753 }
754
755 protected class ErrorState extends EOFState
756 {
757 final Throwable _error;
758 ErrorState(Throwable error)
759 {
760 _error=error;
761 }
762
763 public Throwable getError()
764 {
765 return _error;
766 }
767
768 @Override
769 public int noContent() throws IOException
770 {
771 if (_error instanceof IOException)
772 throw (IOException)_error;
773 throw new IOException(_error);
774 }
775
776 @Override
777 public String toString()
778 {
779 return "ERROR:"+_error;
780 }
781 }
782
783 protected static final State STREAM = new State()
784 {
785 @Override
786 public boolean blockForContent(HttpInput input) throws IOException
787 {
788 input.blockForContent();
789 return true;
790 }
791
792 @Override
793 public String toString()
794 {
795 return "STREAM";
796 }
797 };
798
799 protected static final State ASYNC = new State()
800 {
801 @Override
802 public int noContent() throws IOException
803 {
804 return 0;
805 }
806
807 @Override
808 public String toString()
809 {
810 return "ASYNC";
811 }
812 };
813
814 protected static final State EARLY_EOF = new EOFState()
815 {
816 @Override
817 public int noContent() throws IOException
818 {
819 throw new EofException("Early EOF");
820 }
821
822 @Override
823 public String toString()
824 {
825 return "EARLY_EOF";
826 }
827 };
828
829 protected static final State EOF = new EOFState()
830 {
831 @Override
832 public String toString()
833 {
834 return "EOF";
835 }
836 };
837
838 protected static final State AEOF = new EOFState()
839 {
840 @Override
841 public String toString()
842 {
843 return "AEOF";
844 }
845 };
846
847 }