1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.ReadableByteChannel;
25 import java.nio.channels.WritePendingException;
26 import java.util.concurrent.atomic.AtomicReference;
27
28 import javax.servlet.ServletOutputStream;
29 import javax.servlet.WriteListener;
30
31 import org.eclipse.jetty.http.HttpContent;
32 import org.eclipse.jetty.io.EofException;
33 import org.eclipse.jetty.util.BufferUtil;
34 import org.eclipse.jetty.util.Callback;
35 import org.eclipse.jetty.util.IteratingCallback;
36 import org.eclipse.jetty.util.IteratingNestedCallback;
37 import org.eclipse.jetty.util.SharedBlockingCallback;
38 import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
39 import org.eclipse.jetty.util.log.Log;
40 import org.eclipse.jetty.util.log.Logger;
41
42
43
44
45
46
47
48
49
50
51
52 public class HttpOutput extends ServletOutputStream implements Runnable
53 {
54 private static Logger LOG = Log.getLogger(HttpOutput.class);
55 private final HttpChannel<?> _channel;
56 private final SharedBlockingCallback _writeblock=new SharedBlockingCallback()
57 {
58 @Override
59 protected long getIdleTimeout()
60 {
61 return _channel.getIdleTimeout();
62 }
63 };
64 private long _written;
65 private ByteBuffer _aggregate;
66 private int _bufferSize;
67 private int _commitSize;
68 private WriteListener _writeListener;
69 private volatile Throwable _onError;
70
71
72
73
74
75
76
77
78
79
80
81
82 enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED }
83 private final AtomicReference<OutputState> _state=new AtomicReference<>(OutputState.OPEN);
84
85 public HttpOutput(HttpChannel<?> channel)
86 {
87 _channel = channel;
88 _bufferSize = _channel.getHttpConfiguration().getOutputBufferSize();
89 _commitSize=_bufferSize/4;
90 }
91
92 public HttpChannel<?> getHttpChannel()
93 {
94 return _channel;
95 }
96
97 public boolean isWritten()
98 {
99 return _written > 0;
100 }
101
102 public long getWritten()
103 {
104 return _written;
105 }
106
107 public void reset()
108 {
109 _written = 0;
110 reopen();
111 }
112
113 public void reopen()
114 {
115 _state.set(OutputState.OPEN);
116 }
117
118 public boolean isAllContentWritten()
119 {
120 return _channel.getResponse().isAllContentWritten(_written);
121 }
122
123 protected Blocker acquireWriteBlockingCallback() throws IOException
124 {
125 return _writeblock.acquire();
126 }
127
128 protected void write(ByteBuffer content, boolean complete) throws IOException
129 {
130 try (Blocker blocker=_writeblock.acquire())
131 {
132 write(content,complete,blocker);
133 blocker.block();
134 }
135 }
136
137 protected void write(ByteBuffer content, boolean complete, Callback callback)
138 {
139 _channel.write(content,complete,callback);
140 }
141
142 @Override
143 public void close()
144 {
145 loop: while(true)
146 {
147 OutputState state=_state.get();
148 switch (state)
149 {
150 case CLOSED:
151 break loop;
152
153 case UNREADY:
154 if (_state.compareAndSet(state,OutputState.ERROR))
155 _writeListener.onError(_onError==null?new EofException("Async close"):_onError);
156 continue;
157
158 default:
159 if (_state.compareAndSet(state,OutputState.CLOSED))
160 {
161 try
162 {
163 write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER,!_channel.getResponse().isIncluding());
164 }
165 catch(IOException e)
166 {
167 LOG.debug(e);
168 _channel.abort();
169 }
170 releaseBuffer();
171 return;
172 }
173 }
174 }
175 }
176
177
178 void closed()
179 {
180 loop: while(true)
181 {
182 OutputState state=_state.get();
183 switch (state)
184 {
185 case CLOSED:
186 break loop;
187
188 case UNREADY:
189 if (_state.compareAndSet(state,OutputState.ERROR))
190 _writeListener.onError(_onError==null?new EofException("Async closed"):_onError);
191 continue;
192
193 default:
194 if (_state.compareAndSet(state,OutputState.CLOSED))
195 {
196 try
197 {
198 _channel.getResponse().closeOutput();
199 }
200 catch(IOException e)
201 {
202 LOG.debug(e);
203 _channel.abort();
204 }
205 releaseBuffer();
206 return;
207 }
208 }
209 }
210 }
211
212 private void releaseBuffer()
213 {
214 if (_aggregate != null)
215 {
216 _channel.getConnector().getByteBufferPool().release(_aggregate);
217 _aggregate = null;
218 }
219 }
220
221 public boolean isClosed()
222 {
223 return _state.get()==OutputState.CLOSED;
224 }
225
226 @Override
227 public void flush() throws IOException
228 {
229 while(true)
230 {
231 switch(_state.get())
232 {
233 case OPEN:
234 write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, false);
235 return;
236
237 case ASYNC:
238 throw new IllegalStateException("isReady() not called");
239
240 case READY:
241 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
242 continue;
243 new AsyncFlush().iterate();
244 return;
245
246 case PENDING:
247 case UNREADY:
248 throw new WritePendingException();
249
250 case ERROR:
251 throw new EofException(_onError);
252
253 case CLOSED:
254 return;
255 }
256 break;
257 }
258 }
259
260
261 @Override
262 public void write(byte[] b, int off, int len) throws IOException
263 {
264 _written+=len;
265 boolean complete=_channel.getResponse().isAllContentWritten(_written);
266
267
268 while(true)
269 {
270 switch(_state.get())
271 {
272 case OPEN:
273
274 break;
275
276 case ASYNC:
277 throw new IllegalStateException("isReady() not called");
278
279 case READY:
280 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
281 continue;
282
283
284 if (!complete && len<=_commitSize)
285 {
286 if (_aggregate == null)
287 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
288
289
290 int filled = BufferUtil.fill(_aggregate, b, off, len);
291
292
293 if (filled==len && !BufferUtil.isFull(_aggregate))
294 {
295 if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
296 throw new IllegalStateException();
297 return;
298 }
299
300
301 off+=filled;
302 len-=filled;
303 }
304
305
306 new AsyncWrite(b,off,len,complete).iterate();
307 return;
308
309 case PENDING:
310 case UNREADY:
311 throw new WritePendingException();
312
313 case ERROR:
314 throw new EofException(_onError);
315
316 case CLOSED:
317 throw new EofException("Closed");
318 }
319 break;
320 }
321
322
323
324
325
326 int capacity = getBufferSize();
327 if (!complete && len<=_commitSize)
328 {
329 if (_aggregate == null)
330 _aggregate = _channel.getByteBufferPool().acquire(capacity, false);
331
332
333 int filled = BufferUtil.fill(_aggregate, b, off, len);
334
335
336 if (filled==len && !BufferUtil.isFull(_aggregate))
337 return;
338
339
340 off+=filled;
341 len-=filled;
342 }
343
344
345 if (BufferUtil.hasContent(_aggregate))
346 {
347 write(_aggregate, complete && len==0);
348
349
350 if (len>0 && !complete && len<=_commitSize)
351 {
352 BufferUtil.append(_aggregate, b, off, len);
353 return;
354 }
355 }
356
357
358 if (len>0)
359 {
360 ByteBuffer wrap = ByteBuffer.wrap(b, off, len);
361 ByteBuffer view = wrap.duplicate();
362
363
364
365 while (len>getBufferSize())
366 {
367 int p=view.position();
368 int l=p+getBufferSize();
369 view.limit(p+getBufferSize());
370 write(view,false);
371 len-=getBufferSize();
372 view.limit(l+Math.min(len,getBufferSize()));
373 view.position(l);
374 }
375 write(view,complete);
376 }
377 else if (complete)
378 write(BufferUtil.EMPTY_BUFFER,complete);
379
380 if (complete)
381 closed();
382
383 }
384
385 public void write(ByteBuffer buffer) throws IOException
386 {
387 _written+=buffer.remaining();
388 boolean complete=_channel.getResponse().isAllContentWritten(_written);
389
390
391 while(true)
392 {
393 switch(_state.get())
394 {
395 case OPEN:
396
397 break;
398
399 case ASYNC:
400 throw new IllegalStateException("isReady() not called");
401
402 case READY:
403 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
404 continue;
405
406
407 new AsyncWrite(buffer,complete).iterate();
408 return;
409
410 case PENDING:
411 case UNREADY:
412 throw new WritePendingException();
413
414 case ERROR:
415 throw new EofException(_onError);
416
417 case CLOSED:
418 throw new EofException("Closed");
419 }
420 break;
421 }
422
423
424
425 int len=BufferUtil.length(buffer);
426
427
428 if (BufferUtil.hasContent(_aggregate))
429 write(_aggregate, complete && len==0);
430
431
432 if (len>0)
433 write(buffer, complete);
434 else if (complete)
435 write(BufferUtil.EMPTY_BUFFER,complete);
436
437 if (complete)
438 closed();
439 }
440
441 @Override
442 public void write(int b) throws IOException
443 {
444 _written+=1;
445 boolean complete=_channel.getResponse().isAllContentWritten(_written);
446
447
448 while(true)
449 {
450 switch(_state.get())
451 {
452 case OPEN:
453 if (_aggregate == null)
454 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
455 BufferUtil.append(_aggregate, (byte)b);
456
457
458 if (complete || BufferUtil.isFull(_aggregate))
459 {
460 try(Blocker blocker=_writeblock.acquire())
461 {
462 write(_aggregate, complete, blocker);
463 blocker.block();
464 }
465 if (complete)
466 closed();
467 }
468 break;
469
470 case ASYNC:
471 throw new IllegalStateException("isReady() not called");
472
473 case READY:
474 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
475 continue;
476
477 if (_aggregate == null)
478 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
479 BufferUtil.append(_aggregate, (byte)b);
480
481
482 if (!complete && !BufferUtil.isFull(_aggregate))
483 {
484 if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
485 throw new IllegalStateException();
486 return;
487 }
488
489
490 new AsyncFlush().iterate();
491 return;
492
493 case PENDING:
494 case UNREADY:
495 throw new WritePendingException();
496
497 case ERROR:
498 throw new EofException(_onError);
499
500 case CLOSED:
501 throw new EofException("Closed");
502 }
503 break;
504 }
505 }
506
507 @Override
508 public void print(String s) throws IOException
509 {
510 if (isClosed())
511 throw new IOException("Closed");
512
513 write(s.getBytes(_channel.getResponse().getCharacterEncoding()));
514 }
515
516
517
518
519
520
521 public void sendContent(ByteBuffer content) throws IOException
522 {
523 try(Blocker blocker=_writeblock.acquire())
524 {
525 write(content,true,blocker);
526 blocker.block();
527 }
528 }
529
530
531
532
533
534
535 public void sendContent(InputStream in) throws IOException
536 {
537 try(Blocker blocker=_writeblock.acquire())
538 {
539 new InputStreamWritingCB(in,blocker).iterate();
540 blocker.block();
541 }
542 }
543
544
545
546
547
548
549 public void sendContent(ReadableByteChannel in) throws IOException
550 {
551 try(Blocker blocker=_writeblock.acquire())
552 {
553 new ReadableByteChannelWritingCB(in,blocker).iterate();
554 blocker.block();
555 }
556 }
557
558
559
560
561
562
563
564 public void sendContent(HttpContent content) throws IOException
565 {
566 try(Blocker blocker=_writeblock.acquire())
567 {
568 sendContent(content,blocker);
569 blocker.block();
570 }
571 }
572
573
574
575
576
577
578 public void sendContent(ByteBuffer content, final Callback callback)
579 {
580 write(content,true,new Callback()
581 {
582 @Override
583 public void succeeded()
584 {
585 closed();
586 callback.succeeded();
587 }
588
589 @Override
590 public void failed(Throwable x)
591 {
592 callback.failed(x);
593 }
594 });
595 }
596
597
598
599
600
601
602
603 public void sendContent(InputStream in, Callback callback)
604 {
605 new InputStreamWritingCB(in,callback).iterate();
606 }
607
608
609
610
611
612
613
614 public void sendContent(ReadableByteChannel in, Callback callback)
615 {
616 new ReadableByteChannelWritingCB(in,callback).iterate();
617 }
618
619
620
621
622
623
624 public void sendContent(HttpContent httpContent, Callback callback)
625 {
626 if (BufferUtil.hasContent(_aggregate))
627 {
628 callback.failed(new IOException("cannot sendContent() after write()"));
629 return;
630 }
631 if (_channel.isCommitted())
632 {
633 callback.failed(new IOException("committed"));
634 return;
635 }
636
637 while (true)
638 {
639 switch(_state.get())
640 {
641 case OPEN:
642 if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING))
643 continue;
644 break;
645 case ERROR:
646 callback.failed(new EofException(_onError));
647 return;
648
649 case CLOSED:
650 callback.failed(new EofException("Closed"));
651 return;
652 default:
653 throw new IllegalStateException();
654 }
655 break;
656 }
657 ByteBuffer buffer= _channel.useDirectBuffers()?httpContent.getDirectBuffer():null;
658 if (buffer == null)
659 buffer = httpContent.getIndirectBuffer();
660
661 if (buffer!=null)
662 {
663 if (LOG.isDebugEnabled())
664 LOG.debug("sendContent({}=={},{},direct={})",httpContent,BufferUtil.toDetailString(buffer),callback,_channel.useDirectBuffers());
665
666 sendContent(buffer,callback);
667 return;
668 }
669
670 try
671 {
672 ReadableByteChannel rbc=httpContent.getReadableByteChannel();
673 if (rbc!=null)
674 {
675 if (LOG.isDebugEnabled())
676 LOG.debug("sendContent({}=={},{},direct={})",httpContent,rbc,callback,_channel.useDirectBuffers());
677
678 sendContent(rbc,callback);
679 return;
680 }
681
682 InputStream in = httpContent.getInputStream();
683 if ( in!=null )
684 {
685 if (LOG.isDebugEnabled())
686 LOG.debug("sendContent({}=={},{},direct={})",httpContent,in,callback,_channel.useDirectBuffers());
687 sendContent(in,callback);
688 return;
689 }
690 }
691 catch(Throwable th)
692 {
693 callback.failed(th);
694 return;
695 }
696
697 callback.failed(new IllegalArgumentException("unknown content for "+httpContent));
698 }
699
700 public int getBufferSize()
701 {
702 return _bufferSize;
703 }
704
705 public void setBufferSize(int size)
706 {
707 _bufferSize = size;
708 _commitSize = size;
709 }
710
711 public void resetBuffer()
712 {
713 if (BufferUtil.hasContent(_aggregate))
714 BufferUtil.clear(_aggregate);
715 }
716
717 @Override
718 public void setWriteListener(WriteListener writeListener)
719 {
720 if (!_channel.getState().isAsync())
721 throw new IllegalStateException("!ASYNC");
722
723 if (_state.compareAndSet(OutputState.OPEN, OutputState.READY))
724 {
725 _writeListener = writeListener;
726 _channel.getState().onWritePossible();
727 }
728 else
729 throw new IllegalStateException();
730 }
731
732
733
734
735 @Override
736 public boolean isReady()
737 {
738 while (true)
739 {
740 switch(_state.get())
741 {
742 case OPEN:
743 return true;
744 case ASYNC:
745 if (!_state.compareAndSet(OutputState.ASYNC, OutputState.READY))
746 continue;
747 return true;
748 case READY:
749 return true;
750 case PENDING:
751 if (!_state.compareAndSet(OutputState.PENDING, OutputState.UNREADY))
752 continue;
753 return false;
754 case UNREADY:
755 return false;
756
757 case ERROR:
758 return true;
759
760 case CLOSED:
761 return true;
762 }
763 }
764 }
765
766 @Override
767 public void run()
768 {
769 loop: while (true)
770 {
771 OutputState state = _state.get();
772
773 if(_onError!=null)
774 {
775 switch(state)
776 {
777 case CLOSED:
778 case ERROR:
779 _onError=null;
780 break loop;
781
782 default:
783 if (_state.compareAndSet(state, OutputState.ERROR))
784 {
785 Throwable th=_onError;
786 _onError=null;
787 if (LOG.isDebugEnabled())
788 LOG.debug("onError",th);
789 _writeListener.onError(th);
790 close();
791
792 break loop;
793 }
794
795 }
796 continue;
797 }
798
799 switch(_state.get())
800 {
801 case CLOSED:
802
803
804
805
806 case READY:
807 try
808 {
809 _writeListener.onWritePossible();
810 break loop;
811 }
812 catch (Throwable e)
813 {
814 _onError=e;
815 }
816 break;
817
818 default:
819 _onError=new IllegalStateException("state="+_state.get());
820 }
821 }
822 }
823
824 @Override
825 public String toString()
826 {
827 return String.format("%s@%x{%s}",this.getClass().getSimpleName(),hashCode(),_state.get());
828 }
829
830 private abstract class AsyncICB extends IteratingCallback
831 {
832 @Override
833 protected void onCompleteSuccess()
834 {
835 while(true)
836 {
837 OutputState last=_state.get();
838 switch(last)
839 {
840 case PENDING:
841 if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
842 continue;
843 break;
844
845 case UNREADY:
846 if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY))
847 continue;
848 _channel.getState().onWritePossible();
849 break;
850
851 case CLOSED:
852 break;
853
854 default:
855 throw new IllegalStateException();
856 }
857 break;
858 }
859 }
860
861 @Override
862 public void onCompleteFailure(Throwable e)
863 {
864 _onError=e==null?new IOException():e;
865 _channel.getState().onWritePossible();
866 }
867 }
868
869
870 private class AsyncFlush extends AsyncICB
871 {
872 protected volatile boolean _flushed;
873
874 public AsyncFlush()
875 {
876 }
877
878 @Override
879 protected Action process()
880 {
881 if (BufferUtil.hasContent(_aggregate))
882 {
883 _flushed=true;
884 write(_aggregate, false, this);
885 return Action.SCHEDULED;
886 }
887
888 if (!_flushed)
889 {
890 _flushed=true;
891 write(BufferUtil.EMPTY_BUFFER,false,this);
892 return Action.SCHEDULED;
893 }
894
895 return Action.SUCCEEDED;
896 }
897 }
898
899
900
901 private class AsyncWrite extends AsyncICB
902 {
903 private final ByteBuffer _buffer;
904 private final ByteBuffer _slice;
905 private final boolean _complete;
906 private final int _len;
907 protected volatile boolean _completed;
908
909 public AsyncWrite(byte[] b, int off, int len, boolean complete)
910 {
911 _buffer=ByteBuffer.wrap(b, off, len);
912 _len=len;
913
914 _slice=_len<getBufferSize()?null:_buffer.duplicate();
915 _complete=complete;
916 }
917
918 public AsyncWrite(ByteBuffer buffer, boolean complete)
919 {
920 _buffer=buffer;
921 _len=buffer.remaining();
922
923 _slice=_buffer.isDirect()||_len<getBufferSize()?null:_buffer.duplicate();
924 _complete=complete;
925 }
926
927 @Override
928 protected Action process()
929 {
930
931 if (BufferUtil.hasContent(_aggregate))
932 {
933 _completed=_len==0;
934 write(_aggregate, _complete && _completed, this);
935 return Action.SCHEDULED;
936 }
937
938
939 if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
940 {
941 int position = BufferUtil.flipToFill(_aggregate);
942 BufferUtil.put(_buffer,_aggregate);
943 BufferUtil.flipToFlush(_aggregate, position);
944 return Action.SUCCEEDED;
945 }
946
947
948 if (_buffer.hasRemaining())
949 {
950
951 if (_slice==null)
952 {
953 _completed=true;
954 write(_buffer, _complete, this);
955 return Action.SCHEDULED;
956 }
957
958
959 int p=_buffer.position();
960 int l=Math.min(getBufferSize(),_buffer.remaining());
961 int pl=p+l;
962 _slice.limit(pl);
963 _buffer.position(pl);
964 _slice.position(p);
965 _completed=!_buffer.hasRemaining();
966 write(_slice, _complete && _completed, this);
967 return Action.SCHEDULED;
968 }
969
970
971
972 if (_complete && !_completed)
973 {
974 _completed=true;
975 write(BufferUtil.EMPTY_BUFFER, _complete, this);
976 return Action.SCHEDULED;
977 }
978
979 return Action.SUCCEEDED;
980 }
981
982 @Override
983 protected void onCompleteSuccess()
984 {
985 super.onCompleteSuccess();
986 if (_complete)
987 closed();
988 }
989
990
991 }
992
993
994
995
996
997
998
999
1000
1001
1002 private class InputStreamWritingCB extends IteratingNestedCallback
1003 {
1004 private final InputStream _in;
1005 private final ByteBuffer _buffer;
1006 private boolean _eof;
1007
1008 public InputStreamWritingCB(InputStream in, Callback callback)
1009 {
1010 super(callback);
1011 _in=in;
1012 _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
1013 }
1014
1015 @Override
1016 protected Action process() throws Exception
1017 {
1018
1019
1020 if (_eof)
1021 {
1022
1023 _in.close();
1024 closed();
1025 _channel.getByteBufferPool().release(_buffer);
1026 return Action.SUCCEEDED;
1027 }
1028
1029
1030 int len=0;
1031 while (len<_buffer.capacity() && !_eof)
1032 {
1033 int r=_in.read(_buffer.array(),_buffer.arrayOffset()+len,_buffer.capacity()-len);
1034 if (r<0)
1035 _eof=true;
1036 else
1037 len+=r;
1038 }
1039
1040
1041 _buffer.position(0);
1042 _buffer.limit(len);
1043 write(_buffer,_eof,this);
1044 return Action.SCHEDULED;
1045 }
1046
1047 @Override
1048 public void onCompleteFailure(Throwable x)
1049 {
1050 super.onCompleteFailure(x);
1051 _channel.getByteBufferPool().release(_buffer);
1052 try
1053 {
1054 _in.close();
1055 }
1056 catch (IOException e)
1057 {
1058 LOG.ignore(e);
1059 }
1060 }
1061
1062 }
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073 private class ReadableByteChannelWritingCB extends IteratingNestedCallback
1074 {
1075 private final ReadableByteChannel _in;
1076 private final ByteBuffer _buffer;
1077 private boolean _eof;
1078
1079 public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
1080 {
1081 super(callback);
1082 _in=in;
1083 _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
1084 }
1085
1086 @Override
1087 protected Action process() throws Exception
1088 {
1089
1090
1091 if (_eof)
1092 {
1093 _in.close();
1094 closed();
1095 _channel.getByteBufferPool().release(_buffer);
1096 return Action.SUCCEEDED;
1097 }
1098
1099
1100 _buffer.clear();
1101 while (_buffer.hasRemaining() && !_eof)
1102 _eof = (_in.read(_buffer)) < 0;
1103
1104
1105 _buffer.flip();
1106 write(_buffer,_eof,this);
1107
1108 return Action.SCHEDULED;
1109 }
1110
1111 @Override
1112 public void onCompleteFailure(Throwable x)
1113 {
1114 super.onCompleteFailure(x);
1115 _channel.getByteBufferPool().release(_buffer);
1116 try
1117 {
1118 _in.close();
1119 }
1120 catch (IOException e)
1121 {
1122 LOG.ignore(e);
1123 }
1124 }
1125 }
1126 }