View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.nio.ByteBuffer;
24  import java.nio.channels.ReadableByteChannel;
25  import java.nio.channels.WritePendingException;
26  import java.util.concurrent.atomic.AtomicReference;
27  
28  import javax.servlet.RequestDispatcher;
29  import javax.servlet.ServletOutputStream;
30  import javax.servlet.ServletRequest;
31  import javax.servlet.ServletResponse;
32  import javax.servlet.WriteListener;
33  
34  import org.eclipse.jetty.http.HttpContent;
35  import org.eclipse.jetty.io.EofException;
36  import org.eclipse.jetty.util.BlockingCallback;
37  import org.eclipse.jetty.util.BufferUtil;
38  import org.eclipse.jetty.util.Callback;
39  import org.eclipse.jetty.util.IteratingCallback;
40  import org.eclipse.jetty.util.IteratingNestedCallback;
41  import org.eclipse.jetty.util.log.Log;
42  import org.eclipse.jetty.util.log.Logger;
43  
44  /**
45   * <p>{@link HttpOutput} implements {@link ServletOutputStream}
46   * as required by the Servlet specification.</p>
47   * <p>{@link HttpOutput} buffers content written by the application until a
48   * further write will overflow the buffer, at which point it triggers a commit
49   * of the response.</p>
50   * <p>{@link HttpOutput} can be closed and reopened, to allow requests included
51   * via {@link RequestDispatcher#include(ServletRequest, ServletResponse)} to
52   * close the stream, to be reopened after the inclusion ends.</p>
53   */
54  public class HttpOutput extends ServletOutputStream implements Runnable
55  {
56      private static Logger LOG = Log.getLogger(HttpOutput.class);
57      private final HttpChannel<?> _channel;
58      private long _written;
59      private ByteBuffer _aggregate;
60      private int _bufferSize;
61      private int _commitSize;
62      private WriteListener _writeListener;
63      private volatile Throwable _onError;
64  
65      /*
66      ACTION             OPEN       ASYNC      READY      PENDING       UNREADY
67      -------------------------------------------------------------------------------
68      setWriteListener() READY->owp ise        ise        ise           ise
69      write()            OPEN       ise        PENDING    wpe           wpe
70      flush()            OPEN       ise        PENDING    wpe           wpe
71      isReady()          OPEN:true  READY:true READY:true UNREADY:false UNREADY:false
72      write completed    -          -          -          ASYNC         READY->owp
73      */
74      enum State { OPEN, ASYNC, READY, PENDING, UNREADY, CLOSED }
75      private final AtomicReference<State> _state=new AtomicReference<>(State.OPEN);
76  
77      public HttpOutput(HttpChannel<?> channel)
78      {
79          _channel = channel;
80          _bufferSize = _channel.getHttpConfiguration().getOutputBufferSize();
81          _commitSize=_bufferSize/4;
82      }
83  
84      public boolean isWritten()
85      {
86          return _written > 0;
87      }
88  
89      public long getWritten()
90      {
91          return _written;
92      }
93  
94      public void reset()
95      {
96          _written = 0;
97          reopen();
98      }
99  
100     public void reopen()
101     {
102         _state.set(State.OPEN);
103     }
104 
105     public boolean isAllContentWritten()
106     {
107         return _channel.getResponse().isAllContentWritten(_written);
108     }
109 
110     @Override
111     public void close()
112     {
113         State state=_state.get();
114         while(state!=State.CLOSED)
115         {
116             if (_state.compareAndSet(state,State.CLOSED))
117             {
118                 try
119                 {
120                     if (BufferUtil.hasContent(_aggregate))
121                         _channel.write(_aggregate, !_channel.getResponse().isIncluding());
122                     else
123                         _channel.write(BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding());
124                 }
125                 catch(IOException e)
126                 {
127                     LOG.debug(e);
128                     _channel.failed();
129                 }
130                 releaseBuffer();
131                 return;
132             }
133             state=_state.get();
134         }
135     }
136 
137     /* Called to indicated that the output is already closed and the state needs to be updated to match */
138     void closed()
139     {
140         State state=_state.get();
141         while(state!=State.CLOSED)
142         {
143             if (_state.compareAndSet(state,State.CLOSED))
144             {
145                 try
146                 {
147                     _channel.getResponse().closeOutput();
148                 }
149                 catch(IOException e)
150                 {
151                     LOG.debug(e);
152                     _channel.failed();
153                 }
154                 releaseBuffer();
155                 return;
156             }
157             state=_state.get();
158         }
159     }
160 
161     private void releaseBuffer()
162     {
163         if (_aggregate != null)
164         {
165             _channel.getConnector().getByteBufferPool().release(_aggregate);
166             _aggregate = null;
167         }
168     }
169 
170     public boolean isClosed()
171     {
172         return _state.get()==State.CLOSED;
173     }
174 
175     @Override
176     public void flush() throws IOException
177     {
178         while(true)
179         {
180             switch(_state.get())
181             {
182                 case OPEN:
183                     if (BufferUtil.hasContent(_aggregate))
184                         _channel.write(_aggregate, false);
185                     else
186                         _channel.write(BufferUtil.EMPTY_BUFFER, false);
187                     return;
188 
189                 case ASYNC:
190                     throw new IllegalStateException("isReady() not called");
191 
192                 case READY:
193                     if (!_state.compareAndSet(State.READY, State.PENDING))
194                         continue;
195                     new AsyncFlush().process();
196                     return;
197 
198                 case PENDING:
199                 case UNREADY:
200                     throw new WritePendingException();
201 
202                 case CLOSED:
203                     return;
204             }
205             break;
206         }
207     }
208 
209 
210     @Override
211     public void write(byte[] b, int off, int len) throws IOException
212     {
213         _written+=len;
214         boolean complete=_channel.getResponse().isAllContentWritten(_written);
215 
216         // Async or Blocking ?
217         while(true)
218         {
219             switch(_state.get())
220             {
221                 case OPEN:
222                     // process blocking below
223                     break;
224 
225                 case ASYNC:
226                     throw new IllegalStateException("isReady() not called");
227 
228                 case READY:
229                     if (!_state.compareAndSet(State.READY, State.PENDING))
230                         continue;
231 
232                     // Should we aggregate?
233                     int capacity = getBufferSize();
234                     if (!complete && len<=_commitSize)
235                     {
236                         if (_aggregate == null)
237                             _aggregate = _channel.getByteBufferPool().acquire(capacity, false);
238 
239                         // YES - fill the aggregate with content from the buffer
240                         int filled = BufferUtil.fill(_aggregate, b, off, len);
241 
242                         // return if we are not complete, not full and filled all the content
243                         if (filled==len && !BufferUtil.isFull(_aggregate))
244                         {
245                             if (!_state.compareAndSet(State.PENDING, State.ASYNC))
246                                 throw new IllegalStateException();
247                             return;
248                         }
249 
250                         // adjust offset/length
251                         off+=filled;
252                         len-=filled;
253                     }
254 
255                     // Do the asynchronous writing from the callback
256                     new AsyncWrite(b,off,len,complete).process();
257                     return;
258 
259                 case PENDING:
260                 case UNREADY:
261                     throw new WritePendingException();
262 
263                 case CLOSED:
264                     throw new EofException("Closed");
265             }
266             break;
267         }
268 
269 
270         // handle blocking write
271 
272         // Should we aggregate?
273         int capacity = getBufferSize();
274         if (!complete && len<=_commitSize)
275         {
276             if (_aggregate == null)
277                 _aggregate = _channel.getByteBufferPool().acquire(capacity, false);
278 
279             // YES - fill the aggregate with content from the buffer
280             int filled = BufferUtil.fill(_aggregate, b, off, len);
281 
282             // return if we are not complete, not full and filled all the content
283             if (filled==len && !BufferUtil.isFull(_aggregate))
284                 return;
285 
286             // adjust offset/length
287             off+=filled;
288             len-=filled;
289         }
290 
291         // flush any content from the aggregate
292         if (BufferUtil.hasContent(_aggregate))
293         {
294             _channel.write(_aggregate, complete && len==0);
295 
296             // should we fill aggregate again from the buffer?
297             if (len>0 && !complete && len<=_commitSize)
298             {
299                 BufferUtil.append(_aggregate, b, off, len);
300                 return;
301             }
302         }
303 
304         // write any remaining content in the buffer directly
305         if (len>0)
306             // pass as readonly to avoid space stealing optimisation in HttpConnection 
307             _channel.write(ByteBuffer.wrap(b, off, len).asReadOnlyBuffer(), complete);
308         else if (complete)
309             _channel.write(BufferUtil.EMPTY_BUFFER,complete);
310 
311         if (complete)
312         {
313             closed();
314         }
315 
316     }
317 
318     public void write(ByteBuffer buffer) throws IOException
319     {
320         _written+=buffer.remaining();
321         boolean complete=_channel.getResponse().isAllContentWritten(_written);
322 
323         // Async or Blocking ?
324         while(true)
325         {
326             switch(_state.get())
327             {
328                 case OPEN:
329                     // process blocking below
330                     break;
331 
332                 case ASYNC:
333                     throw new IllegalStateException("isReady() not called");
334 
335                 case READY:
336                     if (!_state.compareAndSet(State.READY, State.PENDING))
337                         continue;
338 
339                     // Do the asynchronous writing from the callback
340                     new AsyncWrite(buffer,complete).process();
341                     return;
342 
343                 case PENDING:
344                 case UNREADY:
345                     throw new WritePendingException();
346 
347                 case CLOSED:
348                     throw new EofException("Closed");
349             }
350             break;
351         }
352 
353 
354         // handle blocking write
355         int len=BufferUtil.length(buffer);
356 
357         // flush any content from the aggregate
358         if (BufferUtil.hasContent(_aggregate))
359             _channel.write(_aggregate, complete && len==0);
360 
361         // write any remaining content in the buffer directly
362         if (len>0)
363             _channel.write(buffer, complete);
364         else if (complete)
365             _channel.write(BufferUtil.EMPTY_BUFFER,complete);
366 
367         if (complete)
368             closed();
369     }
370 
371     @Override
372     public void write(int b) throws IOException
373     {
374         _written+=1;
375         boolean complete=_channel.getResponse().isAllContentWritten(_written);
376 
377         // Async or Blocking ?
378         while(true)
379         {
380             switch(_state.get())
381             {
382                 case OPEN:
383                     if (_aggregate == null)
384                         _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
385                     BufferUtil.append(_aggregate, (byte)b);
386 
387                     // Check if all written or full
388                     if (complete || BufferUtil.isFull(_aggregate))
389                     {
390                         BlockingCallback callback = _channel.getWriteBlockingCallback();
391                         _channel.write(_aggregate, complete, callback);
392                         callback.block();
393                         if (complete)
394                             closed();
395                     }
396                     break;
397 
398                 case ASYNC:
399                     throw new IllegalStateException("isReady() not called");
400 
401                 case READY:
402                     if (!_state.compareAndSet(State.READY, State.PENDING))
403                         continue;
404 
405                     if (_aggregate == null)
406                         _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
407                     BufferUtil.append(_aggregate, (byte)b);
408 
409                     // Check if all written or full
410                     if (!complete && !BufferUtil.isFull(_aggregate))
411                     {
412                         if (!_state.compareAndSet(State.PENDING, State.ASYNC))
413                             throw new IllegalStateException();
414                         return;
415                     }
416 
417                     // Do the asynchronous writing from the callback
418                     new AsyncFlush().process();
419                     return;
420 
421                 case PENDING:
422                 case UNREADY:
423                     throw new WritePendingException();
424 
425                 case CLOSED:
426                     throw new EofException("Closed");
427             }
428             break;
429         }
430     }
431 
432 
433 
434     @Override
435     public void print(String s) throws IOException
436     {
437         if (isClosed())
438             throw new IOException("Closed");
439 
440         write(s.getBytes(_channel.getResponse().getCharacterEncoding()));
441     }
442 
443     /* ------------------------------------------------------------ */
444     /** Blocking send of content.
445      * @param content The content to send.
446      * @throws IOException
447      */
448     public void sendContent(ByteBuffer content) throws IOException
449     {
450         final BlockingCallback callback =_channel.getWriteBlockingCallback();
451         if (content.hasArray()&&content.limit()<content.capacity())
452             content=content.asReadOnlyBuffer();
453         _channel.write(content,true,callback);
454         callback.block();
455     }
456 
457     /* ------------------------------------------------------------ */
458     /** Blocking send of content.
459      * @param in The content to send
460      * @throws IOException
461      */
462     public void sendContent(InputStream in) throws IOException
463     {
464         final BlockingCallback callback =_channel.getWriteBlockingCallback();
465         new InputStreamWritingCB(in,callback).iterate();
466         callback.block();
467     }
468 
469     /* ------------------------------------------------------------ */
470     /** Blocking send of content.
471      * @param in The content to send
472      * @throws IOException
473      */
474     public void sendContent(ReadableByteChannel in) throws IOException
475     {
476         final BlockingCallback callback =_channel.getWriteBlockingCallback();
477         new ReadableByteChannelWritingCB(in,callback).iterate();
478         callback.block();
479     }
480 
481 
482     /* ------------------------------------------------------------ */
483     /** Blocking send of content.
484      * @param content The content to send
485      * @throws IOException
486      */
487     public void sendContent(HttpContent content) throws IOException
488     {
489         final BlockingCallback callback =_channel.getWriteBlockingCallback();
490         sendContent(content,callback);
491         callback.block();
492     }
493 
494     /* ------------------------------------------------------------ */
495     /** Asynchronous send of content.
496      * @param content The content to send
497      * @param callback The callback to use to notify success or failure
498      */
499     public void sendContent(ByteBuffer content, final Callback callback)
500     {
501         if (content.hasArray()&&content.limit()<content.capacity())
502             content=content.asReadOnlyBuffer();
503         _channel.write(content,true,new Callback()
504         {
505             @Override
506             public void succeeded()
507             {
508                 closed();
509                 callback.succeeded();
510             }
511 
512             @Override
513             public void failed(Throwable x)
514             {
515                 callback.failed(x);
516             }
517         });
518     }
519 
520     /* ------------------------------------------------------------ */
521     /** Asynchronous send of content.
522      * @param in The content to send as a stream.  The stream will be closed
523      * after reading all content.
524      * @param callback The callback to use to notify success or failure
525      */
526     public void sendContent(InputStream in, Callback callback)
527     {
528         new InputStreamWritingCB(in,callback).iterate();
529     }
530 
531     /* ------------------------------------------------------------ */
532     /** Asynchronous send of content.
533      * @param in The content to send as a channel.  The channel will be closed
534      * after reading all content.
535      * @param callback The callback to use to notify success or failure
536      */
537     public void sendContent(ReadableByteChannel in, Callback callback)
538     {
539         new ReadableByteChannelWritingCB(in,callback).iterate();
540     }
541 
542     /* ------------------------------------------------------------ */
543     /** Asynchronous send of content.
544      * @param httpContent The content to send
545      * @param callback The callback to use to notify success or failure
546      */
547     public void sendContent(HttpContent httpContent, Callback callback) throws IOException
548     {
549         if (BufferUtil.hasContent(_aggregate))
550             throw new IOException("written");
551         if (_channel.isCommitted())
552             throw new IOException("committed");
553 
554         while (true)
555         {
556             switch(_state.get())
557             {
558                 case OPEN:
559                     if (!_state.compareAndSet(State.OPEN, State.PENDING))
560                         continue;
561                     break;
562                 case CLOSED:
563                     throw new EofException("Closed");
564                 default:
565                     throw new IllegalStateException();
566             }
567             break;
568         }
569         ByteBuffer buffer= _channel.useDirectBuffers()?httpContent.getDirectBuffer():null;
570         if (buffer == null)
571             buffer = httpContent.getIndirectBuffer();
572 
573         if (buffer!=null)
574         {
575             sendContent(buffer,callback);
576             return;
577         }
578 
579         ReadableByteChannel rbc=httpContent.getReadableByteChannel();
580         if (rbc!=null)
581         {
582             // Close of the rbc is done by the async sendContent
583             sendContent(rbc,callback);
584             return;
585         }
586 
587         InputStream in = httpContent.getInputStream();
588         if ( in!=null )
589         {
590             sendContent(in,callback);
591             return;
592         }
593 
594         callback.failed(new IllegalArgumentException("unknown content for "+httpContent));
595     }
596 
597     public int getBufferSize()
598     {
599         return _bufferSize;
600     }
601 
602     public void setBufferSize(int size)
603     {
604         _bufferSize = size;
605         _commitSize = size;
606     }
607 
608     public void resetBuffer()
609     {
610         if (BufferUtil.hasContent(_aggregate))
611             BufferUtil.clear(_aggregate);
612     }
613 
614     @Override
615     public void setWriteListener(WriteListener writeListener)
616     {
617         if (!_channel.getState().isAsync())
618             throw new IllegalStateException("!ASYNC");
619 
620         if (_state.compareAndSet(State.OPEN, State.READY))
621         {
622             _writeListener = writeListener;
623             _channel.getState().onWritePossible();
624         }
625         else
626             throw new IllegalStateException();
627     }
628 
629     /**
630      * @see javax.servlet.ServletOutputStream#isReady()
631      */
632     @Override
633     public boolean isReady()
634     {
635         while (true)
636         {
637             switch(_state.get())
638             {
639                 case OPEN:
640                     return true;
641                 case ASYNC:
642                     if (!_state.compareAndSet(State.ASYNC, State.READY))
643                         continue;
644                     return true;
645                 case READY:
646                     return true;
647                 case PENDING:
648                     if (!_state.compareAndSet(State.PENDING, State.UNREADY))
649                         continue;
650                     return false;
651                 case UNREADY:
652                     return false;
653                 case CLOSED:
654                     return false;
655             }
656         }
657     }
658 
659     @Override
660     public void run()
661     {
662         if(_onError!=null)
663         {
664             Throwable th=_onError;
665             _onError=null;
666             _writeListener.onError(th);
667             close();
668         }
669         if (_state.get()==State.READY)
670         {
671             try
672             {
673                 _writeListener.onWritePossible();
674             }
675             catch (Throwable e)
676             {
677                 _writeListener.onError(e);
678                 close();
679             }
680         }
681     }
682 
683     private class AsyncWrite extends AsyncFlush
684     {
685         private final ByteBuffer _buffer;
686         private final boolean _complete;
687         private final int _len;
688 
689         public AsyncWrite(byte[] b, int off, int len, boolean complete)
690         {
691             _buffer=ByteBuffer.wrap(b, off, len);
692             _complete=complete;
693             _len=len;
694         }
695 
696         public AsyncWrite(ByteBuffer buffer, boolean complete)
697         {
698             _buffer=buffer;
699             _complete=complete;
700             _len=buffer.remaining();
701         }
702 
703         @Override
704         protected boolean process()
705         {
706             // flush any content from the aggregate
707             if (BufferUtil.hasContent(_aggregate))
708             {
709                 _channel.write(_aggregate, _complete && _len==0, this);
710                 return false;
711             }
712 
713             if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
714             {
715                 BufferUtil.put(_buffer,_aggregate);
716             }
717             else if (_len>0 && !_flushed)
718             {
719                 _flushed=true;
720                 _channel.write(_buffer, _complete, this);
721                 return false;
722             }
723             else if (_len==0 && !_flushed)
724             {
725                 _flushed=true;
726                 _channel.write(BufferUtil.EMPTY_BUFFER, _complete, this);
727                 return false;
728             }
729 
730             if (_complete)
731                 closed();
732             return true;
733         }
734     }
735 
736     private class AsyncFlush extends IteratingCallback
737     {
738         protected boolean _flushed;
739 
740         public AsyncFlush()
741         {
742         }
743 
744         @Override
745         protected boolean process()
746         {
747             if (BufferUtil.hasContent(_aggregate))
748             {
749                 _flushed=true;
750                 _channel.write(_aggregate, false, this);
751                 return false;
752             }
753 
754             if (!_flushed)
755             {
756                 _flushed=true;
757                 _channel.write(BufferUtil.EMPTY_BUFFER,false,this);
758                 return false;
759             }
760 
761             return true;
762         }
763 
764         @Override
765         protected void completed()
766         {
767             try
768             {
769                 while(true)
770                 {
771                     State last=_state.get();
772                     switch(last)
773                     {
774                         case PENDING:
775                             if (!_state.compareAndSet(State.PENDING, State.ASYNC))
776                                 continue;
777                             break;
778 
779                         case UNREADY:
780                             if (!_state.compareAndSet(State.UNREADY, State.READY))
781                                 continue;
782                             _channel.getState().onWritePossible();
783                             break;
784 
785                         case CLOSED:
786                             _onError=new EofException("Closed");
787                             break;
788 
789                         default:
790                             throw new IllegalStateException();
791                     }
792                     break;
793                 }
794             }
795             catch (Exception e)
796             {
797                 _onError=e;
798                 _channel.getState().onWritePossible();
799             }
800         }
801 
802         @Override
803         public void failed(Throwable e)
804         {
805             super.failed(e);
806             _onError=e;
807             _channel.getState().onWritePossible();
808         }
809 
810 
811     }
812 
813 
814     /* ------------------------------------------------------------ */
815     /** An iterating callback that will take content from an
816      * InputStream and write it to the associated {@link HttpChannel}.
817      * A non direct buffer of size {@link HttpOutput#getBufferSize()} is used.
818      * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
819      * be notified as each buffer is written and only once all the input is consumed will the
820      * wrapped {@link Callback#succeeded()} method be called.
821      */
822     private class InputStreamWritingCB extends IteratingNestedCallback
823     {
824         private final InputStream _in;
825         private final ByteBuffer _buffer;
826         private boolean _eof;
827 
828         public InputStreamWritingCB(InputStream in, Callback callback)
829         {
830             super(callback);
831             _in=in;
832             _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
833         }
834 
835         @Override
836         protected boolean process() throws Exception
837         {
838             // Only return if EOF has previously been read and thus
839             // a write done with EOF=true
840             if (_eof)
841             {
842                 // Handle EOF
843                 _in.close();
844                 closed();
845                 _channel.getByteBufferPool().release(_buffer);
846                 return true;
847             }
848             
849             // Read until buffer full or EOF
850             int len=0;
851             while (len<_buffer.capacity() && !_eof)
852             {
853                 int r=_in.read(_buffer.array(),_buffer.arrayOffset()+len,_buffer.capacity()-len);
854                 if (r<0)
855                     _eof=true;
856                 else
857                     len+=r;
858             }
859 
860             // write what we have
861             _buffer.position(0);
862             _buffer.limit(len);
863             _channel.write(_buffer,_eof,this);
864 
865             return false;
866         }
867 
868         @Override
869         public void failed(Throwable x)
870         {
871             super.failed(x);
872             _channel.getByteBufferPool().release(_buffer);
873             try
874             {
875                 _in.close();
876             }
877             catch (IOException e)
878             {
879                 LOG.ignore(e);
880             }
881         }
882 
883     }
884 
885     /* ------------------------------------------------------------ */
886     /** An iterating callback that will take content from a
887      * ReadableByteChannel and write it to the {@link HttpChannel}.
888      * A {@link ByteBuffer} of size {@link HttpOutput#getBufferSize()} is used that will be direct if
889      * {@link HttpChannel#useDirectBuffers()} is true.
890      * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
891      * be notified as each buffer is written and only once all the input is consumed will the
892      * wrapped {@link Callback#succeeded()} method be called.
893      */
894     private class ReadableByteChannelWritingCB extends IteratingNestedCallback
895     {
896         private final ReadableByteChannel _in;
897         private final ByteBuffer _buffer;
898         private boolean _eof;
899 
900         public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
901         {
902             super(callback);
903             _in=in;
904             _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
905         }
906 
907         @Override
908         protected boolean process() throws Exception
909         {
910             // Only return if EOF has previously been read and thus
911             // a write done with EOF=true
912             if (_eof)
913             {
914                 _in.close();
915                 closed();
916                 _channel.getByteBufferPool().release(_buffer);
917                 return true;
918             }
919             
920             // Read from stream until buffer full or EOF
921             _buffer.clear();
922             while (_buffer.hasRemaining() && !_eof)
923               _eof = (_in.read(_buffer)) <  0;
924 
925             // write what we have
926             _buffer.flip();
927             _channel.write(_buffer,_eof,this);
928 
929             return false;
930         }
931 
932         @Override
933         public void failed(Throwable x)
934         {
935             super.failed(x);
936             _channel.getByteBufferPool().release(_buffer);
937             try
938             {
939                 _in.close();
940             }
941             catch (IOException e)
942             {
943                 LOG.ignore(e);
944             }
945         }
946     }
947 
948 }