1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.nio.ByteBuffer;
25 import java.nio.channels.ReadableByteChannel;
26 import java.nio.channels.WritePendingException;
27 import java.util.concurrent.atomic.AtomicReference;
28
29 import javax.servlet.RequestDispatcher;
30 import javax.servlet.ServletOutputStream;
31 import javax.servlet.ServletRequest;
32 import javax.servlet.ServletResponse;
33 import javax.servlet.WriteListener;
34
35 import org.eclipse.jetty.http.HttpContent;
36 import org.eclipse.jetty.io.EofException;
37 import org.eclipse.jetty.util.BufferUtil;
38 import org.eclipse.jetty.util.Callback;
39 import org.eclipse.jetty.util.IteratingCallback;
40 import org.eclipse.jetty.util.IteratingNestedCallback;
41 import org.eclipse.jetty.util.SharedBlockingCallback;
42 import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
43 import org.eclipse.jetty.util.log.Log;
44 import org.eclipse.jetty.util.log.Logger;
45
46
47
48
49
50
51
52
53
54
55
56 public class HttpOutput extends ServletOutputStream implements Runnable
57 {
58 public interface Interceptor
59 {
60 void write(ByteBuffer content, boolean complete, Callback callback);
61 Interceptor getNextInterceptor();
62 boolean isOptimizedForDirectBuffers();
63 }
64
65 private static Logger LOG = Log.getLogger(HttpOutput.class);
66
67 private final HttpChannel _channel;
68 private final SharedBlockingCallback _writeBlock;
69 private Interceptor _interceptor;
70
71
72 private long _written;
73
74 private ByteBuffer _aggregate;
75 private int _bufferSize;
76 private int _commitSize;
77 private WriteListener _writeListener;
78 private volatile Throwable _onError;
79
80
81
82
83
84
85
86
87
88
89 private enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED }
90 private final AtomicReference<OutputState> _state=new AtomicReference<>(OutputState.OPEN);
91
92 public HttpOutput(HttpChannel channel)
93 {
94 _channel = channel;
95 _interceptor = channel;
96 _writeBlock = new SharedBlockingCallback()
97 {
98 @Override
99 protected long getIdleTimeout()
100 {
101 long bto = getHttpChannel().getHttpConfiguration().getBlockingTimeout();
102 if (bto>0)
103 return bto;
104 if (bto<0)
105 return -1;
106 return _channel.getIdleTimeout();
107 }
108 };
109 HttpConfiguration config = channel.getHttpConfiguration();
110 _bufferSize = config.getOutputBufferSize();
111 _commitSize = config.getOutputAggregationSize();
112 if (_commitSize>_bufferSize)
113 {
114 LOG.warn("OutputAggregationSize {} exceeds bufferSize {}",_commitSize,_bufferSize);
115 _commitSize=_bufferSize;
116 }
117 }
118
119 public HttpChannel getHttpChannel()
120 {
121 return _channel;
122 }
123
124 public Interceptor getInterceptor()
125 {
126 return _interceptor;
127 }
128
129 public void setInterceptor(Interceptor filter)
130 {
131 _interceptor=filter;
132 }
133
134 public boolean isWritten()
135 {
136 return _written > 0;
137 }
138
139 public long getWritten()
140 {
141 return _written;
142 }
143
144 public void reopen()
145 {
146 _state.set(OutputState.OPEN);
147 }
148
149 public boolean isAllContentWritten()
150 {
151 return _channel.getResponse().isAllContentWritten(_written);
152 }
153
154 protected Blocker acquireWriteBlockingCallback() throws IOException
155 {
156 return _writeBlock.acquire();
157 }
158
159 private void write(ByteBuffer content, boolean complete) throws IOException
160 {
161 try (Blocker blocker = _writeBlock.acquire())
162 {
163 write(content, complete, blocker);
164 blocker.block();
165 }
166 catch (Throwable failure)
167 {
168 if (LOG.isDebugEnabled())
169 LOG.debug(failure);
170 abort(failure);
171 throw failure;
172 }
173 }
174
175 protected void write(ByteBuffer content, boolean complete, Callback callback)
176 {
177 _interceptor.write(content, complete, callback);
178 }
179
180 private void abort(Throwable failure)
181 {
182 closed();
183 _channel.abort(failure);
184 }
185
186 @Override
187 public void close()
188 {
189 while(true)
190 {
191 OutputState state=_state.get();
192 switch (state)
193 {
194 case CLOSED:
195 {
196 return;
197 }
198 case UNREADY:
199 {
200 if (_state.compareAndSet(state,OutputState.ERROR))
201 _writeListener.onError(_onError==null?new EofException("Async close"):_onError);
202 break;
203 }
204 default:
205 {
206 if (!_state.compareAndSet(state,OutputState.CLOSED))
207 break;
208
209 try
210 {
211 write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding());
212 }
213 catch (IOException x)
214 {
215
216 }
217 finally
218 {
219 releaseBuffer();
220 }
221
222 return;
223 }
224 }
225 }
226 }
227
228
229
230
231
232 void closed()
233 {
234 while(true)
235 {
236 OutputState state=_state.get();
237 switch (state)
238 {
239 case CLOSED:
240 {
241 return;
242 }
243 case UNREADY:
244 {
245 if (_state.compareAndSet(state,OutputState.ERROR))
246 _writeListener.onError(_onError==null?new EofException("Async closed"):_onError);
247 break;
248 }
249 default:
250 {
251 if (!_state.compareAndSet(state, OutputState.CLOSED))
252 break;
253
254 try
255 {
256 _channel.getResponse().closeOutput();
257 }
258 catch (Throwable x)
259 {
260 if (LOG.isDebugEnabled())
261 LOG.debug(x);
262 abort(x);
263 }
264 finally
265 {
266 releaseBuffer();
267 }
268
269 return;
270 }
271 }
272 }
273 }
274
275 private void releaseBuffer()
276 {
277 if (_aggregate != null)
278 {
279 _channel.getConnector().getByteBufferPool().release(_aggregate);
280 _aggregate = null;
281 }
282 }
283
284 public boolean isClosed()
285 {
286 return _state.get()==OutputState.CLOSED;
287 }
288
289 @Override
290 public void flush() throws IOException
291 {
292 while(true)
293 {
294 switch(_state.get())
295 {
296 case OPEN:
297 write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, false);
298 return;
299
300 case ASYNC:
301 throw new IllegalStateException("isReady() not called");
302
303 case READY:
304 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
305 continue;
306 new AsyncFlush().iterate();
307 return;
308
309 case PENDING:
310 case UNREADY:
311 throw new WritePendingException();
312
313 case ERROR:
314 throw new EofException(_onError);
315
316 case CLOSED:
317 return;
318
319 default:
320 throw new IllegalStateException();
321 }
322 }
323 }
324
325 @Override
326 public void write(byte[] b, int off, int len) throws IOException
327 {
328 _written+=len;
329 boolean complete=_channel.getResponse().isAllContentWritten(_written);
330
331
332 while(true)
333 {
334 switch(_state.get())
335 {
336 case OPEN:
337
338 break;
339
340 case ASYNC:
341 throw new IllegalStateException("isReady() not called");
342
343 case READY:
344 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
345 continue;
346
347
348 if (!complete && len<=_commitSize)
349 {
350 if (_aggregate == null)
351 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
352
353
354 int filled = BufferUtil.fill(_aggregate, b, off, len);
355
356
357 if (filled==len && !BufferUtil.isFull(_aggregate))
358 {
359 if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
360 throw new IllegalStateException();
361 return;
362 }
363
364
365 off+=filled;
366 len-=filled;
367 }
368
369
370 new AsyncWrite(b,off,len,complete).iterate();
371 return;
372
373 case PENDING:
374 case UNREADY:
375 throw new WritePendingException();
376
377 case ERROR:
378 throw new EofException(_onError);
379
380 case CLOSED:
381 throw new EofException("Closed");
382
383 default:
384 throw new IllegalStateException();
385 }
386 break;
387 }
388
389
390
391
392 int capacity = getBufferSize();
393 if (!complete && len<=_commitSize)
394 {
395 if (_aggregate == null)
396 _aggregate = _channel.getByteBufferPool().acquire(capacity, _interceptor.isOptimizedForDirectBuffers());
397
398
399 int filled = BufferUtil.fill(_aggregate, b, off, len);
400
401
402 if (filled==len && !BufferUtil.isFull(_aggregate))
403 return;
404
405
406 off+=filled;
407 len-=filled;
408 }
409
410
411 if (BufferUtil.hasContent(_aggregate))
412 {
413 write(_aggregate, complete && len==0);
414
415
416 if (len>0 && !complete && len<=_commitSize && len<=BufferUtil.space(_aggregate))
417 {
418 BufferUtil.append(_aggregate, b, off, len);
419 return;
420 }
421 }
422
423
424 if (len>0)
425 {
426 ByteBuffer wrap = ByteBuffer.wrap(b, off, len);
427 ByteBuffer view = wrap.duplicate();
428
429
430
431 while (len>getBufferSize())
432 {
433 int p=view.position();
434 int l=p+getBufferSize();
435 view.limit(p+getBufferSize());
436 write(view,false);
437 len-=getBufferSize();
438 view.limit(l+Math.min(len,getBufferSize()));
439 view.position(l);
440 }
441 write(view,complete);
442 }
443 else if (complete)
444 {
445 write(BufferUtil.EMPTY_BUFFER,true);
446 }
447
448 if (complete)
449 closed();
450 }
451
452 public void write(ByteBuffer buffer) throws IOException
453 {
454 _written+=buffer.remaining();
455 boolean complete=_channel.getResponse().isAllContentWritten(_written);
456
457
458 while(true)
459 {
460 switch(_state.get())
461 {
462 case OPEN:
463
464 break;
465
466 case ASYNC:
467 throw new IllegalStateException("isReady() not called");
468
469 case READY:
470 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
471 continue;
472
473
474 new AsyncWrite(buffer,complete).iterate();
475 return;
476
477 case PENDING:
478 case UNREADY:
479 throw new WritePendingException();
480
481 case ERROR:
482 throw new EofException(_onError);
483
484 case CLOSED:
485 throw new EofException("Closed");
486
487 default:
488 throw new IllegalStateException();
489 }
490 break;
491 }
492
493
494
495 int len=BufferUtil.length(buffer);
496
497
498 if (BufferUtil.hasContent(_aggregate))
499 write(_aggregate, complete && len==0);
500
501
502 if (len>0)
503 write(buffer, complete);
504 else if (complete)
505 write(BufferUtil.EMPTY_BUFFER, true);
506
507 if (complete)
508 closed();
509 }
510
511 @Override
512 public void write(int b) throws IOException
513 {
514 _written+=1;
515 boolean complete=_channel.getResponse().isAllContentWritten(_written);
516
517
518 while(true)
519 {
520 switch(_state.get())
521 {
522 case OPEN:
523 if (_aggregate == null)
524 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
525 BufferUtil.append(_aggregate, (byte)b);
526
527
528 if (complete || BufferUtil.isFull(_aggregate))
529 {
530 write(_aggregate, complete);
531 if (complete)
532 closed();
533 }
534 break;
535
536 case ASYNC:
537 throw new IllegalStateException("isReady() not called");
538
539 case READY:
540 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
541 continue;
542
543 if (_aggregate == null)
544 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
545 BufferUtil.append(_aggregate, (byte)b);
546
547
548 if (!complete && !BufferUtil.isFull(_aggregate))
549 {
550 if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
551 throw new IllegalStateException();
552 return;
553 }
554
555
556 new AsyncFlush().iterate();
557 return;
558
559 case PENDING:
560 case UNREADY:
561 throw new WritePendingException();
562
563 case ERROR:
564 throw new EofException(_onError);
565
566 case CLOSED:
567 throw new EofException("Closed");
568
569 default:
570 throw new IllegalStateException();
571 }
572 break;
573 }
574 }
575
576 @Override
577 public void print(String s) throws IOException
578 {
579 if (isClosed())
580 throw new IOException("Closed");
581
582 write(s.getBytes(_channel.getResponse().getCharacterEncoding()));
583 }
584
585
586
587
588
589
590
591 public void sendContent(ByteBuffer content) throws IOException
592 {
593 if (LOG.isDebugEnabled())
594 LOG.debug("sendContent({})",BufferUtil.toDetailString(content));
595
596 write(content, true);
597 closed();
598 }
599
600
601
602
603
604
605
606 public void sendContent(InputStream in) throws IOException
607 {
608 try(Blocker blocker = _writeBlock.acquire())
609 {
610 new InputStreamWritingCB(in, blocker).iterate();
611 blocker.block();
612 }
613 catch (Throwable failure)
614 {
615 if (LOG.isDebugEnabled())
616 LOG.debug(failure);
617 abort(failure);
618 throw failure;
619 }
620 }
621
622
623
624
625
626
627
628 public void sendContent(ReadableByteChannel in) throws IOException
629 {
630 try(Blocker blocker = _writeBlock.acquire())
631 {
632 new ReadableByteChannelWritingCB(in, blocker).iterate();
633 blocker.block();
634 }
635 catch (Throwable failure)
636 {
637 if (LOG.isDebugEnabled())
638 LOG.debug(failure);
639 abort(failure);
640 throw failure;
641 }
642 }
643
644
645
646
647
648
649
650 public void sendContent(HttpContent content) throws IOException
651 {
652 try(Blocker blocker = _writeBlock.acquire())
653 {
654 sendContent(content, blocker);
655 blocker.block();
656 }
657 catch (Throwable failure)
658 {
659 if (LOG.isDebugEnabled())
660 LOG.debug(failure);
661 abort(failure);
662 throw failure;
663 }
664 }
665
666
667
668
669
670
671 public void sendContent(ByteBuffer content, final Callback callback)
672 {
673 if (LOG.isDebugEnabled())
674 LOG.debug("sendContent(buffer={},{})",BufferUtil.toDetailString(content),callback);
675
676 write(content, true, new Callback()
677 {
678 @Override
679 public void succeeded()
680 {
681 closed();
682 callback.succeeded();
683 }
684
685 @Override
686 public void failed(Throwable x)
687 {
688 abort(x);
689 callback.failed(x);
690 }
691 });
692 }
693
694
695
696
697
698
699
700
701 public void sendContent(InputStream in, Callback callback)
702 {
703 if (LOG.isDebugEnabled())
704 LOG.debug("sendContent(stream={},{})",in,callback);
705
706 new InputStreamWritingCB(in, callback).iterate();
707 }
708
709
710
711
712
713
714
715
716 public void sendContent(ReadableByteChannel in, Callback callback)
717 {
718 if (LOG.isDebugEnabled())
719 LOG.debug("sendContent(channel={},{})",in,callback);
720
721 new ReadableByteChannelWritingCB(in, callback).iterate();
722 }
723
724
725
726
727
728
729
730 public void sendContent(HttpContent httpContent, Callback callback)
731 {
732 if (LOG.isDebugEnabled())
733 LOG.debug("sendContent(http={},{})",httpContent,callback);
734
735 if (BufferUtil.hasContent(_aggregate))
736 {
737 callback.failed(new IOException("cannot sendContent() after write()"));
738 return;
739 }
740 if (_channel.isCommitted())
741 {
742 callback.failed(new IOException("committed"));
743 return;
744 }
745
746 while (true)
747 {
748 switch(_state.get())
749 {
750 case OPEN:
751 if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING))
752 continue;
753 break;
754
755 case ERROR:
756 callback.failed(new EofException(_onError));
757 return;
758
759 case CLOSED:
760 callback.failed(new EofException("Closed"));
761 return;
762
763 default:
764 throw new IllegalStateException();
765 }
766 break;
767 }
768
769 ByteBuffer buffer = _channel.useDirectBuffers() ? httpContent.getDirectBuffer() : null;
770 if (buffer == null)
771 buffer = httpContent.getIndirectBuffer();
772
773 if (buffer!=null)
774 {
775 sendContent(buffer,callback);
776 return;
777 }
778
779 try
780 {
781 ReadableByteChannel rbc=httpContent.getReadableByteChannel();
782 if (rbc!=null)
783 {
784
785 sendContent(rbc,callback);
786 return;
787 }
788
789 InputStream in = httpContent.getInputStream();
790 if (in!=null)
791 {
792 sendContent(in,callback);
793 return;
794 }
795
796 throw new IllegalArgumentException("unknown content for "+httpContent);
797 }
798 catch(Throwable th)
799 {
800 abort(th);
801 callback.failed(th);
802 }
803 }
804
805 public int getBufferSize()
806 {
807 return _bufferSize;
808 }
809
810 public void setBufferSize(int size)
811 {
812 _bufferSize = size;
813 _commitSize = size;
814 }
815
816 public void recycle()
817 {
818 resetBuffer();
819 _interceptor=_channel;
820 }
821
822 public void resetBuffer()
823 {
824 _written = 0;
825 if (BufferUtil.hasContent(_aggregate))
826 BufferUtil.clear(_aggregate);
827 reopen();
828 }
829
830 @Override
831 public void setWriteListener(WriteListener writeListener)
832 {
833 if (!_channel.getState().isAsync())
834 throw new IllegalStateException("!ASYNC");
835
836 if (_state.compareAndSet(OutputState.OPEN, OutputState.READY))
837 {
838 _writeListener = writeListener;
839 if (_channel.getState().onWritePossible())
840 _channel.execute(_channel);
841 }
842 else
843 throw new IllegalStateException();
844 }
845
846
847
848
849 @Override
850 public boolean isReady()
851 {
852 while (true)
853 {
854 switch(_state.get())
855 {
856 case OPEN:
857 return true;
858
859 case ASYNC:
860 if (!_state.compareAndSet(OutputState.ASYNC, OutputState.READY))
861 continue;
862 return true;
863
864 case READY:
865 return true;
866
867 case PENDING:
868 if (!_state.compareAndSet(OutputState.PENDING, OutputState.UNREADY))
869 continue;
870 return false;
871
872 case UNREADY:
873 return false;
874
875 case ERROR:
876 return true;
877
878 case CLOSED:
879 return true;
880
881 default:
882 throw new IllegalStateException();
883 }
884 }
885 }
886
887 @Override
888 public void run()
889 {
890 loop: while (true)
891 {
892 OutputState state = _state.get();
893
894 if(_onError!=null)
895 {
896 switch(state)
897 {
898 case CLOSED:
899 case ERROR:
900 {
901 _onError=null;
902 break loop;
903 }
904 default:
905 {
906 if (_state.compareAndSet(state, OutputState.ERROR))
907 {
908 Throwable th=_onError;
909 _onError=null;
910 if (LOG.isDebugEnabled())
911 LOG.debug("onError",th);
912 _writeListener.onError(th);
913 close();
914 break loop;
915 }
916 }
917 }
918 continue;
919 }
920
921 switch(_state.get())
922 {
923 case CLOSED:
924
925
926
927
928 case READY:
929 try
930 {
931 _writeListener.onWritePossible();
932 break loop;
933 }
934 catch (Throwable e)
935 {
936 _onError = e;
937 }
938 break;
939
940 default:
941 _onError=new IllegalStateException("state="+_state.get());
942 }
943 }
944 }
945
946 private void close(Closeable resource)
947 {
948 try
949 {
950 resource.close();
951 }
952 catch (Throwable x)
953 {
954 LOG.ignore(x);
955 }
956 }
957
958 @Override
959 public String toString()
960 {
961 return String.format("%s@%x{%s}",this.getClass().getSimpleName(),hashCode(),_state.get());
962 }
963
964 private abstract class AsyncICB extends IteratingCallback
965 {
966 @Override
967 protected void onCompleteSuccess()
968 {
969 while(true)
970 {
971 OutputState last=_state.get();
972 switch(last)
973 {
974 case PENDING:
975 if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
976 continue;
977 break;
978
979 case UNREADY:
980 if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY))
981 continue;
982 if (_channel.getState().onWritePossible())
983 _channel.execute(_channel);
984 break;
985
986 case CLOSED:
987 break;
988
989 default:
990 throw new IllegalStateException();
991 }
992 break;
993 }
994 }
995
996 @Override
997 public void onCompleteFailure(Throwable e)
998 {
999 _onError=e==null?new IOException():e;
1000 if (_channel.getState().onWritePossible())
1001 _channel.execute(_channel);
1002 }
1003 }
1004
1005 private class AsyncFlush extends AsyncICB
1006 {
1007 protected volatile boolean _flushed;
1008
1009 public AsyncFlush()
1010 {
1011 }
1012
1013 @Override
1014 protected Action process()
1015 {
1016 if (BufferUtil.hasContent(_aggregate))
1017 {
1018 _flushed=true;
1019 write(_aggregate, false, this);
1020 return Action.SCHEDULED;
1021 }
1022
1023 if (!_flushed)
1024 {
1025 _flushed=true;
1026 write(BufferUtil.EMPTY_BUFFER,false,this);
1027 return Action.SCHEDULED;
1028 }
1029
1030 return Action.SUCCEEDED;
1031 }
1032 }
1033
1034 private class AsyncWrite extends AsyncICB
1035 {
1036 private final ByteBuffer _buffer;
1037 private final ByteBuffer _slice;
1038 private final boolean _complete;
1039 private final int _len;
1040 protected volatile boolean _completed;
1041
1042 public AsyncWrite(byte[] b, int off, int len, boolean complete)
1043 {
1044 _buffer=ByteBuffer.wrap(b, off, len);
1045 _len=len;
1046
1047 _slice=_len<getBufferSize()?null:_buffer.duplicate();
1048 _complete=complete;
1049 }
1050
1051 public AsyncWrite(ByteBuffer buffer, boolean complete)
1052 {
1053 _buffer=buffer;
1054 _len=buffer.remaining();
1055
1056 if (_buffer.isDirect()||_len<getBufferSize())
1057 _slice=null;
1058 else
1059 {
1060 _slice=_buffer.duplicate();
1061 _buffer.position(_buffer.limit());
1062 }
1063 _complete=complete;
1064 }
1065
1066 @Override
1067 protected Action process()
1068 {
1069
1070 if (BufferUtil.hasContent(_aggregate))
1071 {
1072 _completed=_len==0;
1073 write(_aggregate, _complete && _completed, this);
1074 return Action.SCHEDULED;
1075 }
1076
1077
1078 if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
1079 {
1080 int position = BufferUtil.flipToFill(_aggregate);
1081 BufferUtil.put(_buffer,_aggregate);
1082 BufferUtil.flipToFlush(_aggregate, position);
1083 return Action.SUCCEEDED;
1084 }
1085
1086
1087 if (_buffer.hasRemaining())
1088 {
1089
1090 if (_slice==null)
1091 {
1092 _completed=true;
1093 write(_buffer, _complete, this);
1094 return Action.SCHEDULED;
1095 }
1096
1097
1098 int p=_buffer.position();
1099 int l=Math.min(getBufferSize(),_buffer.remaining());
1100 int pl=p+l;
1101 _slice.limit(pl);
1102 _buffer.position(pl);
1103 _slice.position(p);
1104 _completed=!_buffer.hasRemaining();
1105 write(_slice, _complete && _completed, this);
1106 return Action.SCHEDULED;
1107 }
1108
1109
1110
1111 if (_complete && !_completed)
1112 {
1113 _completed=true;
1114 write(BufferUtil.EMPTY_BUFFER, true, this);
1115 return Action.SCHEDULED;
1116 }
1117
1118 if (LOG.isDebugEnabled() && _completed)
1119 LOG.debug("EOF of {}",this);
1120 return Action.SUCCEEDED;
1121 }
1122
1123 @Override
1124 protected void onCompleteSuccess()
1125 {
1126 super.onCompleteSuccess();
1127 if (_complete)
1128 closed();
1129 }
1130 }
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140 private class InputStreamWritingCB extends IteratingNestedCallback
1141 {
1142 private final InputStream _in;
1143 private final ByteBuffer _buffer;
1144 private boolean _eof;
1145
1146 public InputStreamWritingCB(InputStream in, Callback callback)
1147 {
1148 super(callback);
1149 _in=in;
1150 _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
1151 }
1152
1153 @Override
1154 protected Action process() throws Exception
1155 {
1156
1157
1158 if (_eof)
1159 {
1160 if (LOG.isDebugEnabled())
1161 LOG.debug("EOF of {}",this);
1162
1163 _in.close();
1164 closed();
1165 _channel.getByteBufferPool().release(_buffer);
1166 return Action.SUCCEEDED;
1167 }
1168
1169
1170 int len=0;
1171 while (len<_buffer.capacity() && !_eof)
1172 {
1173 int r=_in.read(_buffer.array(),_buffer.arrayOffset()+len,_buffer.capacity()-len);
1174 if (r<0)
1175 _eof=true;
1176 else
1177 len+=r;
1178 }
1179
1180
1181 _buffer.position(0);
1182 _buffer.limit(len);
1183 write(_buffer,_eof,this);
1184 return Action.SCHEDULED;
1185 }
1186
1187 @Override
1188 public void onCompleteFailure(Throwable x)
1189 {
1190 abort(x);
1191 _channel.getByteBufferPool().release(_buffer);
1192 HttpOutput.this.close(_in);
1193 super.onCompleteFailure(x);
1194 }
1195 }
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206 private class ReadableByteChannelWritingCB extends IteratingNestedCallback
1207 {
1208 private final ReadableByteChannel _in;
1209 private final ByteBuffer _buffer;
1210 private boolean _eof;
1211
1212 public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
1213 {
1214 super(callback);
1215 _in=in;
1216 _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
1217 }
1218
1219 @Override
1220 protected Action process() throws Exception
1221 {
1222
1223
1224 if (_eof)
1225 {
1226 if (LOG.isDebugEnabled())
1227 LOG.debug("EOF of {}",this);
1228 _in.close();
1229 closed();
1230 _channel.getByteBufferPool().release(_buffer);
1231 return Action.SUCCEEDED;
1232 }
1233
1234
1235 _buffer.clear();
1236 while (_buffer.hasRemaining() && !_eof)
1237 _eof = (_in.read(_buffer)) < 0;
1238
1239
1240 _buffer.flip();
1241 write(_buffer,_eof,this);
1242
1243 return Action.SCHEDULED;
1244 }
1245
1246 @Override
1247 public void onCompleteFailure(Throwable x)
1248 {
1249 abort(x);
1250 _channel.getByteBufferPool().release(_buffer);
1251 HttpOutput.this.close(_in);
1252 super.onCompleteFailure(x);
1253 }
1254 }
1255 }