1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.nio.ByteBuffer;
24 import java.util.ArrayDeque;
25 import java.util.Objects;
26 import java.util.Queue;
27 import java.util.concurrent.TimeoutException;
28
29 import javax.servlet.ReadListener;
30 import javax.servlet.ServletInputStream;
31
32 import org.eclipse.jetty.io.EofException;
33 import org.eclipse.jetty.io.RuntimeIOException;
34 import org.eclipse.jetty.util.BufferUtil;
35 import org.eclipse.jetty.util.Callback;
36 import org.eclipse.jetty.util.log.Log;
37 import org.eclipse.jetty.util.log.Logger;
38
39
40
41
42
43
44
45
46
47 public class HttpInput extends ServletInputStream implements Runnable
48 {
49 private final static Logger LOG = Log.getLogger(HttpInput.class);
50 private final static Content EOF_CONTENT = new EofContent("EOF");
51 private final static Content EARLY_EOF_CONTENT = new EofContent("EARLY_EOF");
52
53 private final byte[] _oneByteBuffer = new byte[1];
54 private final Queue<Content> _inputQ = new ArrayDeque<>();
55 private final HttpChannelState _channelState;
56 private ReadListener _listener;
57 private State _state = STREAM;
58 private long _contentConsumed;
59 private long _blockingTimeoutAt = -1;
60
61 public HttpInput(HttpChannelState state)
62 {
63 _channelState=state;
64 if (_channelState.getHttpChannel().getHttpConfiguration().getBlockingTimeout()>0)
65 _blockingTimeoutAt=0;
66 }
67
68 protected HttpChannelState getHttpChannelState()
69 {
70 return _channelState;
71 }
72
73 public void recycle()
74 {
75 synchronized (_inputQ)
76 {
77 Content item = _inputQ.poll();
78 while (item != null)
79 {
80 item.failed(null);
81 item = _inputQ.poll();
82 }
83 _listener = null;
84 _state = STREAM;
85 _contentConsumed = 0;
86 }
87 }
88
89 @Override
90 public int available()
91 {
92 int available=0;
93 boolean woken=false;
94 synchronized (_inputQ)
95 {
96 Content content = _inputQ.peek();
97 if (content==null)
98 {
99 try
100 {
101 produceContent();
102 }
103 catch(IOException e)
104 {
105 woken=failed(e);
106 }
107 content = _inputQ.peek();
108 }
109
110 if (content!=null)
111 available= remaining(content);
112 }
113
114 if (woken)
115 wake();
116 return available;
117 }
118
119 private void wake()
120 {
121 _channelState.getHttpChannel().getConnector().getExecutor().execute(_channelState.getHttpChannel());
122 }
123
124
125 @Override
126 public int read() throws IOException
127 {
128 int read = read(_oneByteBuffer, 0, 1);
129 if (read==0)
130 throw new IllegalStateException("unready read=0");
131 return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
132 }
133
134 @Override
135 public int read(byte[] b, int off, int len) throws IOException
136 {
137 synchronized (_inputQ)
138 {
139 if (_blockingTimeoutAt>=0 && !isAsync())
140 _blockingTimeoutAt=System.currentTimeMillis()+getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout();
141
142 while(true)
143 {
144 Content item = nextContent();
145 if (item!=null)
146 {
147 if (LOG.isDebugEnabled())
148 LOG.debug("{} read {} from {}",this,len,item);
149 int l = get(item, b, off, len);
150
151 consumeNonContent();
152
153 return l;
154 }
155
156 if (!_state.blockForContent(this))
157 return _state.noContent();
158 }
159 }
160 }
161
162
163
164
165
166
167
168
169 protected void produceContent() throws IOException
170 {
171 }
172
173
174
175
176
177
178
179
180 protected Content nextContent() throws IOException
181 {
182 Content content = pollContent();
183 if (content==null && !isFinished())
184 {
185 produceContent();
186 content = pollContent();
187 }
188 return content;
189 }
190
191
192
193
194
195
196 protected Content pollContent()
197 {
198
199 Content content = _inputQ.peek();
200
201 while (content != null && remaining(content) == 0)
202 {
203 _inputQ.poll();
204 content.succeeded();
205 if (LOG.isDebugEnabled())
206 LOG.debug("{} consumed {}", this, content);
207
208 if (content==EOF_CONTENT)
209 {
210 if (_listener==null)
211 _state=EOF;
212 else
213 {
214 _state=AEOF;
215 boolean woken = _channelState.onReadReady();
216 if (woken)
217 wake();
218 }
219 }
220 else if (content==EARLY_EOF_CONTENT)
221 _state=EARLY_EOF;
222
223 content = _inputQ.peek();
224 }
225
226 return content;
227 }
228
229
230
231 protected void consumeNonContent()
232 {
233
234 Content content = _inputQ.peek();
235
236 while (content != null && remaining(content) == 0)
237 {
238
239 if (content instanceof EofContent)
240 break;
241
242
243 _inputQ.poll();
244 content.succeeded();
245 if (LOG.isDebugEnabled())
246 LOG.debug("{} consumed {}", this, content);
247 content = _inputQ.peek();
248 }
249 }
250
251
252
253
254
255
256
257
258 protected Content nextReadable() throws IOException
259 {
260 Content content = pollReadable();
261 if (content==null && !isFinished())
262 {
263 produceContent();
264 content = pollReadable();
265 }
266 return content;
267 }
268
269
270
271
272
273
274 protected Content pollReadable()
275 {
276
277 Content content = _inputQ.peek();
278
279
280 while (content != null)
281 {
282 if (content==EOF_CONTENT || content==EARLY_EOF_CONTENT || remaining(content)>0)
283 return content;
284
285 _inputQ.poll();
286 content.succeeded();
287 if (LOG.isDebugEnabled())
288 LOG.debug("{} consumed {}", this, content);
289 content = _inputQ.peek();
290 }
291
292 return null;
293 }
294
295
296
297
298
299 protected int remaining(Content item)
300 {
301 return item.remaining();
302 }
303
304
305
306
307
308
309
310
311
312
313 protected int get(Content content, byte[] buffer, int offset, int length)
314 {
315 int l = Math.min(content.remaining(), length);
316 content.getContent().get(buffer, offset, l);
317 _contentConsumed+=l;
318 return l;
319 }
320
321
322
323
324
325
326
327
328 protected void skip(Content content, int length)
329 {
330 int l = Math.min(content.remaining(), length);
331 ByteBuffer buffer = content.getContent();
332 buffer.position(buffer.position()+l);
333 _contentConsumed+=l;
334 if (l>0 && !content.hasContent())
335 pollContent();
336
337 }
338
339
340
341
342
343
344 protected void blockForContent() throws IOException
345 {
346 try
347 {
348 long timeout=0;
349 if (_blockingTimeoutAt>=0)
350 {
351 timeout=_blockingTimeoutAt-System.currentTimeMillis();
352 if (timeout<=0)
353 throw new TimeoutException();
354 }
355
356 if (LOG.isDebugEnabled())
357 LOG.debug("{} blocking for content timeout={} ...", this,timeout);
358 if (timeout>0)
359 _inputQ.wait(timeout);
360 else
361 _inputQ.wait();
362
363 if (_blockingTimeoutAt>0 && System.currentTimeMillis()>=_blockingTimeoutAt)
364 throw new TimeoutException();
365 }
366 catch (Throwable e)
367 {
368 throw (IOException)new InterruptedIOException().initCause(e);
369 }
370 }
371
372
373
374
375
376
377
378 public boolean addContent(Content item)
379 {
380 boolean woken=false;
381 synchronized (_inputQ)
382 {
383 _inputQ.offer(item);
384 if (LOG.isDebugEnabled())
385 LOG.debug("{} addContent {}", this, item);
386
387 if (_listener==null)
388 _inputQ.notify();
389 else
390 woken=_channelState.onReadPossible();
391 }
392
393 return woken;
394 }
395
396 public boolean hasContent()
397 {
398 synchronized (_inputQ)
399 {
400 return _inputQ.size()>0;
401 }
402 }
403
404 public void unblock()
405 {
406 synchronized (_inputQ)
407 {
408 _inputQ.notify();
409 }
410 }
411
412 public long getContentConsumed()
413 {
414 synchronized (_inputQ)
415 {
416 return _contentConsumed;
417 }
418 }
419
420
421
422
423
424
425
426
427
428 public boolean earlyEOF()
429 {
430 return addContent(EARLY_EOF_CONTENT);
431 }
432
433
434
435
436
437
438 public boolean eof()
439 {
440 return addContent(EOF_CONTENT);
441 }
442
443 public boolean consumeAll()
444 {
445 synchronized (_inputQ)
446 {
447 try
448 {
449 while (!isFinished())
450 {
451 Content item = nextContent();
452 if (item == null)
453 break;
454
455 skip(item, remaining(item));
456 }
457 return isFinished() && !isError();
458 }
459 catch (IOException e)
460 {
461 LOG.debug(e);
462 return false;
463 }
464 }
465 }
466
467 public boolean isError()
468 {
469 synchronized (_inputQ)
470 {
471 return _state instanceof ErrorState;
472 }
473 }
474
475 public boolean isAsync()
476 {
477 synchronized (_inputQ)
478 {
479 return _state==ASYNC;
480 }
481 }
482
483 @Override
484 public boolean isFinished()
485 {
486 synchronized (_inputQ)
487 {
488 return _state instanceof EOFState;
489 }
490 }
491
492
493 @Override
494 public boolean isReady()
495 {
496 try
497 {
498 synchronized (_inputQ)
499 {
500 if (_listener == null )
501 return true;
502 if (_state instanceof EOFState)
503 return true;
504 if (nextReadable()!=null)
505 return true;
506
507 _channelState.onReadUnready();
508 }
509 return false;
510 }
511 catch(IOException e)
512 {
513 LOG.ignore(e);
514 return true;
515 }
516 }
517
518 @Override
519 public void setReadListener(ReadListener readListener)
520 {
521 readListener = Objects.requireNonNull(readListener);
522 boolean woken=false;
523 try
524 {
525 synchronized (_inputQ)
526 {
527 if (_listener != null)
528 throw new IllegalStateException("ReadListener already set");
529 if (_state != STREAM)
530 throw new IllegalStateException("State "+STREAM+" != " + _state);
531
532 _state = ASYNC;
533 _listener = readListener;
534 boolean content=nextContent()!=null;
535
536 if (content)
537 woken = _channelState.onReadReady();
538 else
539 _channelState.onReadUnready();
540 }
541 }
542 catch(IOException e)
543 {
544 throw new RuntimeIOException(e);
545 }
546
547 if (woken)
548 wake();
549 }
550
551 public boolean failed(Throwable x)
552 {
553 boolean woken=false;
554 synchronized (_inputQ)
555 {
556 if (_state instanceof ErrorState)
557 LOG.warn(x);
558 else
559 _state = new ErrorState(x);
560
561 if (_listener==null)
562 _inputQ.notify();
563 else
564 woken=_channelState.onReadPossible();
565 }
566
567 return woken;
568 }
569
570
571
572
573
574
575
576
577
578 @Override
579 public void run()
580 {
581 final Throwable error;
582 final ReadListener listener;
583 boolean aeof=false;
584
585 synchronized (_inputQ)
586 {
587 if (_state==EOF)
588 return;
589
590 if (_state==AEOF)
591 {
592 _state=EOF;
593 aeof=true;
594 }
595
596 listener = _listener;
597 error = _state instanceof ErrorState?((ErrorState)_state).getError():null;
598 }
599
600 try
601 {
602 if (error!=null)
603 {
604 _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
605 listener.onError(error);
606 }
607 else if (aeof)
608 {
609 listener.onAllDataRead();
610 }
611 else
612 {
613 listener.onDataAvailable();
614 }
615 }
616 catch (Throwable e)
617 {
618 LOG.warn(e.toString());
619 LOG.debug(e);
620 try
621 {
622 if (aeof || error==null)
623 {
624 _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
625 listener.onError(e);
626 }
627 }
628 catch (Throwable e2)
629 {
630 LOG.warn(e2.toString());
631 LOG.debug(e2);
632 throw new RuntimeIOException(e2);
633 }
634 }
635 }
636
637 @Override
638 public String toString()
639 {
640 return String.format("%s@%x[c=%d,s=%s]",
641 getClass().getSimpleName(),
642 hashCode(),
643 _contentConsumed,
644 _state);
645 }
646
647 public static class PoisonPillContent extends Content
648 {
649 private final String _name;
650 public PoisonPillContent(String name)
651 {
652 super(BufferUtil.EMPTY_BUFFER);
653 _name=name;
654 }
655
656 @Override
657 public String toString()
658 {
659 return _name;
660 }
661 }
662
663 public static class EofContent extends PoisonPillContent
664 {
665 EofContent(String name)
666 {
667 super(name);
668 }
669 }
670
671 public static class Content implements Callback
672 {
673 private final ByteBuffer _content;
674
675 public Content(ByteBuffer content)
676 {
677 _content=content;
678 }
679
680 @Override
681 public boolean isNonBlocking()
682 {
683 return true;
684 }
685
686
687 public ByteBuffer getContent()
688 {
689 return _content;
690 }
691
692 public boolean hasContent()
693 {
694 return _content.hasRemaining();
695 }
696
697 public int remaining()
698 {
699 return _content.remaining();
700 }
701
702 @Override
703 public String toString()
704 {
705 return String.format("Content@%x{%s}",hashCode(),BufferUtil.toDetailString(_content));
706 }
707 }
708
709
710 protected static abstract class State
711 {
712 public boolean blockForContent(HttpInput in) throws IOException
713 {
714 return false;
715 }
716
717 public int noContent() throws IOException
718 {
719 return -1;
720 }
721 }
722
723 protected static class EOFState extends State
724 {
725 }
726
727 protected class ErrorState extends EOFState
728 {
729 final Throwable _error;
730 ErrorState(Throwable error)
731 {
732 _error=error;
733 }
734
735 public Throwable getError()
736 {
737 return _error;
738 }
739
740 @Override
741 public int noContent() throws IOException
742 {
743 if (_error instanceof IOException)
744 throw (IOException)_error;
745 throw new IOException(_error);
746 }
747
748 @Override
749 public String toString()
750 {
751 return "ERROR:"+_error;
752 }
753 }
754
755 protected static final State STREAM = new State()
756 {
757 @Override
758 public boolean blockForContent(HttpInput input) throws IOException
759 {
760 input.blockForContent();
761 return true;
762 }
763
764 @Override
765 public String toString()
766 {
767 return "STREAM";
768 }
769 };
770
771 protected static final State ASYNC = new State()
772 {
773 @Override
774 public int noContent() throws IOException
775 {
776 return 0;
777 }
778
779 @Override
780 public String toString()
781 {
782 return "ASYNC";
783 }
784 };
785
786 protected static final State EARLY_EOF = new EOFState()
787 {
788 @Override
789 public int noContent() throws IOException
790 {
791 throw new EofException("Early EOF");
792 }
793
794 @Override
795 public String toString()
796 {
797 return "EARLY_EOF";
798 }
799 };
800
801 protected static final State EOF = new EOFState()
802 {
803 @Override
804 public String toString()
805 {
806 return "EOF";
807 }
808 };
809
810 protected static final State AEOF = new EOFState()
811 {
812 @Override
813 public String toString()
814 {
815 return "AEOF";
816 }
817 };
818
819 }