1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.ReadableByteChannel;
25 import java.nio.channels.WritePendingException;
26 import java.util.concurrent.atomic.AtomicReference;
27 import javax.servlet.RequestDispatcher;
28 import javax.servlet.ServletOutputStream;
29 import javax.servlet.ServletRequest;
30 import javax.servlet.ServletResponse;
31 import javax.servlet.WriteListener;
32
33 import org.eclipse.jetty.http.HttpContent;
34 import org.eclipse.jetty.io.EofException;
35 import org.eclipse.jetty.util.BufferUtil;
36 import org.eclipse.jetty.util.Callback;
37 import org.eclipse.jetty.util.IteratingCallback;
38 import org.eclipse.jetty.util.IteratingNestedCallback;
39 import org.eclipse.jetty.util.SharedBlockingCallback;
40 import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
41 import org.eclipse.jetty.util.log.Log;
42 import org.eclipse.jetty.util.log.Logger;
43
44
45
46
47
48
49
50
51
52
53
54 public class HttpOutput extends ServletOutputStream implements Runnable
55 {
56 private static Logger LOG = Log.getLogger(HttpOutput.class);
57 private final HttpChannel<?> _channel;
58 private final SharedBlockingCallback _writeblock=new SharedBlockingCallback()
59 {
60 @Override
61 protected long getIdleTimeout()
62 {
63 return _channel.getIdleTimeout();
64 }
65 };
66 private long _written;
67 private ByteBuffer _aggregate;
68 private int _bufferSize;
69 private int _commitSize;
70 private WriteListener _writeListener;
71 private volatile Throwable _onError;
72
73
74
75
76
77
78
79
80
81
82
83
84 enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED }
85 private final AtomicReference<OutputState> _state=new AtomicReference<>(OutputState.OPEN);
86
87 public HttpOutput(HttpChannel<?> channel)
88 {
89 _channel = channel;
90 HttpConfiguration config = channel.getHttpConfiguration();
91 _bufferSize = config.getOutputBufferSize();
92 _commitSize = config.getOutputAggregationSize();
93 }
94
95 public HttpChannel<?> getHttpChannel()
96 {
97 return _channel;
98 }
99
100 public boolean isWritten()
101 {
102 return _written > 0;
103 }
104
105 public long getWritten()
106 {
107 return _written;
108 }
109
110 public void reset()
111 {
112 _written = 0;
113 reopen();
114 }
115
116 public void reopen()
117 {
118 _state.set(OutputState.OPEN);
119 }
120
121 public boolean isAllContentWritten()
122 {
123 return _channel.getResponse().isAllContentWritten(_written);
124 }
125
126 protected Blocker acquireWriteBlockingCallback() throws IOException
127 {
128 return _writeblock.acquire();
129 }
130
131 protected void write(ByteBuffer content, boolean complete) throws IOException
132 {
133 try (Blocker blocker=_writeblock.acquire())
134 {
135 write(content,complete,blocker);
136 blocker.block();
137 }
138 }
139
140 protected void write(ByteBuffer content, boolean complete, Callback callback)
141 {
142 _channel.write(content,complete,callback);
143 }
144
145 @Override
146 public void close()
147 {
148 loop: while(true)
149 {
150 OutputState state=_state.get();
151 switch (state)
152 {
153 case CLOSED:
154 break loop;
155
156 case UNREADY:
157 if (_state.compareAndSet(state,OutputState.ERROR))
158 _writeListener.onError(_onError==null?new EofException("Async close"):_onError);
159 continue;
160
161 default:
162 if (_state.compareAndSet(state,OutputState.CLOSED))
163 {
164 try
165 {
166 write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER,!_channel.getResponse().isIncluding());
167 }
168 catch(IOException e)
169 {
170 LOG.debug(e);
171 _channel.abort();
172 }
173 releaseBuffer();
174 return;
175 }
176 }
177 }
178 }
179
180
181 void closed()
182 {
183 loop: while(true)
184 {
185 OutputState state=_state.get();
186 switch (state)
187 {
188 case CLOSED:
189 break loop;
190
191 case UNREADY:
192 if (_state.compareAndSet(state,OutputState.ERROR))
193 _writeListener.onError(_onError==null?new EofException("Async closed"):_onError);
194 continue;
195
196 default:
197 if (_state.compareAndSet(state,OutputState.CLOSED))
198 {
199 try
200 {
201 _channel.getResponse().closeOutput();
202 }
203 catch(IOException e)
204 {
205 LOG.debug(e);
206 _channel.abort();
207 }
208 releaseBuffer();
209 return;
210 }
211 }
212 }
213 }
214
215 private void releaseBuffer()
216 {
217 if (_aggregate != null)
218 {
219 _channel.getConnector().getByteBufferPool().release(_aggregate);
220 _aggregate = null;
221 }
222 }
223
224 public boolean isClosed()
225 {
226 return _state.get()==OutputState.CLOSED;
227 }
228
229 @Override
230 public void flush() throws IOException
231 {
232 while(true)
233 {
234 switch(_state.get())
235 {
236 case OPEN:
237 write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, false);
238 return;
239
240 case ASYNC:
241 throw new IllegalStateException("isReady() not called");
242
243 case READY:
244 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
245 continue;
246 new AsyncFlush().iterate();
247 return;
248
249 case PENDING:
250 case UNREADY:
251 throw new WritePendingException();
252
253 case ERROR:
254 throw new EofException(_onError);
255
256 case CLOSED:
257 return;
258 }
259 break;
260 }
261 }
262
263
264 @Override
265 public void write(byte[] b, int off, int len) throws IOException
266 {
267 _written+=len;
268 boolean complete=_channel.getResponse().isAllContentWritten(_written);
269
270
271 while(true)
272 {
273 switch(_state.get())
274 {
275 case OPEN:
276
277 break;
278
279 case ASYNC:
280 throw new IllegalStateException("isReady() not called");
281
282 case READY:
283 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
284 continue;
285
286
287 if (!complete && len<=_commitSize)
288 {
289 if (_aggregate == null)
290 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
291
292
293 int filled = BufferUtil.fill(_aggregate, b, off, len);
294
295
296 if (filled==len && !BufferUtil.isFull(_aggregate))
297 {
298 if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
299 throw new IllegalStateException();
300 return;
301 }
302
303
304 off+=filled;
305 len-=filled;
306 }
307
308
309 new AsyncWrite(b,off,len,complete).iterate();
310 return;
311
312 case PENDING:
313 case UNREADY:
314 throw new WritePendingException();
315
316 case ERROR:
317 throw new EofException(_onError);
318
319 case CLOSED:
320 throw new EofException("Closed");
321 }
322 break;
323 }
324
325
326
327
328
329 int capacity = getBufferSize();
330 if (!complete && len<=_commitSize)
331 {
332 if (_aggregate == null)
333 _aggregate = _channel.getByteBufferPool().acquire(capacity, false);
334
335
336 int filled = BufferUtil.fill(_aggregate, b, off, len);
337
338
339 if (filled==len && !BufferUtil.isFull(_aggregate))
340 return;
341
342
343 off+=filled;
344 len-=filled;
345 }
346
347
348 if (BufferUtil.hasContent(_aggregate))
349 {
350 write(_aggregate, complete && len==0);
351
352
353 if (len>0 && !complete && len<=_commitSize)
354 {
355 BufferUtil.append(_aggregate, b, off, len);
356 return;
357 }
358 }
359
360
361 if (len>0)
362 {
363 ByteBuffer wrap = ByteBuffer.wrap(b, off, len);
364 ByteBuffer view = wrap.duplicate();
365
366
367
368 while (len>getBufferSize())
369 {
370 int p=view.position();
371 int l=p+getBufferSize();
372 view.limit(p+getBufferSize());
373 write(view,false);
374 len-=getBufferSize();
375 view.limit(l+Math.min(len,getBufferSize()));
376 view.position(l);
377 }
378 write(view,complete);
379 }
380 else if (complete)
381 write(BufferUtil.EMPTY_BUFFER,complete);
382
383 if (complete)
384 closed();
385
386 }
387
388 public void write(ByteBuffer buffer) throws IOException
389 {
390 _written+=buffer.remaining();
391 boolean complete=_channel.getResponse().isAllContentWritten(_written);
392
393
394 while(true)
395 {
396 switch(_state.get())
397 {
398 case OPEN:
399
400 break;
401
402 case ASYNC:
403 throw new IllegalStateException("isReady() not called");
404
405 case READY:
406 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
407 continue;
408
409
410 new AsyncWrite(buffer,complete).iterate();
411 return;
412
413 case PENDING:
414 case UNREADY:
415 throw new WritePendingException();
416
417 case ERROR:
418 throw new EofException(_onError);
419
420 case CLOSED:
421 throw new EofException("Closed");
422 }
423 break;
424 }
425
426
427
428 int len=BufferUtil.length(buffer);
429
430
431 if (BufferUtil.hasContent(_aggregate))
432 write(_aggregate, complete && len==0);
433
434
435 if (len>0)
436 write(buffer, complete);
437 else if (complete)
438 write(BufferUtil.EMPTY_BUFFER,complete);
439
440 if (complete)
441 closed();
442 }
443
444 @Override
445 public void write(int b) throws IOException
446 {
447 _written+=1;
448 boolean complete=_channel.getResponse().isAllContentWritten(_written);
449
450
451 while(true)
452 {
453 switch(_state.get())
454 {
455 case OPEN:
456 if (_aggregate == null)
457 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
458 BufferUtil.append(_aggregate, (byte)b);
459
460
461 if (complete || BufferUtil.isFull(_aggregate))
462 {
463 try(Blocker blocker=_writeblock.acquire())
464 {
465 write(_aggregate, complete, blocker);
466 blocker.block();
467 }
468 if (complete)
469 closed();
470 }
471 break;
472
473 case ASYNC:
474 throw new IllegalStateException("isReady() not called");
475
476 case READY:
477 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
478 continue;
479
480 if (_aggregate == null)
481 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
482 BufferUtil.append(_aggregate, (byte)b);
483
484
485 if (!complete && !BufferUtil.isFull(_aggregate))
486 {
487 if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
488 throw new IllegalStateException();
489 return;
490 }
491
492
493 new AsyncFlush().iterate();
494 return;
495
496 case PENDING:
497 case UNREADY:
498 throw new WritePendingException();
499
500 case ERROR:
501 throw new EofException(_onError);
502
503 case CLOSED:
504 throw new EofException("Closed");
505 }
506 break;
507 }
508 }
509
510 @Override
511 public void print(String s) throws IOException
512 {
513 if (isClosed())
514 throw new IOException("Closed");
515
516 write(s.getBytes(_channel.getResponse().getCharacterEncoding()));
517 }
518
519
520
521
522
523
524 public void sendContent(ByteBuffer content) throws IOException
525 {
526 try(Blocker blocker=_writeblock.acquire())
527 {
528 write(content,true,blocker);
529 blocker.block();
530 }
531 }
532
533
534
535
536
537
538 public void sendContent(InputStream in) throws IOException
539 {
540 try(Blocker blocker=_writeblock.acquire())
541 {
542 new InputStreamWritingCB(in,blocker).iterate();
543 blocker.block();
544 }
545 }
546
547
548
549
550
551
552 public void sendContent(ReadableByteChannel in) throws IOException
553 {
554 try(Blocker blocker=_writeblock.acquire())
555 {
556 new ReadableByteChannelWritingCB(in,blocker).iterate();
557 blocker.block();
558 }
559 }
560
561
562
563
564
565
566
567 public void sendContent(HttpContent content) throws IOException
568 {
569 try(Blocker blocker=_writeblock.acquire())
570 {
571 sendContent(content,blocker);
572 blocker.block();
573 }
574 }
575
576
577
578
579
580
581 public void sendContent(ByteBuffer content, final Callback callback)
582 {
583 write(content,true,new Callback()
584 {
585 @Override
586 public void succeeded()
587 {
588 closed();
589 callback.succeeded();
590 }
591
592 @Override
593 public void failed(Throwable x)
594 {
595 callback.failed(x);
596 }
597 });
598 }
599
600
601
602
603
604
605
606 public void sendContent(InputStream in, Callback callback)
607 {
608 new InputStreamWritingCB(in,callback).iterate();
609 }
610
611
612
613
614
615
616
617 public void sendContent(ReadableByteChannel in, Callback callback)
618 {
619 new ReadableByteChannelWritingCB(in,callback).iterate();
620 }
621
622
623
624
625
626
627 public void sendContent(HttpContent httpContent, Callback callback)
628 {
629 if (BufferUtil.hasContent(_aggregate))
630 {
631 callback.failed(new IOException("cannot sendContent() after write()"));
632 return;
633 }
634 if (_channel.isCommitted())
635 {
636 callback.failed(new IOException("committed"));
637 return;
638 }
639
640 while (true)
641 {
642 switch(_state.get())
643 {
644 case OPEN:
645 if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING))
646 continue;
647 break;
648 case ERROR:
649 callback.failed(new EofException(_onError));
650 return;
651
652 case CLOSED:
653 callback.failed(new EofException("Closed"));
654 return;
655 default:
656 throw new IllegalStateException();
657 }
658 break;
659 }
660 ByteBuffer buffer= _channel.useDirectBuffers()?httpContent.getDirectBuffer():null;
661 if (buffer == null)
662 buffer = httpContent.getIndirectBuffer();
663
664 if (buffer!=null)
665 {
666 if (LOG.isDebugEnabled())
667 LOG.debug("sendContent({}=={},{},direct={})",httpContent,BufferUtil.toDetailString(buffer),callback,_channel.useDirectBuffers());
668
669 sendContent(buffer,callback);
670 return;
671 }
672
673 try
674 {
675 ReadableByteChannel rbc=httpContent.getReadableByteChannel();
676 if (rbc!=null)
677 {
678 if (LOG.isDebugEnabled())
679 LOG.debug("sendContent({}=={},{},direct={})",httpContent,rbc,callback,_channel.useDirectBuffers());
680
681 sendContent(rbc,callback);
682 return;
683 }
684
685 InputStream in = httpContent.getInputStream();
686 if ( in!=null )
687 {
688 if (LOG.isDebugEnabled())
689 LOG.debug("sendContent({}=={},{},direct={})",httpContent,in,callback,_channel.useDirectBuffers());
690 sendContent(in,callback);
691 return;
692 }
693 }
694 catch(Throwable th)
695 {
696 callback.failed(th);
697 return;
698 }
699
700 callback.failed(new IllegalArgumentException("unknown content for "+httpContent));
701 }
702
703 public int getBufferSize()
704 {
705 return _bufferSize;
706 }
707
708 public void setBufferSize(int size)
709 {
710 _bufferSize = size;
711 _commitSize = size;
712 }
713
714 public void resetBuffer()
715 {
716 if (BufferUtil.hasContent(_aggregate))
717 BufferUtil.clear(_aggregate);
718 }
719
720 @Override
721 public void setWriteListener(WriteListener writeListener)
722 {
723 if (!_channel.getState().isAsync())
724 throw new IllegalStateException("!ASYNC");
725
726 if (_state.compareAndSet(OutputState.OPEN, OutputState.READY))
727 {
728 _writeListener = writeListener;
729 _channel.getState().onWritePossible();
730 }
731 else
732 throw new IllegalStateException();
733 }
734
735
736
737
738 @Override
739 public boolean isReady()
740 {
741 while (true)
742 {
743 switch(_state.get())
744 {
745 case OPEN:
746 return true;
747 case ASYNC:
748 if (!_state.compareAndSet(OutputState.ASYNC, OutputState.READY))
749 continue;
750 return true;
751 case READY:
752 return true;
753 case PENDING:
754 if (!_state.compareAndSet(OutputState.PENDING, OutputState.UNREADY))
755 continue;
756 return false;
757 case UNREADY:
758 return false;
759
760 case ERROR:
761 return true;
762
763 case CLOSED:
764 return true;
765 }
766 }
767 }
768
769 @Override
770 public void run()
771 {
772 loop: while (true)
773 {
774 OutputState state = _state.get();
775
776 if(_onError!=null)
777 {
778 switch(state)
779 {
780 case CLOSED:
781 case ERROR:
782 _onError=null;
783 break loop;
784
785 default:
786 if (_state.compareAndSet(state, OutputState.ERROR))
787 {
788 Throwable th=_onError;
789 _onError=null;
790 if (LOG.isDebugEnabled())
791 LOG.debug("onError",th);
792 _writeListener.onError(th);
793 close();
794
795 break loop;
796 }
797
798 }
799 continue;
800 }
801
802 switch(_state.get())
803 {
804 case CLOSED:
805
806
807
808
809 case READY:
810 try
811 {
812 _writeListener.onWritePossible();
813 break loop;
814 }
815 catch (Throwable e)
816 {
817 _onError=e;
818 }
819 break;
820
821 default:
822 _onError=new IllegalStateException("state="+_state.get());
823 }
824 }
825 }
826
827 @Override
828 public String toString()
829 {
830 return String.format("%s@%x{%s}",this.getClass().getSimpleName(),hashCode(),_state.get());
831 }
832
833 private abstract class AsyncICB extends IteratingCallback
834 {
835 @Override
836 protected void onCompleteSuccess()
837 {
838 while(true)
839 {
840 OutputState last=_state.get();
841 switch(last)
842 {
843 case PENDING:
844 if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
845 continue;
846 break;
847
848 case UNREADY:
849 if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY))
850 continue;
851 _channel.getState().onWritePossible();
852 break;
853
854 case CLOSED:
855 break;
856
857 default:
858 throw new IllegalStateException();
859 }
860 break;
861 }
862 }
863
864 @Override
865 public void onCompleteFailure(Throwable e)
866 {
867 _onError=e==null?new IOException():e;
868 _channel.getState().onWritePossible();
869 }
870 }
871
872
873 private class AsyncFlush extends AsyncICB
874 {
875 protected volatile boolean _flushed;
876
877 public AsyncFlush()
878 {
879 }
880
881 @Override
882 protected Action process()
883 {
884 if (BufferUtil.hasContent(_aggregate))
885 {
886 _flushed=true;
887 write(_aggregate, false, this);
888 return Action.SCHEDULED;
889 }
890
891 if (!_flushed)
892 {
893 _flushed=true;
894 write(BufferUtil.EMPTY_BUFFER,false,this);
895 return Action.SCHEDULED;
896 }
897
898 return Action.SUCCEEDED;
899 }
900 }
901
902
903
904 private class AsyncWrite extends AsyncICB
905 {
906 private final ByteBuffer _buffer;
907 private final ByteBuffer _slice;
908 private final boolean _complete;
909 private final int _len;
910 protected volatile boolean _completed;
911
912 public AsyncWrite(byte[] b, int off, int len, boolean complete)
913 {
914 _buffer=ByteBuffer.wrap(b, off, len);
915 _len=len;
916
917 _slice=_len<getBufferSize()?null:_buffer.duplicate();
918 _complete=complete;
919 }
920
921 public AsyncWrite(ByteBuffer buffer, boolean complete)
922 {
923 _buffer=buffer;
924 _len=buffer.remaining();
925
926 _slice=_buffer.isDirect()||_len<getBufferSize()?null:_buffer.duplicate();
927 _complete=complete;
928 }
929
930 @Override
931 protected Action process()
932 {
933
934 if (BufferUtil.hasContent(_aggregate))
935 {
936 _completed=_len==0;
937 write(_aggregate, _complete && _completed, this);
938 return Action.SCHEDULED;
939 }
940
941
942 if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
943 {
944 int position = BufferUtil.flipToFill(_aggregate);
945 BufferUtil.put(_buffer,_aggregate);
946 BufferUtil.flipToFlush(_aggregate, position);
947 return Action.SUCCEEDED;
948 }
949
950
951 if (_buffer.hasRemaining())
952 {
953
954 if (_slice==null)
955 {
956 _completed=true;
957 write(_buffer, _complete, this);
958 return Action.SCHEDULED;
959 }
960
961
962 int p=_buffer.position();
963 int l=Math.min(getBufferSize(),_buffer.remaining());
964 int pl=p+l;
965 _slice.limit(pl);
966 _buffer.position(pl);
967 _slice.position(p);
968 _completed=!_buffer.hasRemaining();
969 write(_slice, _complete && _completed, this);
970 return Action.SCHEDULED;
971 }
972
973
974
975 if (_complete && !_completed)
976 {
977 _completed=true;
978 write(BufferUtil.EMPTY_BUFFER, _complete, this);
979 return Action.SCHEDULED;
980 }
981
982 return Action.SUCCEEDED;
983 }
984
985 @Override
986 protected void onCompleteSuccess()
987 {
988 super.onCompleteSuccess();
989 if (_complete)
990 closed();
991 }
992
993
994 }
995
996
997
998
999
1000
1001
1002
1003
1004
1005 private class InputStreamWritingCB extends IteratingNestedCallback
1006 {
1007 private final InputStream _in;
1008 private final ByteBuffer _buffer;
1009 private boolean _eof;
1010
1011 public InputStreamWritingCB(InputStream in, Callback callback)
1012 {
1013 super(callback);
1014 _in=in;
1015 _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
1016 }
1017
1018 @Override
1019 protected Action process() throws Exception
1020 {
1021
1022
1023 if (_eof)
1024 {
1025
1026 _in.close();
1027 closed();
1028 _channel.getByteBufferPool().release(_buffer);
1029 return Action.SUCCEEDED;
1030 }
1031
1032
1033 int len=0;
1034 while (len<_buffer.capacity() && !_eof)
1035 {
1036 int r=_in.read(_buffer.array(),_buffer.arrayOffset()+len,_buffer.capacity()-len);
1037 if (r<0)
1038 _eof=true;
1039 else
1040 len+=r;
1041 }
1042
1043
1044 _buffer.position(0);
1045 _buffer.limit(len);
1046 write(_buffer,_eof,this);
1047 return Action.SCHEDULED;
1048 }
1049
1050 @Override
1051 public void onCompleteFailure(Throwable x)
1052 {
1053 super.onCompleteFailure(x);
1054 _channel.getByteBufferPool().release(_buffer);
1055 try
1056 {
1057 _in.close();
1058 }
1059 catch (IOException e)
1060 {
1061 LOG.ignore(e);
1062 }
1063 }
1064
1065 }
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076 private class ReadableByteChannelWritingCB extends IteratingNestedCallback
1077 {
1078 private final ReadableByteChannel _in;
1079 private final ByteBuffer _buffer;
1080 private boolean _eof;
1081
1082 public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
1083 {
1084 super(callback);
1085 _in=in;
1086 _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
1087 }
1088
1089 @Override
1090 protected Action process() throws Exception
1091 {
1092
1093
1094 if (_eof)
1095 {
1096 _in.close();
1097 closed();
1098 _channel.getByteBufferPool().release(_buffer);
1099 return Action.SUCCEEDED;
1100 }
1101
1102
1103 _buffer.clear();
1104 while (_buffer.hasRemaining() && !_eof)
1105 _eof = (_in.read(_buffer)) < 0;
1106
1107
1108 _buffer.flip();
1109 write(_buffer,_eof,this);
1110
1111 return Action.SCHEDULED;
1112 }
1113
1114 @Override
1115 public void onCompleteFailure(Throwable x)
1116 {
1117 super.onCompleteFailure(x);
1118 _channel.getByteBufferPool().release(_buffer);
1119 try
1120 {
1121 _in.close();
1122 }
1123 catch (IOException e)
1124 {
1125 LOG.ignore(e);
1126 }
1127 }
1128 }
1129 }