1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.nio.ByteBuffer;
25 import java.nio.channels.ReadableByteChannel;
26 import java.nio.channels.WritePendingException;
27 import java.util.concurrent.atomic.AtomicReference;
28
29 import javax.servlet.RequestDispatcher;
30 import javax.servlet.ServletOutputStream;
31 import javax.servlet.ServletRequest;
32 import javax.servlet.ServletResponse;
33 import javax.servlet.WriteListener;
34
35 import org.eclipse.jetty.http.HttpContent;
36 import org.eclipse.jetty.io.EofException;
37 import org.eclipse.jetty.util.BufferUtil;
38 import org.eclipse.jetty.util.Callback;
39 import org.eclipse.jetty.util.IteratingCallback;
40 import org.eclipse.jetty.util.IteratingNestedCallback;
41 import org.eclipse.jetty.util.SharedBlockingCallback;
42 import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
43 import org.eclipse.jetty.util.log.Log;
44 import org.eclipse.jetty.util.log.Logger;
45
46
47
48
49
50
51
52
53
54
55
56 public class HttpOutput extends ServletOutputStream implements Runnable
57 {
58 public interface Interceptor
59 {
60 void write(ByteBuffer content, boolean complete, Callback callback);
61 Interceptor getNextInterceptor();
62 boolean isOptimizedForDirectBuffers();
63 }
64
65 private static Logger LOG = Log.getLogger(HttpOutput.class);
66
67 private final HttpChannel _channel;
68 private final SharedBlockingCallback _writeBlock;
69 private Interceptor _interceptor;
70
71
72 private long _written;
73
74 private ByteBuffer _aggregate;
75 private int _bufferSize;
76 private int _commitSize;
77 private WriteListener _writeListener;
78 private volatile Throwable _onError;
79
80
81
82
83
84
85
86
87
88
89 private enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED }
90 private final AtomicReference<OutputState> _state=new AtomicReference<>(OutputState.OPEN);
91
92 public HttpOutput(HttpChannel channel)
93 {
94 _channel = channel;
95 _interceptor = channel;
96 _writeBlock = new SharedBlockingCallback()
97 {
98 @Override
99 protected long getIdleTimeout()
100 {
101 long bto = getHttpChannel().getHttpConfiguration().getBlockingTimeout();
102 if (bto>0)
103 return bto;
104 if (bto<0)
105 return -1;
106 return _channel.getIdleTimeout();
107 }
108 };
109 HttpConfiguration config = channel.getHttpConfiguration();
110 _bufferSize = config.getOutputBufferSize();
111 _commitSize = config.getOutputAggregationSize();
112 if (_commitSize>_bufferSize)
113 {
114 LOG.warn("OutputAggregationSize {} exceeds bufferSize {}",_commitSize,_bufferSize);
115 _commitSize=_bufferSize;
116 }
117 }
118
119 public HttpChannel getHttpChannel()
120 {
121 return _channel;
122 }
123
124 public Interceptor getInterceptor()
125 {
126 return _interceptor;
127 }
128
129 public void setInterceptor(Interceptor filter)
130 {
131 _interceptor=filter;
132 }
133
134 public boolean isWritten()
135 {
136 return _written > 0;
137 }
138
139 public long getWritten()
140 {
141 return _written;
142 }
143
144 public void reopen()
145 {
146 _state.set(OutputState.OPEN);
147 }
148
149 public boolean isAllContentWritten()
150 {
151 return _channel.getResponse().isAllContentWritten(_written);
152 }
153
154 protected Blocker acquireWriteBlockingCallback() throws IOException
155 {
156 return _writeBlock.acquire();
157 }
158
159 private void write(ByteBuffer content, boolean complete) throws IOException
160 {
161 try (Blocker blocker = _writeBlock.acquire())
162 {
163 write(content, complete, blocker);
164 blocker.block();
165 }
166 catch (Throwable failure)
167 {
168 if (LOG.isDebugEnabled())
169 LOG.debug(failure);
170 abort(failure);
171 throw failure;
172 }
173 }
174
175 protected void write(ByteBuffer content, boolean complete, Callback callback)
176 {
177 _interceptor.write(content, complete, callback);
178 }
179
180 private void abort(Throwable failure)
181 {
182 closed();
183 _channel.abort(failure);
184 }
185
186 @Override
187 public void close()
188 {
189 while(true)
190 {
191 OutputState state=_state.get();
192 switch (state)
193 {
194 case CLOSED:
195 {
196 return;
197 }
198 case UNREADY:
199 {
200 if (_state.compareAndSet(state,OutputState.ERROR))
201 _writeListener.onError(_onError==null?new EofException("Async close"):_onError);
202 break;
203 }
204 default:
205 {
206 if (!_state.compareAndSet(state,OutputState.CLOSED))
207 break;
208
209 try
210 {
211 write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding());
212 }
213 catch (IOException x)
214 {
215
216 }
217 finally
218 {
219 releaseBuffer();
220 }
221
222 return;
223 }
224 }
225 }
226 }
227
228
229
230
231
232 void closed()
233 {
234 while(true)
235 {
236 OutputState state=_state.get();
237 switch (state)
238 {
239 case CLOSED:
240 {
241 return;
242 }
243 case UNREADY:
244 {
245 if (_state.compareAndSet(state,OutputState.ERROR))
246 _writeListener.onError(_onError==null?new EofException("Async closed"):_onError);
247 break;
248 }
249 default:
250 {
251 if (!_state.compareAndSet(state, OutputState.CLOSED))
252 break;
253
254 try
255 {
256 _channel.getResponse().closeOutput();
257 }
258 catch (Throwable x)
259 {
260 if (LOG.isDebugEnabled())
261 LOG.debug(x);
262 abort(x);
263 }
264 finally
265 {
266 releaseBuffer();
267 }
268
269 return;
270 }
271 }
272 }
273 }
274
275 private void releaseBuffer()
276 {
277 if (_aggregate != null)
278 {
279 _channel.getConnector().getByteBufferPool().release(_aggregate);
280 _aggregate = null;
281 }
282 }
283
284 public boolean isClosed()
285 {
286 return _state.get()==OutputState.CLOSED;
287 }
288
289 @Override
290 public void flush() throws IOException
291 {
292 while(true)
293 {
294 switch(_state.get())
295 {
296 case OPEN:
297 write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, false);
298 return;
299
300 case ASYNC:
301 throw new IllegalStateException("isReady() not called");
302
303 case READY:
304 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
305 continue;
306 new AsyncFlush().iterate();
307 return;
308
309 case PENDING:
310 case UNREADY:
311 throw new WritePendingException();
312
313 case ERROR:
314 throw new EofException(_onError);
315
316 case CLOSED:
317 return;
318
319 default:
320 throw new IllegalStateException();
321 }
322 }
323 }
324
325 @Override
326 public void write(byte[] b, int off, int len) throws IOException
327 {
328 _written+=len;
329 boolean complete=_channel.getResponse().isAllContentWritten(_written);
330
331
332 while(true)
333 {
334 switch(_state.get())
335 {
336 case OPEN:
337
338 break;
339
340 case ASYNC:
341 throw new IllegalStateException("isReady() not called");
342
343 case READY:
344 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
345 continue;
346
347
348 if (!complete && len<=_commitSize)
349 {
350 if (_aggregate == null)
351 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
352
353
354 int filled = BufferUtil.fill(_aggregate, b, off, len);
355
356
357 if (filled==len && !BufferUtil.isFull(_aggregate))
358 {
359 if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
360 throw new IllegalStateException();
361 return;
362 }
363
364
365 off+=filled;
366 len-=filled;
367 }
368
369
370 new AsyncWrite(b,off,len,complete).iterate();
371 return;
372
373 case PENDING:
374 case UNREADY:
375 throw new WritePendingException();
376
377 case ERROR:
378 throw new EofException(_onError);
379
380 case CLOSED:
381 throw new EofException("Closed");
382
383 default:
384 throw new IllegalStateException();
385 }
386 break;
387 }
388
389
390
391
392 int capacity = getBufferSize();
393 if (!complete && len<=_commitSize)
394 {
395 if (_aggregate == null)
396 _aggregate = _channel.getByteBufferPool().acquire(capacity, _interceptor.isOptimizedForDirectBuffers());
397
398
399 int filled = BufferUtil.fill(_aggregate, b, off, len);
400
401
402 if (filled==len && !BufferUtil.isFull(_aggregate))
403 return;
404
405
406 off+=filled;
407 len-=filled;
408 }
409
410
411 if (BufferUtil.hasContent(_aggregate))
412 {
413 write(_aggregate, complete && len==0);
414
415
416 if (len>0 && !complete && len<=_commitSize && len<=BufferUtil.space(_aggregate))
417 {
418 BufferUtil.append(_aggregate, b, off, len);
419 return;
420 }
421 }
422
423
424 if (len>0)
425 {
426 ByteBuffer wrap = ByteBuffer.wrap(b, off, len);
427 ByteBuffer view = wrap.duplicate();
428
429
430
431 while (len>getBufferSize())
432 {
433 int p=view.position();
434 int l=p+getBufferSize();
435 view.limit(p+getBufferSize());
436 write(view,false);
437 len-=getBufferSize();
438 view.limit(l+Math.min(len,getBufferSize()));
439 view.position(l);
440 }
441 write(view,complete);
442 }
443 else if (complete)
444 {
445 write(BufferUtil.EMPTY_BUFFER,true);
446 }
447
448 if (complete)
449 closed();
450 }
451
452 public void write(ByteBuffer buffer) throws IOException
453 {
454 _written+=buffer.remaining();
455 boolean complete=_channel.getResponse().isAllContentWritten(_written);
456
457
458 while(true)
459 {
460 switch(_state.get())
461 {
462 case OPEN:
463
464 break;
465
466 case ASYNC:
467 throw new IllegalStateException("isReady() not called");
468
469 case READY:
470 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
471 continue;
472
473
474 new AsyncWrite(buffer,complete).iterate();
475 return;
476
477 case PENDING:
478 case UNREADY:
479 throw new WritePendingException();
480
481 case ERROR:
482 throw new EofException(_onError);
483
484 case CLOSED:
485 throw new EofException("Closed");
486
487 default:
488 throw new IllegalStateException();
489 }
490 break;
491 }
492
493
494
495 int len=BufferUtil.length(buffer);
496
497
498 if (BufferUtil.hasContent(_aggregate))
499 write(_aggregate, complete && len==0);
500
501
502 if (len>0)
503 write(buffer, complete);
504 else if (complete)
505 write(BufferUtil.EMPTY_BUFFER, true);
506
507 if (complete)
508 closed();
509 }
510
511 @Override
512 public void write(int b) throws IOException
513 {
514 _written+=1;
515 boolean complete=_channel.getResponse().isAllContentWritten(_written);
516
517
518 while(true)
519 {
520 switch(_state.get())
521 {
522 case OPEN:
523 if (_aggregate == null)
524 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
525 BufferUtil.append(_aggregate, (byte)b);
526
527
528 if (complete || BufferUtil.isFull(_aggregate))
529 {
530 write(_aggregate, complete);
531 if (complete)
532 closed();
533 }
534 break;
535
536 case ASYNC:
537 throw new IllegalStateException("isReady() not called");
538
539 case READY:
540 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
541 continue;
542
543 if (_aggregate == null)
544 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
545 BufferUtil.append(_aggregate, (byte)b);
546
547
548 if (!complete && !BufferUtil.isFull(_aggregate))
549 {
550 if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
551 throw new IllegalStateException();
552 return;
553 }
554
555
556 new AsyncFlush().iterate();
557 return;
558
559 case PENDING:
560 case UNREADY:
561 throw new WritePendingException();
562
563 case ERROR:
564 throw new EofException(_onError);
565
566 case CLOSED:
567 throw new EofException("Closed");
568
569 default:
570 throw new IllegalStateException();
571 }
572 break;
573 }
574 }
575
576 @Override
577 public void print(String s) throws IOException
578 {
579 if (isClosed())
580 throw new IOException("Closed");
581
582 write(s.getBytes(_channel.getResponse().getCharacterEncoding()));
583 }
584
585
586
587
588
589
590
591 public void sendContent(ByteBuffer content) throws IOException
592 {
593 if (LOG.isDebugEnabled())
594 LOG.debug("sendContent({})",BufferUtil.toDetailString(content));
595
596 write(content, true);
597 closed();
598 }
599
600
601
602
603
604
605
606 public void sendContent(InputStream in) throws IOException
607 {
608 try(Blocker blocker = _writeBlock.acquire())
609 {
610 new InputStreamWritingCB(in, blocker).iterate();
611 blocker.block();
612 }
613 catch (Throwable failure)
614 {
615 if (LOG.isDebugEnabled())
616 LOG.debug(failure);
617 abort(failure);
618 throw failure;
619 }
620 }
621
622
623
624
625
626
627
628 public void sendContent(ReadableByteChannel in) throws IOException
629 {
630 try(Blocker blocker = _writeBlock.acquire())
631 {
632 new ReadableByteChannelWritingCB(in, blocker).iterate();
633 blocker.block();
634 }
635 catch (Throwable failure)
636 {
637 if (LOG.isDebugEnabled())
638 LOG.debug(failure);
639 abort(failure);
640 throw failure;
641 }
642 }
643
644
645
646
647
648
649
650 public void sendContent(HttpContent content) throws IOException
651 {
652 try(Blocker blocker = _writeBlock.acquire())
653 {
654 sendContent(content, blocker);
655 blocker.block();
656 }
657 catch (Throwable failure)
658 {
659 if (LOG.isDebugEnabled())
660 LOG.debug(failure);
661 abort(failure);
662 throw failure;
663 }
664 }
665
666
667
668
669
670
671 public void sendContent(ByteBuffer content, final Callback callback)
672 {
673 if (LOG.isDebugEnabled())
674 LOG.debug("sendContent(buffer={},{})",BufferUtil.toDetailString(content),callback);
675
676 write(content, true, new Callback()
677 {
678 @Override
679 public void succeeded()
680 {
681 closed();
682 callback.succeeded();
683 }
684
685 @Override
686 public void failed(Throwable x)
687 {
688 abort(x);
689 callback.failed(x);
690 }
691 });
692 }
693
694
695
696
697
698
699
700
701 public void sendContent(InputStream in, Callback callback)
702 {
703 if (LOG.isDebugEnabled())
704 LOG.debug("sendContent(stream={},{})",in,callback);
705
706 new InputStreamWritingCB(in, callback).iterate();
707 }
708
709
710
711
712
713
714
715
716 public void sendContent(ReadableByteChannel in, Callback callback)
717 {
718 if (LOG.isDebugEnabled())
719 LOG.debug("sendContent(channel={},{})",in,callback);
720
721 new ReadableByteChannelWritingCB(in, callback).iterate();
722 }
723
724
725
726
727
728
729
730 public void sendContent(HttpContent httpContent, Callback callback)
731 {
732 if (LOG.isDebugEnabled())
733 LOG.debug("sendContent(http={},{})",httpContent,callback);
734
735 if (BufferUtil.hasContent(_aggregate))
736 {
737 callback.failed(new IOException("cannot sendContent() after write()"));
738 return;
739 }
740 if (_channel.isCommitted())
741 {
742 callback.failed(new IOException("committed"));
743 return;
744 }
745
746 while (true)
747 {
748 switch(_state.get())
749 {
750 case OPEN:
751 if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING))
752 continue;
753 break;
754
755 case ERROR:
756 callback.failed(new EofException(_onError));
757 return;
758
759 case CLOSED:
760 callback.failed(new EofException("Closed"));
761 return;
762
763 default:
764 throw new IllegalStateException();
765 }
766 break;
767 }
768
769
770 ByteBuffer buffer = _channel.useDirectBuffers() ? httpContent.getDirectBuffer() : null;
771 if (buffer == null)
772 buffer = httpContent.getIndirectBuffer();
773
774 if (buffer!=null)
775 {
776 sendContent(buffer,callback);
777 return;
778 }
779
780 try
781 {
782 ReadableByteChannel rbc=httpContent.getReadableByteChannel();
783 if (rbc!=null)
784 {
785
786 sendContent(rbc,callback);
787 return;
788 }
789
790 InputStream in = httpContent.getInputStream();
791 if (in!=null)
792 {
793 sendContent(in,callback);
794 return;
795 }
796
797 throw new IllegalArgumentException("unknown content for "+httpContent);
798 }
799 catch(Throwable th)
800 {
801 abort(th);
802 callback.failed(th);
803 }
804 }
805
806 public int getBufferSize()
807 {
808 return _bufferSize;
809 }
810
811 public void setBufferSize(int size)
812 {
813 _bufferSize = size;
814 _commitSize = size;
815 }
816
817 public void recycle()
818 {
819 resetBuffer();
820 _interceptor=_channel;
821 }
822
823 public void resetBuffer()
824 {
825 _written = 0;
826 if (BufferUtil.hasContent(_aggregate))
827 BufferUtil.clear(_aggregate);
828 reopen();
829 }
830
831 @Override
832 public void setWriteListener(WriteListener writeListener)
833 {
834 if (!_channel.getState().isAsync())
835 throw new IllegalStateException("!ASYNC");
836
837 if (_state.compareAndSet(OutputState.OPEN, OutputState.READY))
838 {
839 _writeListener = writeListener;
840 if (_channel.getState().onWritePossible())
841 _channel.execute(_channel);
842 }
843 else
844 throw new IllegalStateException();
845 }
846
847
848
849
850 @Override
851 public boolean isReady()
852 {
853 while (true)
854 {
855 switch(_state.get())
856 {
857 case OPEN:
858 return true;
859
860 case ASYNC:
861 if (!_state.compareAndSet(OutputState.ASYNC, OutputState.READY))
862 continue;
863 return true;
864
865 case READY:
866 return true;
867
868 case PENDING:
869 if (!_state.compareAndSet(OutputState.PENDING, OutputState.UNREADY))
870 continue;
871 return false;
872
873 case UNREADY:
874 return false;
875
876 case ERROR:
877 return true;
878
879 case CLOSED:
880 return true;
881
882 default:
883 throw new IllegalStateException();
884 }
885 }
886 }
887
888 @Override
889 public void run()
890 {
891 loop: while (true)
892 {
893 OutputState state = _state.get();
894
895 if(_onError!=null)
896 {
897 switch(state)
898 {
899 case CLOSED:
900 case ERROR:
901 {
902 _onError=null;
903 break loop;
904 }
905 default:
906 {
907 if (_state.compareAndSet(state, OutputState.ERROR))
908 {
909 Throwable th=_onError;
910 _onError=null;
911 if (LOG.isDebugEnabled())
912 LOG.debug("onError",th);
913 _writeListener.onError(th);
914 close();
915 break loop;
916 }
917 }
918 }
919 continue;
920 }
921
922 switch(_state.get())
923 {
924 case CLOSED:
925
926
927
928
929 case PENDING:
930 case UNREADY:
931 case READY:
932 try
933 {
934 _writeListener.onWritePossible();
935 break loop;
936 }
937 catch (Throwable e)
938 {
939 _onError = e;
940 }
941 break;
942
943 default:
944 _onError=new IllegalStateException("state="+_state.get());
945 }
946 }
947 }
948
949 private void close(Closeable resource)
950 {
951 try
952 {
953 resource.close();
954 }
955 catch (Throwable x)
956 {
957 LOG.ignore(x);
958 }
959 }
960
961 @Override
962 public String toString()
963 {
964 return String.format("%s@%x{%s}",this.getClass().getSimpleName(),hashCode(),_state.get());
965 }
966
967 private abstract class AsyncICB extends IteratingCallback
968 {
969 @Override
970 protected void onCompleteSuccess()
971 {
972 while(true)
973 {
974 OutputState last=_state.get();
975 switch(last)
976 {
977 case PENDING:
978 if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
979 continue;
980 break;
981
982 case UNREADY:
983 if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY))
984 continue;
985 if (_channel.getState().onWritePossible())
986 _channel.execute(_channel);
987 break;
988
989 case CLOSED:
990 break;
991
992 default:
993 throw new IllegalStateException();
994 }
995 break;
996 }
997 }
998
999 @Override
1000 public void onCompleteFailure(Throwable e)
1001 {
1002 _onError=e==null?new IOException():e;
1003 if (_channel.getState().onWritePossible())
1004 _channel.execute(_channel);
1005 }
1006 }
1007
1008 private class AsyncFlush extends AsyncICB
1009 {
1010 protected volatile boolean _flushed;
1011
1012 public AsyncFlush()
1013 {
1014 }
1015
1016 @Override
1017 protected Action process()
1018 {
1019 if (BufferUtil.hasContent(_aggregate))
1020 {
1021 _flushed=true;
1022 write(_aggregate, false, this);
1023 return Action.SCHEDULED;
1024 }
1025
1026 if (!_flushed)
1027 {
1028 _flushed=true;
1029 write(BufferUtil.EMPTY_BUFFER,false,this);
1030 return Action.SCHEDULED;
1031 }
1032
1033 return Action.SUCCEEDED;
1034 }
1035 }
1036
1037 private class AsyncWrite extends AsyncICB
1038 {
1039 private final ByteBuffer _buffer;
1040 private final ByteBuffer _slice;
1041 private final boolean _complete;
1042 private final int _len;
1043 protected volatile boolean _completed;
1044
1045 public AsyncWrite(byte[] b, int off, int len, boolean complete)
1046 {
1047 _buffer=ByteBuffer.wrap(b, off, len);
1048 _len=len;
1049
1050 _slice=_len<getBufferSize()?null:_buffer.duplicate();
1051 _complete=complete;
1052 }
1053
1054 public AsyncWrite(ByteBuffer buffer, boolean complete)
1055 {
1056 _buffer=buffer;
1057 _len=buffer.remaining();
1058
1059 if (_buffer.isDirect()||_len<getBufferSize())
1060 _slice=null;
1061 else
1062 {
1063 _slice=_buffer.duplicate();
1064 _buffer.position(_buffer.limit());
1065 }
1066 _complete=complete;
1067 }
1068
1069 @Override
1070 protected Action process()
1071 {
1072
1073 if (BufferUtil.hasContent(_aggregate))
1074 {
1075 _completed=_len==0;
1076 write(_aggregate, _complete && _completed, this);
1077 return Action.SCHEDULED;
1078 }
1079
1080
1081 if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
1082 {
1083 int position = BufferUtil.flipToFill(_aggregate);
1084 BufferUtil.put(_buffer,_aggregate);
1085 BufferUtil.flipToFlush(_aggregate, position);
1086 return Action.SUCCEEDED;
1087 }
1088
1089
1090 if (_buffer.hasRemaining())
1091 {
1092
1093 if (_slice==null)
1094 {
1095 _completed=true;
1096 write(_buffer, _complete, this);
1097 return Action.SCHEDULED;
1098 }
1099
1100
1101 int p=_buffer.position();
1102 int l=Math.min(getBufferSize(),_buffer.remaining());
1103 int pl=p+l;
1104 _slice.limit(pl);
1105 _buffer.position(pl);
1106 _slice.position(p);
1107 _completed=!_buffer.hasRemaining();
1108 write(_slice, _complete && _completed, this);
1109 return Action.SCHEDULED;
1110 }
1111
1112
1113
1114 if (_complete && !_completed)
1115 {
1116 _completed=true;
1117 write(BufferUtil.EMPTY_BUFFER, true, this);
1118 return Action.SCHEDULED;
1119 }
1120
1121 if (LOG.isDebugEnabled() && _completed)
1122 LOG.debug("EOF of {}",this);
1123 return Action.SUCCEEDED;
1124 }
1125
1126 @Override
1127 protected void onCompleteSuccess()
1128 {
1129 super.onCompleteSuccess();
1130 if (_complete)
1131 closed();
1132 }
1133 }
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143 private class InputStreamWritingCB extends IteratingNestedCallback
1144 {
1145 private final InputStream _in;
1146 private final ByteBuffer _buffer;
1147 private boolean _eof;
1148
1149 public InputStreamWritingCB(InputStream in, Callback callback)
1150 {
1151 super(callback);
1152 _in=in;
1153 _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
1154 }
1155
1156 @Override
1157 protected Action process() throws Exception
1158 {
1159
1160
1161 if (_eof)
1162 {
1163 if (LOG.isDebugEnabled())
1164 LOG.debug("EOF of {}",this);
1165
1166 _in.close();
1167 closed();
1168 _channel.getByteBufferPool().release(_buffer);
1169 return Action.SUCCEEDED;
1170 }
1171
1172
1173 int len=0;
1174 while (len<_buffer.capacity() && !_eof)
1175 {
1176 int r=_in.read(_buffer.array(),_buffer.arrayOffset()+len,_buffer.capacity()-len);
1177 if (r<0)
1178 _eof=true;
1179 else
1180 len+=r;
1181 }
1182
1183
1184 _buffer.position(0);
1185 _buffer.limit(len);
1186 write(_buffer,_eof,this);
1187 return Action.SCHEDULED;
1188 }
1189
1190 @Override
1191 public void onCompleteFailure(Throwable x)
1192 {
1193 abort(x);
1194 _channel.getByteBufferPool().release(_buffer);
1195 HttpOutput.this.close(_in);
1196 super.onCompleteFailure(x);
1197 }
1198 }
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209 private class ReadableByteChannelWritingCB extends IteratingNestedCallback
1210 {
1211 private final ReadableByteChannel _in;
1212 private final ByteBuffer _buffer;
1213 private boolean _eof;
1214
1215 public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
1216 {
1217 super(callback);
1218 _in=in;
1219 _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
1220 }
1221
1222 @Override
1223 protected Action process() throws Exception
1224 {
1225
1226
1227 if (_eof)
1228 {
1229 if (LOG.isDebugEnabled())
1230 LOG.debug("EOF of {}",this);
1231 _in.close();
1232 closed();
1233 _channel.getByteBufferPool().release(_buffer);
1234 return Action.SUCCEEDED;
1235 }
1236
1237
1238 _buffer.clear();
1239 while (_buffer.hasRemaining() && !_eof)
1240 _eof = (_in.read(_buffer)) < 0;
1241
1242
1243 _buffer.flip();
1244 write(_buffer,_eof,this);
1245
1246 return Action.SCHEDULED;
1247 }
1248
1249 @Override
1250 public void onCompleteFailure(Throwable x)
1251 {
1252 abort(x);
1253 _channel.getByteBufferPool().release(_buffer);
1254 HttpOutput.this.close(_in);
1255 super.onCompleteFailure(x);
1256 }
1257 }
1258 }