1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.ReadableByteChannel;
25 import java.nio.channels.WritePendingException;
26 import java.util.concurrent.atomic.AtomicReference;
27
28 import javax.servlet.RequestDispatcher;
29 import javax.servlet.ServletOutputStream;
30 import javax.servlet.ServletRequest;
31 import javax.servlet.ServletResponse;
32 import javax.servlet.WriteListener;
33
34 import org.eclipse.jetty.http.HttpContent;
35 import org.eclipse.jetty.io.EofException;
36 import org.eclipse.jetty.util.BlockingCallback;
37 import org.eclipse.jetty.util.BufferUtil;
38 import org.eclipse.jetty.util.Callback;
39 import org.eclipse.jetty.util.IteratingCallback;
40 import org.eclipse.jetty.util.IteratingNestedCallback;
41 import org.eclipse.jetty.util.log.Log;
42 import org.eclipse.jetty.util.log.Logger;
43
44
45
46
47
48
49
50
51
52
53
54 public class HttpOutput extends ServletOutputStream implements Runnable
55 {
56 private static Logger LOG = Log.getLogger(HttpOutput.class);
57 private final HttpChannel<?> _channel;
58 private long _written;
59 private ByteBuffer _aggregate;
60 private int _bufferSize;
61 private int _commitSize;
62 private WriteListener _writeListener;
63 private volatile Throwable _onError;
64
65
66
67
68
69
70
71
72
73
74 enum State { OPEN, ASYNC, READY, PENDING, UNREADY, CLOSED }
75 private final AtomicReference<State> _state=new AtomicReference<>(State.OPEN);
76
77 public HttpOutput(HttpChannel<?> channel)
78 {
79 _channel = channel;
80 _bufferSize = _channel.getHttpConfiguration().getOutputBufferSize();
81 _commitSize=_bufferSize/4;
82 }
83
84 public boolean isWritten()
85 {
86 return _written > 0;
87 }
88
89 public long getWritten()
90 {
91 return _written;
92 }
93
94 public void reset()
95 {
96 _written = 0;
97 reopen();
98 }
99
100 public void reopen()
101 {
102 _state.set(State.OPEN);
103 }
104
105 public boolean isAllContentWritten()
106 {
107 return _channel.getResponse().isAllContentWritten(_written);
108 }
109
110 @Override
111 public void close()
112 {
113 State state=_state.get();
114 while(state!=State.CLOSED)
115 {
116 if (_state.compareAndSet(state,State.CLOSED))
117 {
118 try
119 {
120 if (BufferUtil.hasContent(_aggregate))
121 _channel.write(_aggregate, !_channel.getResponse().isIncluding());
122 else
123 _channel.write(BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding());
124 }
125 catch(IOException e)
126 {
127 LOG.debug(e);
128 _channel.failed();
129 }
130 releaseBuffer();
131 return;
132 }
133 state=_state.get();
134 }
135 }
136
137
138 void closed()
139 {
140 State state=_state.get();
141 while(state!=State.CLOSED)
142 {
143 if (_state.compareAndSet(state,State.CLOSED))
144 {
145 try
146 {
147 _channel.getResponse().closeOutput();
148 }
149 catch(IOException e)
150 {
151 LOG.debug(e);
152 _channel.failed();
153 }
154 releaseBuffer();
155 return;
156 }
157 state=_state.get();
158 }
159 }
160
161 private void releaseBuffer()
162 {
163 if (_aggregate != null)
164 {
165 _channel.getConnector().getByteBufferPool().release(_aggregate);
166 _aggregate = null;
167 }
168 }
169
170 public boolean isClosed()
171 {
172 return _state.get()==State.CLOSED;
173 }
174
175 @Override
176 public void flush() throws IOException
177 {
178 while(true)
179 {
180 switch(_state.get())
181 {
182 case OPEN:
183 if (BufferUtil.hasContent(_aggregate))
184 _channel.write(_aggregate, false);
185 else
186 _channel.write(BufferUtil.EMPTY_BUFFER, false);
187 return;
188
189 case ASYNC:
190 throw new IllegalStateException("isReady() not called");
191
192 case READY:
193 if (!_state.compareAndSet(State.READY, State.PENDING))
194 continue;
195 new AsyncFlush().process();
196 return;
197
198 case PENDING:
199 case UNREADY:
200 throw new WritePendingException();
201
202 case CLOSED:
203 return;
204 }
205 break;
206 }
207 }
208
209
210 @Override
211 public void write(byte[] b, int off, int len) throws IOException
212 {
213 _written+=len;
214 boolean complete=_channel.getResponse().isAllContentWritten(_written);
215
216
217 while(true)
218 {
219 switch(_state.get())
220 {
221 case OPEN:
222
223 break;
224
225 case ASYNC:
226 throw new IllegalStateException("isReady() not called");
227
228 case READY:
229 if (!_state.compareAndSet(State.READY, State.PENDING))
230 continue;
231
232
233 int capacity = getBufferSize();
234 if (!complete && len<=_commitSize)
235 {
236 if (_aggregate == null)
237 _aggregate = _channel.getByteBufferPool().acquire(capacity, false);
238
239
240 int filled = BufferUtil.fill(_aggregate, b, off, len);
241
242
243 if (filled==len && !BufferUtil.isFull(_aggregate))
244 {
245 if (!_state.compareAndSet(State.PENDING, State.ASYNC))
246 throw new IllegalStateException();
247 return;
248 }
249
250
251 off+=filled;
252 len-=filled;
253 }
254
255
256 new AsyncWrite(b,off,len,complete).process();
257 return;
258
259 case PENDING:
260 case UNREADY:
261 throw new WritePendingException();
262
263 case CLOSED:
264 throw new EofException("Closed");
265 }
266 break;
267 }
268
269
270
271
272
273 int capacity = getBufferSize();
274 if (!complete && len<=_commitSize)
275 {
276 if (_aggregate == null)
277 _aggregate = _channel.getByteBufferPool().acquire(capacity, false);
278
279
280 int filled = BufferUtil.fill(_aggregate, b, off, len);
281
282
283 if (filled==len && !BufferUtil.isFull(_aggregate))
284 return;
285
286
287 off+=filled;
288 len-=filled;
289 }
290
291
292 if (BufferUtil.hasContent(_aggregate))
293 {
294 _channel.write(_aggregate, complete && len==0);
295
296
297 if (len>0 && !complete && len<=_commitSize)
298 {
299 BufferUtil.append(_aggregate, b, off, len);
300 return;
301 }
302 }
303
304
305 if (len>0)
306
307 _channel.write(ByteBuffer.wrap(b, off, len).asReadOnlyBuffer(), complete);
308 else if (complete)
309 _channel.write(BufferUtil.EMPTY_BUFFER,complete);
310
311 if (complete)
312 {
313 closed();
314 }
315
316 }
317
318 public void write(ByteBuffer buffer) throws IOException
319 {
320 _written+=buffer.remaining();
321 boolean complete=_channel.getResponse().isAllContentWritten(_written);
322
323
324 while(true)
325 {
326 switch(_state.get())
327 {
328 case OPEN:
329
330 break;
331
332 case ASYNC:
333 throw new IllegalStateException("isReady() not called");
334
335 case READY:
336 if (!_state.compareAndSet(State.READY, State.PENDING))
337 continue;
338
339
340 new AsyncWrite(buffer,complete).process();
341 return;
342
343 case PENDING:
344 case UNREADY:
345 throw new WritePendingException();
346
347 case CLOSED:
348 throw new EofException("Closed");
349 }
350 break;
351 }
352
353
354
355 int len=BufferUtil.length(buffer);
356
357
358 if (BufferUtil.hasContent(_aggregate))
359 _channel.write(_aggregate, complete && len==0);
360
361
362 if (len>0)
363 _channel.write(buffer, complete);
364 else if (complete)
365 _channel.write(BufferUtil.EMPTY_BUFFER,complete);
366
367 if (complete)
368 closed();
369 }
370
371 @Override
372 public void write(int b) throws IOException
373 {
374 _written+=1;
375 boolean complete=_channel.getResponse().isAllContentWritten(_written);
376
377
378 while(true)
379 {
380 switch(_state.get())
381 {
382 case OPEN:
383 if (_aggregate == null)
384 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
385 BufferUtil.append(_aggregate, (byte)b);
386
387
388 if (complete || BufferUtil.isFull(_aggregate))
389 {
390 BlockingCallback callback = _channel.getWriteBlockingCallback();
391 _channel.write(_aggregate, complete, callback);
392 callback.block();
393 if (complete)
394 closed();
395 }
396 break;
397
398 case ASYNC:
399 throw new IllegalStateException("isReady() not called");
400
401 case READY:
402 if (!_state.compareAndSet(State.READY, State.PENDING))
403 continue;
404
405 if (_aggregate == null)
406 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
407 BufferUtil.append(_aggregate, (byte)b);
408
409
410 if (!complete && !BufferUtil.isFull(_aggregate))
411 {
412 if (!_state.compareAndSet(State.PENDING, State.ASYNC))
413 throw new IllegalStateException();
414 return;
415 }
416
417
418 new AsyncFlush().process();
419 return;
420
421 case PENDING:
422 case UNREADY:
423 throw new WritePendingException();
424
425 case CLOSED:
426 throw new EofException("Closed");
427 }
428 break;
429 }
430 }
431
432
433
434 @Override
435 public void print(String s) throws IOException
436 {
437 if (isClosed())
438 throw new IOException("Closed");
439
440 write(s.getBytes(_channel.getResponse().getCharacterEncoding()));
441 }
442
443
444
445
446
447
448 public void sendContent(ByteBuffer content) throws IOException
449 {
450 final BlockingCallback callback =_channel.getWriteBlockingCallback();
451 if (content.hasArray()&&content.limit()<content.capacity())
452 content=content.asReadOnlyBuffer();
453 _channel.write(content,true,callback);
454 callback.block();
455 }
456
457
458
459
460
461
462 public void sendContent(InputStream in) throws IOException
463 {
464 final BlockingCallback callback =_channel.getWriteBlockingCallback();
465 new InputStreamWritingCB(in,callback).iterate();
466 callback.block();
467 }
468
469
470
471
472
473
474 public void sendContent(ReadableByteChannel in) throws IOException
475 {
476 final BlockingCallback callback =_channel.getWriteBlockingCallback();
477 new ReadableByteChannelWritingCB(in,callback).iterate();
478 callback.block();
479 }
480
481
482
483
484
485
486
487 public void sendContent(HttpContent content) throws IOException
488 {
489 final BlockingCallback callback =_channel.getWriteBlockingCallback();
490 sendContent(content,callback);
491 callback.block();
492 }
493
494
495
496
497
498
499 public void sendContent(ByteBuffer content, final Callback callback)
500 {
501 if (content.hasArray()&&content.limit()<content.capacity())
502 content=content.asReadOnlyBuffer();
503 _channel.write(content,true,new Callback()
504 {
505 @Override
506 public void succeeded()
507 {
508 closed();
509 callback.succeeded();
510 }
511
512 @Override
513 public void failed(Throwable x)
514 {
515 callback.failed(x);
516 }
517 });
518 }
519
520
521
522
523
524
525
526 public void sendContent(InputStream in, Callback callback)
527 {
528 new InputStreamWritingCB(in,callback).iterate();
529 }
530
531
532
533
534
535
536
537 public void sendContent(ReadableByteChannel in, Callback callback)
538 {
539 new ReadableByteChannelWritingCB(in,callback).iterate();
540 }
541
542
543
544
545
546
547 public void sendContent(HttpContent httpContent, Callback callback) throws IOException
548 {
549 if (BufferUtil.hasContent(_aggregate))
550 throw new IOException("written");
551 if (_channel.isCommitted())
552 throw new IOException("committed");
553
554 while (true)
555 {
556 switch(_state.get())
557 {
558 case OPEN:
559 if (!_state.compareAndSet(State.OPEN, State.PENDING))
560 continue;
561 break;
562 case CLOSED:
563 throw new EofException("Closed");
564 default:
565 throw new IllegalStateException();
566 }
567 break;
568 }
569 ByteBuffer buffer= _channel.useDirectBuffers()?httpContent.getDirectBuffer():null;
570 if (buffer == null)
571 buffer = httpContent.getIndirectBuffer();
572
573 if (buffer!=null)
574 {
575 sendContent(buffer,callback);
576 return;
577 }
578
579 ReadableByteChannel rbc=httpContent.getReadableByteChannel();
580 if (rbc!=null)
581 {
582
583 sendContent(rbc,callback);
584 return;
585 }
586
587 InputStream in = httpContent.getInputStream();
588 if ( in!=null )
589 {
590 sendContent(in,callback);
591 return;
592 }
593
594 callback.failed(new IllegalArgumentException("unknown content for "+httpContent));
595 }
596
597 public int getBufferSize()
598 {
599 return _bufferSize;
600 }
601
602 public void setBufferSize(int size)
603 {
604 _bufferSize = size;
605 _commitSize = size;
606 }
607
608 public void resetBuffer()
609 {
610 if (BufferUtil.hasContent(_aggregate))
611 BufferUtil.clear(_aggregate);
612 }
613
614 @Override
615 public void setWriteListener(WriteListener writeListener)
616 {
617 if (!_channel.getState().isAsync())
618 throw new IllegalStateException("!ASYNC");
619
620 if (_state.compareAndSet(State.OPEN, State.READY))
621 {
622 _writeListener = writeListener;
623 _channel.getState().onWritePossible();
624 }
625 else
626 throw new IllegalStateException();
627 }
628
629
630
631
632 @Override
633 public boolean isReady()
634 {
635 while (true)
636 {
637 switch(_state.get())
638 {
639 case OPEN:
640 return true;
641 case ASYNC:
642 if (!_state.compareAndSet(State.ASYNC, State.READY))
643 continue;
644 return true;
645 case READY:
646 return true;
647 case PENDING:
648 if (!_state.compareAndSet(State.PENDING, State.UNREADY))
649 continue;
650 return false;
651 case UNREADY:
652 return false;
653 case CLOSED:
654 return false;
655 }
656 }
657 }
658
659 @Override
660 public void run()
661 {
662 if(_onError!=null)
663 {
664 Throwable th=_onError;
665 _onError=null;
666 _writeListener.onError(th);
667 close();
668 }
669 if (_state.get()==State.READY)
670 {
671 try
672 {
673 _writeListener.onWritePossible();
674 }
675 catch (Throwable e)
676 {
677 _writeListener.onError(e);
678 close();
679 }
680 }
681 }
682
683 private class AsyncWrite extends AsyncFlush
684 {
685 private final ByteBuffer _buffer;
686 private final boolean _complete;
687 private final int _len;
688
689 public AsyncWrite(byte[] b, int off, int len, boolean complete)
690 {
691 _buffer=ByteBuffer.wrap(b, off, len);
692 _complete=complete;
693 _len=len;
694 }
695
696 public AsyncWrite(ByteBuffer buffer, boolean complete)
697 {
698 _buffer=buffer;
699 _complete=complete;
700 _len=buffer.remaining();
701 }
702
703 @Override
704 protected boolean process()
705 {
706
707 if (BufferUtil.hasContent(_aggregate))
708 {
709 _channel.write(_aggregate, _complete && _len==0, this);
710 return false;
711 }
712
713 if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
714 {
715 BufferUtil.put(_buffer,_aggregate);
716 }
717 else if (_len>0 && !_flushed)
718 {
719 _flushed=true;
720 _channel.write(_buffer, _complete, this);
721 return false;
722 }
723 else if (_len==0 && !_flushed)
724 {
725 _flushed=true;
726 _channel.write(BufferUtil.EMPTY_BUFFER, _complete, this);
727 return false;
728 }
729
730 if (_complete)
731 closed();
732 return true;
733 }
734 }
735
736 private class AsyncFlush extends IteratingCallback
737 {
738 protected boolean _flushed;
739
740 public AsyncFlush()
741 {
742 }
743
744 @Override
745 protected boolean process()
746 {
747 if (BufferUtil.hasContent(_aggregate))
748 {
749 _flushed=true;
750 _channel.write(_aggregate, false, this);
751 return false;
752 }
753
754 if (!_flushed)
755 {
756 _flushed=true;
757 _channel.write(BufferUtil.EMPTY_BUFFER,false,this);
758 return false;
759 }
760
761 return true;
762 }
763
764 @Override
765 protected void completed()
766 {
767 try
768 {
769 while(true)
770 {
771 State last=_state.get();
772 switch(last)
773 {
774 case PENDING:
775 if (!_state.compareAndSet(State.PENDING, State.ASYNC))
776 continue;
777 break;
778
779 case UNREADY:
780 if (!_state.compareAndSet(State.UNREADY, State.READY))
781 continue;
782 _channel.getState().onWritePossible();
783 break;
784
785 case CLOSED:
786 _onError=new EofException("Closed");
787 break;
788
789 default:
790 throw new IllegalStateException();
791 }
792 break;
793 }
794 }
795 catch (Exception e)
796 {
797 _onError=e;
798 _channel.getState().onWritePossible();
799 }
800 }
801
802 @Override
803 public void failed(Throwable e)
804 {
805 super.failed(e);
806 _onError=e;
807 _channel.getState().onWritePossible();
808 }
809
810
811 }
812
813
814
815
816
817
818
819
820
821
822 private class InputStreamWritingCB extends IteratingNestedCallback
823 {
824 private final InputStream _in;
825 private final ByteBuffer _buffer;
826 private boolean _eof;
827
828 public InputStreamWritingCB(InputStream in, Callback callback)
829 {
830 super(callback);
831 _in=in;
832 _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
833 }
834
835 @Override
836 protected boolean process() throws Exception
837 {
838
839
840 if (_eof)
841 {
842
843 _in.close();
844 closed();
845 _channel.getByteBufferPool().release(_buffer);
846 return true;
847 }
848
849
850 int len=0;
851 while (len<_buffer.capacity() && !_eof)
852 {
853 int r=_in.read(_buffer.array(),_buffer.arrayOffset()+len,_buffer.capacity()-len);
854 if (r<0)
855 _eof=true;
856 else
857 len+=r;
858 }
859
860
861 _buffer.position(0);
862 _buffer.limit(len);
863 _channel.write(_buffer,_eof,this);
864
865 return false;
866 }
867
868 @Override
869 public void failed(Throwable x)
870 {
871 super.failed(x);
872 _channel.getByteBufferPool().release(_buffer);
873 try
874 {
875 _in.close();
876 }
877 catch (IOException e)
878 {
879 LOG.ignore(e);
880 }
881 }
882
883 }
884
885
886
887
888
889
890
891
892
893
894 private class ReadableByteChannelWritingCB extends IteratingNestedCallback
895 {
896 private final ReadableByteChannel _in;
897 private final ByteBuffer _buffer;
898 private boolean _eof;
899
900 public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
901 {
902 super(callback);
903 _in=in;
904 _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
905 }
906
907 @Override
908 protected boolean process() throws Exception
909 {
910
911
912 if (_eof)
913 {
914 _in.close();
915 closed();
916 _channel.getByteBufferPool().release(_buffer);
917 return true;
918 }
919
920
921 _buffer.clear();
922 while (_buffer.hasRemaining() && !_eof)
923 _eof = (_in.read(_buffer)) < 0;
924
925
926 _buffer.flip();
927 _channel.write(_buffer,_eof,this);
928
929 return false;
930 }
931
932 @Override
933 public void failed(Throwable x)
934 {
935 super.failed(x);
936 _channel.getByteBufferPool().release(_buffer);
937 try
938 {
939 _in.close();
940 }
941 catch (IOException e)
942 {
943 LOG.ignore(e);
944 }
945 }
946 }
947
948 }