View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.nio.ByteBuffer;
24  import java.nio.channels.ReadableByteChannel;
25  import java.nio.channels.WritePendingException;
26  import java.util.concurrent.atomic.AtomicReference;
27  import javax.servlet.RequestDispatcher;
28  import javax.servlet.ServletOutputStream;
29  import javax.servlet.ServletRequest;
30  import javax.servlet.ServletResponse;
31  import javax.servlet.WriteListener;
32  
33  import org.eclipse.jetty.http.HttpContent;
34  import org.eclipse.jetty.io.EofException;
35  import org.eclipse.jetty.util.BufferUtil;
36  import org.eclipse.jetty.util.Callback;
37  import org.eclipse.jetty.util.IteratingCallback;
38  import org.eclipse.jetty.util.IteratingNestedCallback;
39  import org.eclipse.jetty.util.SharedBlockingCallback;
40  import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
41  import org.eclipse.jetty.util.log.Log;
42  import org.eclipse.jetty.util.log.Logger;
43  
44  /**
45   * <p>{@link HttpOutput} implements {@link ServletOutputStream}
46   * as required by the Servlet specification.</p>
47   * <p>{@link HttpOutput} buffers content written by the application until a
48   * further write will overflow the buffer, at which point it triggers a commit
49   * of the response.</p>
50   * <p>{@link HttpOutput} can be closed and reopened, to allow requests included
51   * via {@link RequestDispatcher#include(ServletRequest, ServletResponse)} to
52   * close the stream, to be reopened after the inclusion ends.</p>
53   */
54  public class HttpOutput extends ServletOutputStream implements Runnable
55  {
56      private static Logger LOG = Log.getLogger(HttpOutput.class);
        /** Channel that performs the actual writes and supplies the idle timeout. */
57      private final HttpChannel<?> _channel;
        /** Hands out the {@link Blocker} used by the blocking write paths; its idle timeout is read from the channel. */
58      private final SharedBlockingCallback _writeblock=new SharedBlockingCallback()
59      {
60          @Override
61          protected long getIdleTimeout()
62          {
63              return _channel.getIdleTimeout();
64          }
65      };
        /** Running total of bytes passed to the write methods; zeroed by {@link #reset()}. */
66      private long _written;
        /** Lazily acquired pooled buffer that collects small writes; null until first needed and again after release. */
67      private ByteBuffer _aggregate;
        /** Capacity requested when the aggregate buffer is acquired; also the chunk size for large blocking writes. */
68      private int _bufferSize;
        /** Largest single write that is copied into the aggregate buffer instead of being written directly. */
69      private int _commitSize;
        /** Listener registered through setWriteListener(); remains null when only blocking writes are used. */
70      private WriteListener _writeListener;
        /** Failure recorded for later delivery to the write listener by {@link #run()}. */
71      private volatile Throwable _onError;
72  
73      /*
74      ACTION             OPEN       ASYNC      READY      PENDING       UNREADY       ERROR      CLOSED
75      ----------------------------------------------------------------------------------------------------------------
76      setWriteListener() READY->owp ise        ise        ise           ise           ise        ise
77      write()            OPEN       ise        PENDING    wpe           wpe           eof        eof
78      flush()            OPEN       ise        PENDING    wpe           wpe           eof        -
79      close()            CLOSED     CLOSED     CLOSED     CLOSED        ERROR->CLOSED CLOSED     CLOSED
80      isReady()          OPEN:true  READY:true READY:true UNREADY:false UNREADY:false ERROR:true CLOSED:true
81      write completed    -          -          -          ASYNC         READY->owp    ise        -
82      
83      */
        /** States of the output; the table in the comment above maps each action to the transition it causes. */
84      enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED }
        /** Current state of this output; starts as OPEN. */
85      private final AtomicReference<OutputState> _state=new AtomicReference<>(OutputState.OPEN);
86  
87      public HttpOutput(HttpChannel<?> channel)
88      {
89          _channel = channel;
90          HttpConfiguration config = channel.getHttpConfiguration();
91          _bufferSize = config.getOutputBufferSize();
92          _commitSize = config.getOutputAggregationSize();
93      }
94      
95      public HttpChannel<?> getHttpChannel()
96      {
97          return _channel;
98      }
99      
100     public boolean isWritten()
101     {
102         return _written > 0;
103     }
104 
105     public long getWritten()
106     {
107         return _written;
108     }
109 
110     public void reset()
111     {
112         _written = 0;
113         reopen();
114     }
115 
116     public void reopen()
117     {
118         _state.set(OutputState.OPEN);
119     }
120 
121     public boolean isAllContentWritten()
122     {
123         return _channel.getResponse().isAllContentWritten(_written);
124     }
125 
126     protected Blocker acquireWriteBlockingCallback() throws IOException
127     {
128         return _writeblock.acquire();
129     }
130     
131     protected void write(ByteBuffer content, boolean complete) throws IOException
132     {
133         try (Blocker blocker=_writeblock.acquire())
134         {        
135             write(content,complete,blocker);
136             blocker.block();
137         }
138     }
139     
140     protected void write(ByteBuffer content, boolean complete, Callback callback)
141     {
142         _channel.write(content,complete,callback);
143     }
144     
        /**
         * Closes the output, flushing whatever is held in the aggregate buffer.
         * <p>Nothing happens if the state is already CLOSED. From UNREADY the state is
         * first moved to ERROR and the write listener is told via onError, after which
         * the loop runs again and closes from ERROR. From every other state the output
         * becomes CLOSED and one blocking write is issued; that write is marked as the
         * last one unless the response is currently being included. The aggregate
         * buffer is released afterwards.</p>
         */
145     @Override
146     public void close()
147     {
148         loop: while(true)
149         {
150             OutputState state=_state.get();
151             switch (state)
152             {
153                 case CLOSED:
154                     break loop;
155                     
156                 case UNREADY:
                        // report the stored failure if there is one, otherwise a generic EOF
157                     if (_state.compareAndSet(state,OutputState.ERROR))
158                         _writeListener.onError(_onError==null?new EofException("Async close"):_onError);
159                     continue;
160                     
161                 default:
                        // only the thread that wins the transition performs the final write
162                     if (_state.compareAndSet(state,OutputState.CLOSED))
163                     {
164                         try
165                         {
166                             write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER,!_channel.getResponse().isIncluding());
167                         }
168                         catch(IOException e)
169                         {
                                // a failed final write is logged at debug level and the channel is aborted
170                             LOG.debug(e);
171                             _channel.abort();
172                         }
173                         releaseBuffer();
174                         return;
175                     }
176             }
177         }
178     }
179 
180     /* Called to indicate that the output is already closed (a write with last==true was performed) and the state needs to be updated to match.
         * Nothing happens if the state is already CLOSED. From UNREADY the state is moved to ERROR, the write
         * listener is told via onError, and the loop runs again. From any other state the output becomes CLOSED,
         * the response is asked to close its output, and the aggregate buffer is released. */
181     void closed()
182     {
183         loop: while(true)
184         {
185             OutputState state=_state.get();
186             switch (state)
187             {
188                 case CLOSED:
189                     break loop;
190                     
191                 case UNREADY:
                        // report the stored failure if there is one, otherwise a generic EOF
192                     if (_state.compareAndSet(state,OutputState.ERROR))
193                         _writeListener.onError(_onError==null?new EofException("Async closed"):_onError);
194                     continue;
195                     
196                 default:
                        // no data is written here: the caller has already performed the last write
197                     if (_state.compareAndSet(state,OutputState.CLOSED))
198                     {
199                         try
200                         {
201                             _channel.getResponse().closeOutput();
202                         }
203                         catch(IOException e)
204                         {
                                // a failure to close is logged at debug level and the channel is aborted
205                             LOG.debug(e);
206                             _channel.abort();
207                         }
208                         releaseBuffer();
209                         return;
210                     }
211             }
212         }
213     }
214 
215     private void releaseBuffer()
216     {
217         if (_aggregate != null)
218         {
219             _channel.getConnector().getByteBufferPool().release(_aggregate);
220             _aggregate = null;
221         }
222     }
223 
224     public boolean isClosed()
225     {
226         return _state.get()==OutputState.CLOSED;
227     }
228 
        /**
         * Flushes the aggregate buffer (or an empty buffer if nothing is aggregated).
         * <p>In the OPEN state the flush is a blocking write. In the READY state the
         * output moves to PENDING and the flush is performed by an asynchronous
         * callback. A flush on a CLOSED output returns without doing anything.</p>
         *
         * @throws IOException if the blocking write fails; an EofException in the ERROR state
         * @throws IllegalStateException in the ASYNC state
         * @throws WritePendingException in the PENDING or UNREADY state
         */
229     @Override
230     public void flush() throws IOException
231     {
232         while(true)
233         {
234             switch(_state.get())
235             {
236                 case OPEN:
237                     write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, false);
238                     return;
239 
240                 case ASYNC:
241                     throw new IllegalStateException("isReady() not called");
242 
243                 case READY:
                        // retry from the top if another thread changed the state first
244                     if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
245                         continue;
246                     new AsyncFlush().iterate();
247                     return;
248 
249                 case PENDING:
250                 case UNREADY:
251                     throw new WritePendingException();
252 
253                 case ERROR:
254                     throw new EofException(_onError);
255                     
256                 case CLOSED:
257                     return;
258             }
259             break;
260         }
261     }
262 
263 
        /**
         * Writes {@code len} bytes from {@code b}, blocking in the OPEN state and
         * asynchronously in the READY state.
         * <p>The byte count is added to the written total before the state is checked,
         * and the write is treated as the last one when the response reports that all
         * content has been written. Writes no longer than the commit size are copied
         * into the aggregate buffer; what does not fit is flushed and written directly.
         * In the blocking path a large remainder is written at most one buffer size at a
         * time, and the output is marked closed after the last write.</p>
         *
         * @param b the source array
         * @param off the offset of the first byte to write
         * @param len the number of bytes to write
         * @throws IOException if a blocking write fails; an EofException in the ERROR or CLOSED state
         * @throws IllegalStateException in the ASYNC state
         * @throws WritePendingException in the PENDING or UNREADY state
         */
264     @Override
265     public void write(byte[] b, int off, int len) throws IOException
266     {
267         _written+=len;
268         boolean complete=_channel.getResponse().isAllContentWritten(_written);
269 
270         // Async or Blocking ?
271         while(true)
272         {
273             switch(_state.get())
274             {
275                 case OPEN:
276                     // process blocking below
277                     break;
278 
279                 case ASYNC:
280                     throw new IllegalStateException("isReady() not called");
281 
282                 case READY:
                        // retry from the top if another thread changed the state first
283                     if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
284                         continue;
285 
286                     // Should we aggregate?
287                     if (!complete && len<=_commitSize)
288                     {
289                         if (_aggregate == null)
290                             _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
291 
292                         // YES - fill the aggregate with content from the buffer
293                         int filled = BufferUtil.fill(_aggregate, b, off, len);
294 
295                         // return if we are not complete, not full and filled all the content
296                         if (filled==len && !BufferUtil.isFull(_aggregate))
297                         {
                                // nothing was sent, so go straight back to ASYNC without a callback
298                             if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
299                                 throw new IllegalStateException();
300                             return;
301                         }
302 
303                         // adjust offset/length
304                         off+=filled;
305                         len-=filled;
306                     }
307 
308                     // Do the asynchronous writing from the callback
309                     new AsyncWrite(b,off,len,complete).iterate();
310                     return;
311 
312                 case PENDING:
313                 case UNREADY:
314                     throw new WritePendingException();
315 
316                 case ERROR:
317                     throw new EofException(_onError);
318                     
319                 case CLOSED:
320                     throw new EofException("Closed");
321             }
322             break;
323         }
324 
325 
326         // handle blocking write
327 
328         // Should we aggregate?
329         int capacity = getBufferSize();
330         if (!complete && len<=_commitSize)
331         {
332             if (_aggregate == null)
333                 _aggregate = _channel.getByteBufferPool().acquire(capacity, false);
334 
335             // YES - fill the aggregate with content from the buffer
336             int filled = BufferUtil.fill(_aggregate, b, off, len);
337 
338             // return if we are not complete, not full and filled all the content
339             if (filled==len && !BufferUtil.isFull(_aggregate))
340                 return;
341 
342             // adjust offset/length
343             off+=filled;
344             len-=filled;
345         }
346 
347         // flush any content from the aggregate
348         if (BufferUtil.hasContent(_aggregate))
349         {
                // this flush is the last write only if no bytes remain to be written directly
350             write(_aggregate, complete && len==0);
351 
352             // should we fill aggregate again from the buffer?
353             if (len>0 && !complete && len<=_commitSize)
354             {
355                 BufferUtil.append(_aggregate, b, off, len);
356                 return;
357             }
358         }
359 
360         // write any remaining content in the buffer directly
361         if (len>0)
362         {
363             ByteBuffer wrap = ByteBuffer.wrap(b, off, len);
364             ByteBuffer view = wrap.duplicate();
365 
366             // write a buffer capacity at a time to avoid JVM pooling large direct buffers
367             // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541
368             while (len>getBufferSize())
369             {
                    // expose one buffer-sized window, write it, then slide the window forward
370                 int p=view.position();
371                 int l=p+getBufferSize();
372                 view.limit(p+getBufferSize());
373                 write(view,false);
374                 len-=getBufferSize();
375                 view.limit(l+Math.min(len,getBufferSize()));
376                 view.position(l);
377             }
378             write(view,complete);
379         }
380         else if (complete)
381             write(BufferUtil.EMPTY_BUFFER,complete);
382 
            // the last write has been performed, so bring the state in line with it
383         if (complete)
384             closed();
385 
386     }
387 
        /**
         * Writes the remaining bytes of a buffer, blocking in the OPEN state and
         * asynchronously in the READY state.
         * <p>The remaining byte count is added to the written total before the state
         * is checked. The buffer content is never copied into the aggregate buffer: in
         * the blocking path any aggregated content is flushed first and the buffer is
         * then written directly. The output is marked closed after the last write.</p>
         *
         * @param buffer the content to write
         * @throws IOException if a blocking write fails; an EofException in the ERROR or CLOSED state
         * @throws IllegalStateException in the ASYNC state
         * @throws WritePendingException in the PENDING or UNREADY state
         */
388     public void write(ByteBuffer buffer) throws IOException
389     {
390         _written+=buffer.remaining();
391         boolean complete=_channel.getResponse().isAllContentWritten(_written);
392 
393         // Async or Blocking ?
394         while(true)
395         {
396             switch(_state.get())
397             {
398                 case OPEN:
399                     // process blocking below
400                     break;
401 
402                 case ASYNC:
403                     throw new IllegalStateException("isReady() not called");
404 
405                 case READY:
                        // retry from the top if another thread changed the state first
406                     if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
407                         continue;
408 
409                     // Do the asynchronous writing from the callback
410                     new AsyncWrite(buffer,complete).iterate();
411                     return;
412 
413                 case PENDING:
414                 case UNREADY:
415                     throw new WritePendingException();
416 
417                 case ERROR:
418                     throw new EofException(_onError);
419                     
420                 case CLOSED:
421                     throw new EofException("Closed");
422             }
423             break;
424         }
425 
426 
427         // handle blocking write
428         int len=BufferUtil.length(buffer);
429 
430         // flush any content from the aggregate
            // (this flush is the last write only if the buffer itself is empty)
431         if (BufferUtil.hasContent(_aggregate))
432             write(_aggregate, complete && len==0);
433 
434         // write any remaining content in the buffer directly
435         if (len>0)
436             write(buffer, complete);
437         else if (complete)
438             write(BufferUtil.EMPTY_BUFFER,complete);
439 
            // the last write has been performed, so bring the state in line with it
440         if (complete)
441             closed();
442     }
443 
        /**
         * Writes a single byte by appending it to the aggregate buffer.
         * <p>The written total is incremented before the state is checked. In the OPEN
         * state the aggregate is flushed with a blocking write once it is full or the
         * response content is complete. In the READY state the byte is aggregated and
         * the output returns to ASYNC, unless the buffer is full or the content is
         * complete, in which case an asynchronous flush is started.</p>
         *
         * @param b the byte to write (narrowed to a {@code byte})
         * @throws IOException if a blocking write fails; an EofException in the ERROR or CLOSED state
         * @throws IllegalStateException in the ASYNC state
         * @throws WritePendingException in the PENDING or UNREADY state
         */
444     @Override
445     public void write(int b) throws IOException
446     {
447         _written+=1;
448         boolean complete=_channel.getResponse().isAllContentWritten(_written);
449 
450         // Async or Blocking ?
451         while(true)
452         {
453             switch(_state.get())
454             {
455                 case OPEN:
456                     if (_aggregate == null)
457                         _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
458                     BufferUtil.append(_aggregate, (byte)b);
459 
460                     // Check if all written or full
461                     if (complete || BufferUtil.isFull(_aggregate))
462                     {
463                         try(Blocker blocker=_writeblock.acquire())
464                         {
465                             write(_aggregate, complete, blocker);
466                             blocker.block();
467                         }
                            // the last write has been performed, so bring the state in line with it
468                         if (complete)
469                             closed();
470                     }
471                     break;
472 
473                 case ASYNC:
474                     throw new IllegalStateException("isReady() not called");
475 
476                 case READY:
                        // retry from the top if another thread changed the state first
477                     if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
478                         continue;
479 
480                     if (_aggregate == null)
481                         _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
482                     BufferUtil.append(_aggregate, (byte)b);
483 
484                     // Check if all written or full
485                     if (!complete && !BufferUtil.isFull(_aggregate))
486                     {
                            // nothing was sent, so go straight back to ASYNC without a callback
487                         if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
488                             throw new IllegalStateException();
489                         return;
490                     }
491 
492                     // Do the asynchronous writing from the callback
493                     new AsyncFlush().iterate();
494                     return;
495 
496                 case PENDING:
497                 case UNREADY:
498                     throw new WritePendingException();
499 
500                 case ERROR:
501                     throw new EofException(_onError);
502                     
503                 case CLOSED:
504                     throw new EofException("Closed");
505             }
506             break;
507         }
508     }
509 
510     @Override
511     public void print(String s) throws IOException
512     {
513         if (isClosed())
514             throw new IOException("Closed");
515 
516         write(s.getBytes(_channel.getResponse().getCharacterEncoding()));
517     }
518 
519     /* ------------------------------------------------------------ */
520     /** Blocking send of content.
521      * @param content The content to send.
522      * @throws IOException
523      */
524     public void sendContent(ByteBuffer content) throws IOException
525     {
526         try(Blocker blocker=_writeblock.acquire())
527         {
528             write(content,true,blocker);
529             blocker.block();
530         }
531     }
532 
533     /* ------------------------------------------------------------ */
534     /** Blocking send of content.
535      * @param in The content to send
536      * @throws IOException
537      */
538     public void sendContent(InputStream in) throws IOException
539     {
540         try(Blocker blocker=_writeblock.acquire())
541         {
542             new InputStreamWritingCB(in,blocker).iterate();
543             blocker.block();
544         }
545     }
546 
547     /* ------------------------------------------------------------ */
548     /** Blocking send of content.
549      * @param in The content to send
550      * @throws IOException
551      */
552     public void sendContent(ReadableByteChannel in) throws IOException
553     {
554         try(Blocker blocker=_writeblock.acquire())
555         {
556             new ReadableByteChannelWritingCB(in,blocker).iterate();
557             blocker.block();
558         }
559     }
560 
561 
562     /* ------------------------------------------------------------ */
563     /** Blocking send of content.
564      * @param content The content to send
565      * @throws IOException
566      */
567     public void sendContent(HttpContent content) throws IOException
568     {
569         try(Blocker blocker=_writeblock.acquire())
570         {
571             sendContent(content,blocker);
572             blocker.block();
573         }
574     }
575 
576     /* ------------------------------------------------------------ */
577     /** Asynchronous send of content.
578      * @param content The content to send
579      * @param callback The callback to use to notify success or failure
580      */
581     public void sendContent(ByteBuffer content, final Callback callback)
582     {
583         write(content,true,new Callback()
584         {
585             @Override
586             public void succeeded()
587             {
588                 closed();
589                 callback.succeeded();
590             }
591 
592             @Override
593             public void failed(Throwable x)
594             {
595                 callback.failed(x);
596             }
597         });
598     }
599 
600     /* ------------------------------------------------------------ */
601     /** Asynchronous send of content.
602      * @param in The content to send as a stream.  The stream will be closed
603      * after reading all content.
604      * @param callback The callback to use to notify success or failure
605      */
606     public void sendContent(InputStream in, Callback callback)
607     {
608         new InputStreamWritingCB(in,callback).iterate();
609     }
610 
611     /* ------------------------------------------------------------ */
612     /** Asynchronous send of content.
613      * @param in The content to send as a channel.  The channel will be closed
614      * after reading all content.
615      * @param callback The callback to use to notify success or failure
616      */
617     public void sendContent(ReadableByteChannel in, Callback callback)
618     {
619         new ReadableByteChannelWritingCB(in,callback).iterate();
620     }
621 
622     /* ------------------------------------------------------------ */
623     /** Asynchronous send of content.
         * <p>The callback is failed immediately if content has already been aggregated
         * by a previous write or if the channel is already committed. Otherwise the
         * state is moved from OPEN to PENDING and the content is sent from the first
         * source the {@link HttpContent} provides: a buffer (direct if the channel
         * uses direct buffers and one is available, else indirect), then a readable
         * byte channel, then an input stream.</p>
624      * @param httpContent The content to send
625      * @param callback The callback to use to notify success or failure
         * @throws IllegalStateException if the state is neither OPEN, ERROR nor CLOSED
626      */
627     public void sendContent(HttpContent httpContent, Callback callback)
628     {
629         if (BufferUtil.hasContent(_aggregate))
630         {
631             callback.failed(new IOException("cannot sendContent() after write()"));
632             return;
633         }
634         if (_channel.isCommitted())
635         {
636             callback.failed(new IOException("committed"));
637             return;
638         }
639 
640         while (true)
641         {
642             switch(_state.get())
643             {
644                 case OPEN:
                        // retry from the top if another thread changed the state first
645                     if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING))
646                         continue;
647                     break;
648                 case ERROR:
649                     callback.failed(new EofException(_onError));
650                     return;
651                     
652                 case CLOSED:
653                     callback.failed(new EofException("Closed"));
654                     return;
655                 default:
656                     throw new IllegalStateException();
657             }
658             break;
659         }
            // prefer a direct buffer only when the channel uses direct buffers, else fall back to the indirect one
660         ByteBuffer buffer= _channel.useDirectBuffers()?httpContent.getDirectBuffer():null;
661         if (buffer == null)
662             buffer = httpContent.getIndirectBuffer();
663 
664         if (buffer!=null)
665         {
666             if (LOG.isDebugEnabled())
667                 LOG.debug("sendContent({}=={},{},direct={})",httpContent,BufferUtil.toDetailString(buffer),callback,_channel.useDirectBuffers());
668             
669             sendContent(buffer,callback);
670             return;
671         }
672 
            // no buffer available: try a channel, then a stream; any failure obtaining them goes to the callback
673         try
674         {
675             ReadableByteChannel rbc=httpContent.getReadableByteChannel();
676             if (rbc!=null)
677             {
678                 if (LOG.isDebugEnabled())
679                     LOG.debug("sendContent({}=={},{},direct={})",httpContent,rbc,callback,_channel.useDirectBuffers());
680                 // Close of the rbc is done by the async sendContent
681                 sendContent(rbc,callback);
682                 return;
683             }
684 
685             InputStream in = httpContent.getInputStream();
686             if ( in!=null )
687             {
688                 if (LOG.isDebugEnabled())
689                     LOG.debug("sendContent({}=={},{},direct={})",httpContent,in,callback,_channel.useDirectBuffers());
690                 sendContent(in,callback);
691                 return;
692             }
693         }
694         catch(Throwable th)
695         {
696             callback.failed(th);
697             return;
698         }
699 
            // the content offered no buffer, channel or stream
700         callback.failed(new IllegalArgumentException("unknown content for "+httpContent));
701     }
702 
703     public int getBufferSize()
704     {
705         return _bufferSize;
706     }
707 
708     public void setBufferSize(int size)
709     {
710         _bufferSize = size;
711         _commitSize = size;
712     }
713 
714     public void resetBuffer()
715     {
716         if (BufferUtil.hasContent(_aggregate))
717             BufferUtil.clear(_aggregate);
718     }
719 
720     @Override
721     public void setWriteListener(WriteListener writeListener)
722     {
723         if (!_channel.getState().isAsync())
724             throw new IllegalStateException("!ASYNC");
725 
726         if (_state.compareAndSet(OutputState.OPEN, OutputState.READY))
727         {
728             _writeListener = writeListener;
729             _channel.getState().onWritePossible();
730         }
731         else
732             throw new IllegalStateException();
733     }
734 
735     /**
         * Reports whether a write may be attempted now, updating the state as a side effect.
         * <p>ASYNC becomes READY and reports true. PENDING becomes UNREADY and reports
         * false, which is what makes the completing write signal that a write is
         * possible. OPEN, READY, ERROR and CLOSED report true and UNREADY reports
         * false, all without a state change.</p>
         *
         * @return true unless an asynchronous write is still in progress
736      * @see javax.servlet.ServletOutputStream#isReady()
737      */
738     @Override
739     public boolean isReady()
740     {
741         while (true)
742         {
743             switch(_state.get())
744             {
745                 case OPEN:
746                     return true;
747                 case ASYNC:
                        // retry from the top if another thread changed the state first
748                     if (!_state.compareAndSet(OutputState.ASYNC, OutputState.READY))
749                         continue;
750                     return true;
751                 case READY:
752                     return true;
753                 case PENDING:
754                     if (!_state.compareAndSet(OutputState.PENDING, OutputState.UNREADY))
755                         continue;
756                     return false;
757                 case UNREADY:
758                     return false;
759 
760                 case ERROR:
761                     return true;
762                     
763                 case CLOSED:
764                     return true;
765             }
766         }
767     }
768 
        /**
         * Delivers the pending notification to the write listener: either a recorded
         * error or "write possible".
         * <p>If an error is recorded and the state is already CLOSED or ERROR, the
         * error is dropped. Otherwise the state is moved to ERROR, the listener's
         * onError is called and the output is closed. With no error recorded,
         * onWritePossible is called in the READY and CLOSED states; an exception it
         * throws is recorded and reported on the next pass of the loop. Any other
         * state is recorded as an IllegalStateException and reported the same way.</p>
         */
769     @Override
770     public void run()
771     {
772         loop: while (true)
773         {
774             OutputState state = _state.get();
775 
776             if(_onError!=null)
777             {
778                 switch(state)
779                 {
780                     case CLOSED:
781                     case ERROR:
                            // nothing more to report in these states: discard the error
782                         _onError=null;
783                         break loop;
784 
785                     default:
786                         if (_state.compareAndSet(state, OutputState.ERROR))
787                         {
788                             Throwable th=_onError;
789                             _onError=null;
790                             if (LOG.isDebugEnabled())
791                                 LOG.debug("onError",th);
792                             _writeListener.onError(th);
793                             close();
794 
795                             break loop;
796                         }
797 
798                 }
                    // the state changed under us: re-read it and try again
799                 continue;
800             }
801             
802             switch(_state.get())
803             {
804                 case CLOSED:
805                     // even though a write is not possible, because a close has 
806                     // occurred, we need to call onWritePossible to tell async
807                     // producer that the last write completed.
808                     // so fall through
809                 case READY:
810                     try
811                     {
812                         _writeListener.onWritePossible();
813                         break loop;
814                     }
815                     catch (Throwable e)
816                     {
                            // record the failure; the next loop pass reports it through onError
817                         _onError=e;
818                     }
819                     break;
820                     
821                 default:
                        // unexpected state: record it as an error for the next loop pass
822                     _onError=new IllegalStateException("state="+_state.get());
823             }
824         }
825     }
826     
827     @Override
828     public String toString()
829     {
830         return String.format("%s@%x{%s}",this.getClass().getSimpleName(),hashCode(),_state.get());
831     }
832     
        /**
         * Base class of the iterating callbacks used for asynchronous writes; it
         * updates the output state when the whole iteration succeeds or fails.
         */
833     private abstract class AsyncICB extends IteratingCallback
834     {
            /**
             * Moves the state on after a successful asynchronous write: PENDING becomes
             * ASYNC, UNREADY becomes READY and the channel state is told a write is
             * possible, and CLOSED is left alone.
             *
             * @throws IllegalStateException if the output is in any other state
             */
835         @Override
836         protected void onCompleteSuccess()
837         {
838             while(true)
839             {
840                 OutputState last=_state.get();
841                 switch(last)
842                 {
843                     case PENDING:
                            // retry from the top if another thread changed the state first
844                         if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
845                             continue;
846                         break;
847 
848                     case UNREADY:
849                         if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY))
850                             continue;
851                         _channel.getState().onWritePossible();
852                         break;
853 
854                     case CLOSED:
855                         break;
856 
857                     default:
858                         throw new IllegalStateException();
859                 }
860                 break;
861             }
862         }
863 
            /**
             * Records the failure (a plain IOException if none was given) and signals
             * the channel state that a write is possible.
             *
             * @param e the cause of the failure, may be null
             */
864         @Override
865         public void onCompleteFailure(Throwable e)
866         {
867             _onError=e==null?new IOException():e;
868             _channel.getState().onWritePossible();
869         }
870     }
871     
872     
        /**
         * Iterating callback that flushes the aggregate buffer asynchronously.
         */
873     private class AsyncFlush extends AsyncICB
874     {
            /** True once this callback has issued at least one write. */
875         protected volatile boolean _flushed;
876 
877         public AsyncFlush()
878         {
879         }
880 
            /**
             * Writes the aggregate buffer while it has content. If nothing has been
             * written yet, a single empty write is issued instead. Once a write has been
             * issued and the aggregate is empty, the iteration succeeds.
             *
             * @return SCHEDULED after issuing a write, otherwise SUCCEEDED
             */
881         @Override
882         protected Action process()
883         {
884             if (BufferUtil.hasContent(_aggregate))
885             {
886                 _flushed=true;
887                 write(_aggregate, false, this);
888                 return Action.SCHEDULED;
889             }
890 
891             if (!_flushed)
892             {
893                 _flushed=true;
894                 write(BufferUtil.EMPTY_BUFFER,false,this);
895                 return Action.SCHEDULED;
896             }
897 
898             return Action.SUCCEEDED;
899         }
900     }
901 
902 
903 
904     private class AsyncWrite extends AsyncICB
905     {
906         private final ByteBuffer _buffer;
907         private final ByteBuffer _slice;
908         private final boolean _complete;
909         private final int _len;
910         protected volatile boolean _completed;
911 
912         public AsyncWrite(byte[] b, int off, int len, boolean complete)
913         {
914             _buffer=ByteBuffer.wrap(b, off, len);
915             _len=len;
916             // always use a view for large byte arrays to avoid JVM pooling large direct buffers
917             _slice=_len<getBufferSize()?null:_buffer.duplicate();
918             _complete=complete;
919         }
920 
921         public AsyncWrite(ByteBuffer buffer, boolean complete)
922         {
923             _buffer=buffer;
924             _len=buffer.remaining();
925             // Use a slice buffer for large indirect to avoid JVM pooling large direct buffers
926             _slice=_buffer.isDirect()||_len<getBufferSize()?null:_buffer.duplicate();
927             _complete=complete;
928         }
929 
930         @Override
931         protected Action process()
932         {
933             // flush any content from the aggregate
934             if (BufferUtil.hasContent(_aggregate))
935             {
936                 _completed=_len==0;
937                 write(_aggregate, _complete && _completed, this);
938                 return Action.SCHEDULED;
939             }
940 
941             // Can we just aggregate the remainder?
942             if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
943             {
944                 int position = BufferUtil.flipToFill(_aggregate);
945                 BufferUtil.put(_buffer,_aggregate);
946                 BufferUtil.flipToFlush(_aggregate, position);
947                 return Action.SUCCEEDED;
948             }
949             
950             // Is there data left to write?
951             if (_buffer.hasRemaining())
952             {
953                 // if there is no slice, just write it
954                 if (_slice==null)
955                 {
956                     _completed=true;
957                     write(_buffer, _complete, this);
958                     return Action.SCHEDULED;
959                 }
960                 
961                 // otherwise take a slice
962                 int p=_buffer.position();
963                 int l=Math.min(getBufferSize(),_buffer.remaining());
964                 int pl=p+l;
965                 _slice.limit(pl);
966                 _buffer.position(pl);
967                 _slice.position(p);
968                 _completed=!_buffer.hasRemaining();
969                 write(_slice, _complete && _completed, this);
970                 return Action.SCHEDULED;
971             }
972             
973             // all content written, but if we have not yet signal completion, we
974             // need to do so
975             if (_complete && !_completed)
976             {
977                 _completed=true;
978                 write(BufferUtil.EMPTY_BUFFER, _complete, this);
979                 return Action.SCHEDULED;
980             }
981 
982             return Action.SUCCEEDED;
983         }
984 
985         @Override
986         protected void onCompleteSuccess()
987         {
988             super.onCompleteSuccess();
989             if (_complete)
990                 closed();
991         }
992         
993         
994     }
995 
996 
997     /* ------------------------------------------------------------ */
998     /** An iterating callback that will take content from an
999      * InputStream and write it to the associated {@link HttpChannel}.
1000      * A non direct buffer of size {@link HttpOutput#getBufferSize()} is used.
1001      * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
1002      * be notified as each buffer is written and only once all the input is consumed will the
1003      * wrapped {@link Callback#succeeded()} method be called.
1004      */
1005     private class InputStreamWritingCB extends IteratingNestedCallback
1006     {
1007         private final InputStream _in;
1008         private final ByteBuffer _buffer;
1009         private boolean _eof;
1010 
1011         public InputStreamWritingCB(InputStream in, Callback callback)
1012         {
1013             super(callback);
1014             _in=in;
1015             _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
1016         }
1017 
1018         @Override
1019         protected Action process() throws Exception
1020         {
1021             // Only return if EOF has previously been read and thus
1022             // a write done with EOF=true
1023             if (_eof)
1024             {
1025                 // Handle EOF
1026                 _in.close();
1027                 closed();
1028                 _channel.getByteBufferPool().release(_buffer);
1029                 return Action.SUCCEEDED;
1030             }
1031             
1032             // Read until buffer full or EOF
1033             int len=0;
1034             while (len<_buffer.capacity() && !_eof)
1035             {
1036                 int r=_in.read(_buffer.array(),_buffer.arrayOffset()+len,_buffer.capacity()-len);
1037                 if (r<0)
1038                     _eof=true;
1039                 else
1040                     len+=r;
1041             }
1042 
1043             // write what we have
1044             _buffer.position(0);
1045             _buffer.limit(len);
1046             write(_buffer,_eof,this);
1047             return Action.SCHEDULED;
1048         }
1049 
1050         @Override
1051         public void onCompleteFailure(Throwable x)
1052         {
1053             super.onCompleteFailure(x);
1054             _channel.getByteBufferPool().release(_buffer);
1055             try
1056             {
1057                 _in.close();
1058             }
1059             catch (IOException e)
1060             {
1061                 LOG.ignore(e);
1062             }
1063         }
1064 
1065     }
1066 
1067     /* ------------------------------------------------------------ */
1068     /** An iterating callback that will take content from a
1069      * ReadableByteChannel and write it to the {@link HttpChannel}.
1070      * A {@link ByteBuffer} of size {@link HttpOutput#getBufferSize()} is used that will be direct if
1071      * {@link HttpChannel#useDirectBuffers()} is true.
1072      * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
1073      * be notified as each buffer is written and only once all the input is consumed will the
1074      * wrapped {@link Callback#succeeded()} method be called.
1075      */
1076     private class ReadableByteChannelWritingCB extends IteratingNestedCallback
1077     {
1078         private final ReadableByteChannel _in;
1079         private final ByteBuffer _buffer;
1080         private boolean _eof;
1081 
1082         public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
1083         {
1084             super(callback);
1085             _in=in;
1086             _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
1087         }
1088 
1089         @Override
1090         protected Action process() throws Exception
1091         {
1092             // Only return if EOF has previously been read and thus
1093             // a write done with EOF=true
1094             if (_eof)
1095             {
1096                 _in.close();
1097                 closed();
1098                 _channel.getByteBufferPool().release(_buffer);
1099                 return Action.SUCCEEDED;
1100             }
1101             
1102             // Read from stream until buffer full or EOF
1103             _buffer.clear();
1104             while (_buffer.hasRemaining() && !_eof)
1105               _eof = (_in.read(_buffer)) <  0;
1106 
1107             // write what we have
1108             _buffer.flip();
1109             write(_buffer,_eof,this);
1110 
1111             return Action.SCHEDULED;
1112         }
1113 
1114         @Override
1115         public void onCompleteFailure(Throwable x)
1116         {
1117             super.onCompleteFailure(x);
1118             _channel.getByteBufferPool().release(_buffer);
1119             try
1120             {
1121                 _in.close();
1122             }
1123             catch (IOException e)
1124             {
1125                 LOG.ignore(e);
1126             }
1127         }
1128     }
1129 }