1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.ReadableByteChannel;
25 import java.nio.channels.WritePendingException;
26 import java.util.concurrent.atomic.AtomicReference;
27 import javax.servlet.RequestDispatcher;
28 import javax.servlet.ServletOutputStream;
29 import javax.servlet.ServletRequest;
30 import javax.servlet.ServletResponse;
31 import javax.servlet.WriteListener;
32
33 import org.eclipse.jetty.http.HttpContent;
34 import org.eclipse.jetty.io.EofException;
35 import org.eclipse.jetty.util.BufferUtil;
36 import org.eclipse.jetty.util.Callback;
37 import org.eclipse.jetty.util.IteratingCallback;
38 import org.eclipse.jetty.util.IteratingNestedCallback;
39 import org.eclipse.jetty.util.SharedBlockingCallback;
40 import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
41 import org.eclipse.jetty.util.log.Log;
42 import org.eclipse.jetty.util.log.Logger;
43
44
45
46
47
48
49
50
51
52
53
54 public class HttpOutput extends ServletOutputStream implements Runnable
55 {
56 private static Logger LOG = Log.getLogger(HttpOutput.class);
57 private final HttpChannel<?> _channel;
58 private final SharedBlockingCallback _writeblock=new SharedBlockingCallback();
59 private long _written;
60 private ByteBuffer _aggregate;
61 private int _bufferSize;
62 private int _commitSize;
63 private WriteListener _writeListener;
64 private volatile Throwable _onError;
65
66
67
68
69
70
71
72
73
74
75
76
77 enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED }
78 private final AtomicReference<OutputState> _state=new AtomicReference<>(OutputState.OPEN);
79
80 public HttpOutput(HttpChannel<?> channel)
81 {
82 _channel = channel;
83 _bufferSize = _channel.getHttpConfiguration().getOutputBufferSize();
84 _commitSize=_bufferSize/4;
85 }
86
87 public HttpChannel<?> getHttpChannel()
88 {
89 return _channel;
90 }
91
92 public boolean isWritten()
93 {
94 return _written > 0;
95 }
96
97 public long getWritten()
98 {
99 return _written;
100 }
101
102 public void reset()
103 {
104 _written = 0;
105 reopen();
106 }
107
108 public void reopen()
109 {
110 _state.set(OutputState.OPEN);
111 }
112
113 public boolean isAllContentWritten()
114 {
115 return _channel.getResponse().isAllContentWritten(_written);
116 }
117
118 protected Blocker acquireWriteBlockingCallback() throws IOException
119 {
120 return _writeblock.acquire();
121 }
122
123 protected void write(ByteBuffer content, boolean complete) throws IOException
124 {
125 try (Blocker blocker=_writeblock.acquire())
126 {
127 write(content,complete,blocker);
128 blocker.block();
129 }
130 }
131
132 protected void write(ByteBuffer content, boolean complete, Callback callback)
133 {
134 _channel.write(content,complete,callback);
135 }
136
137 @Override
138 public void close()
139 {
140 loop: while(true)
141 {
142 OutputState state=_state.get();
143 switch (state)
144 {
145 case CLOSED:
146 break loop;
147
148 case UNREADY:
149 if (_state.compareAndSet(state,OutputState.ERROR))
150 _writeListener.onError(_onError==null?new EofException("Async close"):_onError);
151 continue;
152
153 default:
154 if (_state.compareAndSet(state,OutputState.CLOSED))
155 {
156 try
157 {
158 write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER,!_channel.getResponse().isIncluding());
159 }
160 catch(IOException e)
161 {
162 LOG.debug(e);
163 _channel.failed();
164 }
165 releaseBuffer();
166 return;
167 }
168 }
169 }
170 }
171
172
173 void closed()
174 {
175 loop: while(true)
176 {
177 OutputState state=_state.get();
178 switch (state)
179 {
180 case CLOSED:
181 break loop;
182
183 case UNREADY:
184 if (_state.compareAndSet(state,OutputState.ERROR))
185 _writeListener.onError(_onError==null?new EofException("Async closed"):_onError);
186 continue;
187
188 default:
189 if (_state.compareAndSet(state,OutputState.CLOSED))
190 {
191 try
192 {
193 _channel.getResponse().closeOutput();
194 }
195 catch(IOException e)
196 {
197 LOG.debug(e);
198 _channel.failed();
199 }
200 releaseBuffer();
201 return;
202 }
203 }
204 }
205 }
206
207 private void releaseBuffer()
208 {
209 if (_aggregate != null)
210 {
211 _channel.getConnector().getByteBufferPool().release(_aggregate);
212 _aggregate = null;
213 }
214 }
215
216 public boolean isClosed()
217 {
218 return _state.get()==OutputState.CLOSED;
219 }
220
221 @Override
222 public void flush() throws IOException
223 {
224 while(true)
225 {
226 switch(_state.get())
227 {
228 case OPEN:
229 write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, false);
230 return;
231
232 case ASYNC:
233 throw new IllegalStateException("isReady() not called");
234
235 case READY:
236 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
237 continue;
238 new AsyncFlush().iterate();
239 return;
240
241 case PENDING:
242 case UNREADY:
243 throw new WritePendingException();
244
245 case ERROR:
246 throw new EofException(_onError);
247
248 case CLOSED:
249 return;
250 }
251 break;
252 }
253 }
254
255
256 @Override
257 public void write(byte[] b, int off, int len) throws IOException
258 {
259 _written+=len;
260 boolean complete=_channel.getResponse().isAllContentWritten(_written);
261
262
263 while(true)
264 {
265 switch(_state.get())
266 {
267 case OPEN:
268
269 break;
270
271 case ASYNC:
272 throw new IllegalStateException("isReady() not called");
273
274 case READY:
275 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
276 continue;
277
278
279 if (!complete && len<=_commitSize)
280 {
281 if (_aggregate == null)
282 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
283
284
285 int filled = BufferUtil.fill(_aggregate, b, off, len);
286
287
288 if (filled==len && !BufferUtil.isFull(_aggregate))
289 {
290 if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
291 throw new IllegalStateException();
292 return;
293 }
294
295
296 off+=filled;
297 len-=filled;
298 }
299
300
301 new AsyncWrite(b,off,len,complete).iterate();
302 return;
303
304 case PENDING:
305 case UNREADY:
306 throw new WritePendingException();
307
308 case ERROR:
309 throw new EofException(_onError);
310
311 case CLOSED:
312 throw new EofException("Closed");
313 }
314 break;
315 }
316
317
318
319
320
321 int capacity = getBufferSize();
322 if (!complete && len<=_commitSize)
323 {
324 if (_aggregate == null)
325 _aggregate = _channel.getByteBufferPool().acquire(capacity, false);
326
327
328 int filled = BufferUtil.fill(_aggregate, b, off, len);
329
330
331 if (filled==len && !BufferUtil.isFull(_aggregate))
332 return;
333
334
335 off+=filled;
336 len-=filled;
337 }
338
339
340 if (BufferUtil.hasContent(_aggregate))
341 {
342 write(_aggregate, complete && len==0);
343
344
345 if (len>0 && !complete && len<=_commitSize)
346 {
347 BufferUtil.append(_aggregate, b, off, len);
348 return;
349 }
350 }
351
352
353 if (len>0)
354 {
355 ByteBuffer wrap = ByteBuffer.wrap(b, off, len);
356 ByteBuffer view = wrap.duplicate();
357
358
359
360 while (len>getBufferSize())
361 {
362 int p=view.position();
363 int l=p+getBufferSize();
364 view.limit(p+getBufferSize());
365 write(view,false);
366 len-=getBufferSize();
367 view.limit(l+Math.min(len,getBufferSize()));
368 view.position(l);
369 }
370 write(view,complete);
371 }
372 else if (complete)
373 write(BufferUtil.EMPTY_BUFFER,complete);
374
375 if (complete)
376 closed();
377
378 }
379
380 public void write(ByteBuffer buffer) throws IOException
381 {
382 _written+=buffer.remaining();
383 boolean complete=_channel.getResponse().isAllContentWritten(_written);
384
385
386 while(true)
387 {
388 switch(_state.get())
389 {
390 case OPEN:
391
392 break;
393
394 case ASYNC:
395 throw new IllegalStateException("isReady() not called");
396
397 case READY:
398 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
399 continue;
400
401
402 new AsyncWrite(buffer,complete).iterate();
403 return;
404
405 case PENDING:
406 case UNREADY:
407 throw new WritePendingException();
408
409 case ERROR:
410 throw new EofException(_onError);
411
412 case CLOSED:
413 throw new EofException("Closed");
414 }
415 break;
416 }
417
418
419
420 int len=BufferUtil.length(buffer);
421
422
423 if (BufferUtil.hasContent(_aggregate))
424 write(_aggregate, complete && len==0);
425
426
427 if (len>0)
428 write(buffer, complete);
429 else if (complete)
430 write(BufferUtil.EMPTY_BUFFER,complete);
431
432 if (complete)
433 closed();
434 }
435
436 @Override
437 public void write(int b) throws IOException
438 {
439 _written+=1;
440 boolean complete=_channel.getResponse().isAllContentWritten(_written);
441
442
443 while(true)
444 {
445 switch(_state.get())
446 {
447 case OPEN:
448 if (_aggregate == null)
449 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
450 BufferUtil.append(_aggregate, (byte)b);
451
452
453 if (complete || BufferUtil.isFull(_aggregate))
454 {
455 try(Blocker blocker=_writeblock.acquire())
456 {
457 write(_aggregate, complete, blocker);
458 blocker.block();
459 }
460 if (complete)
461 closed();
462 }
463 break;
464
465 case ASYNC:
466 throw new IllegalStateException("isReady() not called");
467
468 case READY:
469 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
470 continue;
471
472 if (_aggregate == null)
473 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
474 BufferUtil.append(_aggregate, (byte)b);
475
476
477 if (!complete && !BufferUtil.isFull(_aggregate))
478 {
479 if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
480 throw new IllegalStateException();
481 return;
482 }
483
484
485 new AsyncFlush().iterate();
486 return;
487
488 case PENDING:
489 case UNREADY:
490 throw new WritePendingException();
491
492 case ERROR:
493 throw new EofException(_onError);
494
495 case CLOSED:
496 throw new EofException("Closed");
497 }
498 break;
499 }
500 }
501
502 @Override
503 public void print(String s) throws IOException
504 {
505 if (isClosed())
506 throw new IOException("Closed");
507
508 write(s.getBytes(_channel.getResponse().getCharacterEncoding()));
509 }
510
511
512
513
514
515
516 public void sendContent(ByteBuffer content) throws IOException
517 {
518 try(Blocker blocker=_writeblock.acquire())
519 {
520 write(content,true,blocker);
521 blocker.block();
522 }
523 }
524
525
526
527
528
529
530 public void sendContent(InputStream in) throws IOException
531 {
532 try(Blocker blocker=_writeblock.acquire())
533 {
534 new InputStreamWritingCB(in,blocker).iterate();
535 blocker.block();
536 }
537 }
538
539
540
541
542
543
544 public void sendContent(ReadableByteChannel in) throws IOException
545 {
546 try(Blocker blocker=_writeblock.acquire())
547 {
548 new ReadableByteChannelWritingCB(in,blocker).iterate();
549 blocker.block();
550 }
551 }
552
553
554
555
556
557
558
559 public void sendContent(HttpContent content) throws IOException
560 {
561 try(Blocker blocker=_writeblock.acquire())
562 {
563 sendContent(content,blocker);
564 blocker.block();
565 }
566 }
567
568
569
570
571
572
573 public void sendContent(ByteBuffer content, final Callback callback)
574 {
575 write(content,true,new Callback()
576 {
577 @Override
578 public void succeeded()
579 {
580 closed();
581 callback.succeeded();
582 }
583
584 @Override
585 public void failed(Throwable x)
586 {
587 callback.failed(x);
588 }
589 });
590 }
591
592
593
594
595
596
597
598 public void sendContent(InputStream in, Callback callback)
599 {
600 new InputStreamWritingCB(in,callback).iterate();
601 }
602
603
604
605
606
607
608
609 public void sendContent(ReadableByteChannel in, Callback callback)
610 {
611 new ReadableByteChannelWritingCB(in,callback).iterate();
612 }
613
614
615
616
617
618
619 public void sendContent(HttpContent httpContent, Callback callback) throws IOException
620 {
621 if (BufferUtil.hasContent(_aggregate))
622 throw new IOException("written");
623 if (_channel.isCommitted())
624 throw new IOException("committed");
625
626 while (true)
627 {
628 switch(_state.get())
629 {
630 case OPEN:
631 if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING))
632 continue;
633 break;
634 case ERROR:
635 throw new EofException(_onError);
636 case CLOSED:
637 throw new EofException("Closed");
638 default:
639 throw new IllegalStateException();
640 }
641 break;
642 }
643 ByteBuffer buffer= _channel.useDirectBuffers()?httpContent.getDirectBuffer():null;
644 if (buffer == null)
645 buffer = httpContent.getIndirectBuffer();
646
647 if (buffer!=null)
648 {
649 sendContent(buffer,callback);
650 return;
651 }
652
653 ReadableByteChannel rbc=httpContent.getReadableByteChannel();
654 if (rbc!=null)
655 {
656
657 sendContent(rbc,callback);
658 return;
659 }
660
661 InputStream in = httpContent.getInputStream();
662 if ( in!=null )
663 {
664 sendContent(in,callback);
665 return;
666 }
667
668 callback.failed(new IllegalArgumentException("unknown content for "+httpContent));
669 }
670
671 public int getBufferSize()
672 {
673 return _bufferSize;
674 }
675
676 public void setBufferSize(int size)
677 {
678 _bufferSize = size;
679 _commitSize = size;
680 }
681
682 public void resetBuffer()
683 {
684 if (BufferUtil.hasContent(_aggregate))
685 BufferUtil.clear(_aggregate);
686 }
687
688 @Override
689 public void setWriteListener(WriteListener writeListener)
690 {
691 if (!_channel.getState().isAsync())
692 throw new IllegalStateException("!ASYNC");
693
694 if (_state.compareAndSet(OutputState.OPEN, OutputState.READY))
695 {
696 _writeListener = writeListener;
697 _channel.getState().onWritePossible();
698 }
699 else
700 throw new IllegalStateException();
701 }
702
703
704
705
706 @Override
707 public boolean isReady()
708 {
709 while (true)
710 {
711 switch(_state.get())
712 {
713 case OPEN:
714 return true;
715 case ASYNC:
716 if (!_state.compareAndSet(OutputState.ASYNC, OutputState.READY))
717 continue;
718 return true;
719 case READY:
720 return true;
721 case PENDING:
722 if (!_state.compareAndSet(OutputState.PENDING, OutputState.UNREADY))
723 continue;
724 return false;
725 case UNREADY:
726 return false;
727
728 case ERROR:
729 return true;
730
731 case CLOSED:
732 return true;
733 }
734 }
735 }
736
737 @Override
738 public void run()
739 {
740 loop: while (true)
741 {
742 OutputState state = _state.get();
743
744 if(_onError!=null)
745 {
746 switch(state)
747 {
748 case CLOSED:
749 case ERROR:
750 _onError=null;
751 break loop;
752
753 default:
754 if (_state.compareAndSet(state, OutputState.ERROR))
755 {
756 Throwable th=_onError;
757 _onError=null;
758 LOG.debug("onError",th);
759 _writeListener.onError(th);
760 close();
761
762 break loop;
763 }
764
765 }
766 continue loop;
767 }
768
769 switch(_state.get())
770 {
771 case READY:
772 case CLOSED:
773
774
775
776 try
777 {
778 _writeListener.onWritePossible();
779 break loop;
780 }
781 catch (Throwable e)
782 {
783 _onError=e;
784 }
785 break;
786 default:
787
788 }
789 }
790 }
791
792 @Override
793 public String toString()
794 {
795 return String.format("%s@%x{%s}",this.getClass().getSimpleName(),hashCode(),_state.get());
796 }
797
798 private abstract class AsyncICB extends IteratingCallback
799 {
800 @Override
801 protected void completed()
802 {
803 while(true)
804 {
805 OutputState last=_state.get();
806 switch(last)
807 {
808 case PENDING:
809 if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
810 continue;
811 break;
812
813 case UNREADY:
814 if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY))
815 continue;
816 _channel.getState().onWritePossible();
817 break;
818
819 case CLOSED:
820 break;
821
822 default:
823 throw new IllegalStateException();
824 }
825 break;
826 }
827 }
828
829 @Override
830 public void failed(Throwable e)
831 {
832 super.failed(e);
833 _onError=e;
834 _channel.getState().onWritePossible();
835 }
836 }
837
838
839 private class AsyncFlush extends AsyncICB
840 {
841 protected volatile boolean _flushed;
842
843 public AsyncFlush()
844 {
845 }
846
847 @Override
848 protected Action process()
849 {
850 if (BufferUtil.hasContent(_aggregate))
851 {
852 _flushed=true;
853 write(_aggregate, false, this);
854 return Action.SCHEDULED;
855 }
856
857 if (!_flushed)
858 {
859 _flushed=true;
860 write(BufferUtil.EMPTY_BUFFER,false,this);
861 return Action.SCHEDULED;
862 }
863
864 return Action.SUCCEEDED;
865 }
866 }
867
868
869
870 private class AsyncWrite extends AsyncICB
871 {
872 private final ByteBuffer _buffer;
873 private final ByteBuffer _slice;
874 private final boolean _complete;
875 private final int _len;
876 protected volatile boolean _completed;
877
878 public AsyncWrite(byte[] b, int off, int len, boolean complete)
879 {
880 _buffer=ByteBuffer.wrap(b, off, len);
881 _len=len;
882
883 _slice=_len<getBufferSize()?null:_buffer.duplicate();
884 _complete=complete;
885 }
886
887 public AsyncWrite(ByteBuffer buffer, boolean complete)
888 {
889 _buffer=buffer;
890 _len=buffer.remaining();
891
892 _slice=_buffer.isDirect()||_len<getBufferSize()?null:_buffer.duplicate();
893 _complete=complete;
894 }
895
896 @Override
897 protected Action process()
898 {
899
900 if (BufferUtil.hasContent(_aggregate))
901 {
902 _completed=_len==0;
903 write(_aggregate, _complete && _completed, this);
904 return Action.SCHEDULED;
905 }
906
907
908 if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
909 {
910 int position = BufferUtil.flipToFill(_aggregate);
911 BufferUtil.put(_buffer,_aggregate);
912 BufferUtil.flipToFlush(_aggregate, position);
913 return Action.SUCCEEDED;
914 }
915
916
917 if (_buffer.hasRemaining())
918 {
919
920 if (_slice==null)
921 {
922 _completed=true;
923 write(_buffer, _complete, this);
924 return Action.SCHEDULED;
925 }
926
927
928 int p=_buffer.position();
929 int l=Math.min(getBufferSize(),_buffer.remaining());
930 int pl=p+l;
931 _slice.limit(pl);
932 _buffer.position(pl);
933 _slice.position(p);
934 _completed=!_buffer.hasRemaining();
935 write(_slice, _complete && _completed, this);
936 return Action.SCHEDULED;
937 }
938
939
940
941 if (_complete && !_completed)
942 {
943 _completed=true;
944 write(BufferUtil.EMPTY_BUFFER, _complete, this);
945 return Action.SCHEDULED;
946 }
947
948 return Action.SUCCEEDED;
949 }
950
951 @Override
952 protected void completed()
953 {
954 super.completed();
955 if (_complete)
956 closed();
957 }
958
959
960 }
961
962
963
964
965
966
967
968
969
970
971 private class InputStreamWritingCB extends IteratingNestedCallback
972 {
973 private final InputStream _in;
974 private final ByteBuffer _buffer;
975 private boolean _eof;
976
977 public InputStreamWritingCB(InputStream in, Callback callback)
978 {
979 super(callback);
980 _in=in;
981 _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
982 }
983
984 @Override
985 protected Action process() throws Exception
986 {
987
988
989 if (_eof)
990 {
991
992 _in.close();
993 closed();
994 _channel.getByteBufferPool().release(_buffer);
995 return Action.SUCCEEDED;
996 }
997
998
999 int len=0;
1000 while (len<_buffer.capacity() && !_eof)
1001 {
1002 int r=_in.read(_buffer.array(),_buffer.arrayOffset()+len,_buffer.capacity()-len);
1003 if (r<0)
1004 _eof=true;
1005 else
1006 len+=r;
1007 }
1008
1009
1010 _buffer.position(0);
1011 _buffer.limit(len);
1012 write(_buffer,_eof,this);
1013 return Action.SCHEDULED;
1014 }
1015
1016 @Override
1017 public void failed(Throwable x)
1018 {
1019 super.failed(x);
1020 _channel.getByteBufferPool().release(_buffer);
1021 try
1022 {
1023 _in.close();
1024 }
1025 catch (IOException e)
1026 {
1027 LOG.ignore(e);
1028 }
1029 }
1030
1031 }
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042 private class ReadableByteChannelWritingCB extends IteratingNestedCallback
1043 {
1044 private final ReadableByteChannel _in;
1045 private final ByteBuffer _buffer;
1046 private boolean _eof;
1047
1048 public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
1049 {
1050 super(callback);
1051 _in=in;
1052 _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
1053 }
1054
1055 @Override
1056 protected Action process() throws Exception
1057 {
1058
1059
1060 if (_eof)
1061 {
1062 _in.close();
1063 closed();
1064 _channel.getByteBufferPool().release(_buffer);
1065 return Action.SUCCEEDED;
1066 }
1067
1068
1069 _buffer.clear();
1070 while (_buffer.hasRemaining() && !_eof)
1071 _eof = (_in.read(_buffer)) < 0;
1072
1073
1074 _buffer.flip();
1075 write(_buffer,_eof,this);
1076
1077 return Action.SCHEDULED;
1078 }
1079
1080 @Override
1081 public void failed(Throwable x)
1082 {
1083 super.failed(x);
1084 _channel.getByteBufferPool().release(_buffer);
1085 try
1086 {
1087 _in.close();
1088 }
1089 catch (IOException e)
1090 {
1091 LOG.ignore(e);
1092 }
1093 }
1094 }
1095
1096 }