View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.nio.ByteBuffer;
24  import java.nio.channels.ReadableByteChannel;
25  import java.nio.channels.WritePendingException;
26  import java.util.concurrent.atomic.AtomicReference;
27  
28  import javax.servlet.ServletOutputStream;
29  import javax.servlet.WriteListener;
30  
31  import org.eclipse.jetty.http.HttpContent;
32  import org.eclipse.jetty.io.EofException;
33  import org.eclipse.jetty.util.BufferUtil;
34  import org.eclipse.jetty.util.Callback;
35  import org.eclipse.jetty.util.IteratingCallback;
36  import org.eclipse.jetty.util.IteratingNestedCallback;
37  import org.eclipse.jetty.util.SharedBlockingCallback;
38  import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
39  import org.eclipse.jetty.util.log.Log;
40  import org.eclipse.jetty.util.log.Logger;
41  
42  /**
43   * <p>{@link HttpOutput} implements {@link ServletOutputStream}
44   * as required by the Servlet specification.</p>
45   * <p>{@link HttpOutput} buffers content written by the application until a
46   * further write will overflow the buffer, at which point it triggers a commit
47   * of the response.</p>
48   * <p>{@link HttpOutput} can be closed and reopened, to allow requests included
49   * via {@link RequestDispatcher#include(ServletRequest, ServletResponse)} to
50   * close the stream, to be reopened after the inclusion ends.</p>
51   */
52  public class HttpOutput extends ServletOutputStream implements Runnable
53  {
54      private static Logger LOG = Log.getLogger(HttpOutput.class);
55      private final HttpChannel<?> _channel;
56      private final SharedBlockingCallback _writeblock=new SharedBlockingCallback()
57      {
58          @Override
59          protected long getIdleTimeout()
60          {
61              return _channel.getIdleTimeout();
62          }
63      };
64      private long _written;
65      private ByteBuffer _aggregate;
66      private int _bufferSize;
67      private int _commitSize;
68      private WriteListener _writeListener;
69      private volatile Throwable _onError;
70  
71      /*
72      ACTION             OPEN       ASYNC      READY      PENDING       UNREADY       CLOSED
73      -----------------------------------------------------------------------------------------------------
74      setWriteListener() READY->owp ise        ise        ise           ise           ise
75      write()            OPEN       ise        PENDING    wpe           wpe           eof
76      flush()            OPEN       ise        PENDING    wpe           wpe           eof
77      close()            CLOSED     CLOSED     CLOSED     CLOSED        wpe           CLOSED
78      isReady()          OPEN:true  READY:true READY:true UNREADY:false UNREADY:false CLOSED:true
79      write completed    -          -          -          ASYNC         READY->owp    -
80      
81      */
82      enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED }
83      private final AtomicReference<OutputState> _state=new AtomicReference<>(OutputState.OPEN);
84  
85      public HttpOutput(HttpChannel<?> channel)
86      {
87          _channel = channel;
88          _bufferSize = _channel.getHttpConfiguration().getOutputBufferSize();
89          _commitSize=_bufferSize/4;
90      }
91      
92      public HttpChannel<?> getHttpChannel()
93      {
94          return _channel;
95      }
96      
97      public boolean isWritten()
98      {
99          return _written > 0;
100     }
101 
102     public long getWritten()
103     {
104         return _written;
105     }
106 
107     public void reset()
108     {
109         _written = 0;
110         reopen();
111     }
112 
113     public void reopen()
114     {
115         _state.set(OutputState.OPEN);
116     }
117 
118     public boolean isAllContentWritten()
119     {
120         return _channel.getResponse().isAllContentWritten(_written);
121     }
122 
123     protected Blocker acquireWriteBlockingCallback() throws IOException
124     {
125         return _writeblock.acquire();
126     }
127     
128     protected void write(ByteBuffer content, boolean complete) throws IOException
129     {
130         try (Blocker blocker=_writeblock.acquire())
131         {        
132             write(content,complete,blocker);
133             blocker.block();
134         }
135     }
136     
137     protected void write(ByteBuffer content, boolean complete, Callback callback)
138     {
139         _channel.write(content,complete,callback);
140     }
141     
142     @Override
143     public void close()
144     {
145         loop: while(true)
146         {
147             OutputState state=_state.get();
148             switch (state)
149             {
150                 case CLOSED:
151                     break loop;
152                     
153                 case UNREADY:
154                     if (_state.compareAndSet(state,OutputState.ERROR))
155                         _writeListener.onError(_onError==null?new EofException("Async close"):_onError);
156                     continue;
157                     
158                 default:
159                     if (_state.compareAndSet(state,OutputState.CLOSED))
160                     {
161                         try
162                         {
163                             write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER,!_channel.getResponse().isIncluding());
164                         }
165                         catch(IOException e)
166                         {
167                             LOG.debug(e);
168                             _channel.abort();
169                         }
170                         releaseBuffer();
171                         return;
172                     }
173             }
174         }
175     }
176 
177     /* Called to indicated that the output is already closed (write with last==true performed) and the state needs to be updated to match */
178     void closed()
179     {
180         loop: while(true)
181         {
182             OutputState state=_state.get();
183             switch (state)
184             {
185                 case CLOSED:
186                     break loop;
187                     
188                 case UNREADY:
189                     if (_state.compareAndSet(state,OutputState.ERROR))
190                         _writeListener.onError(_onError==null?new EofException("Async closed"):_onError);
191                     continue;
192                     
193                 default:
194                     if (_state.compareAndSet(state,OutputState.CLOSED))
195                     {
196                         try
197                         {
198                             _channel.getResponse().closeOutput();
199                         }
200                         catch(IOException e)
201                         {
202                             LOG.debug(e);
203                             _channel.abort();
204                         }
205                         releaseBuffer();
206                         return;
207                     }
208             }
209         }
210     }
211 
212     private void releaseBuffer()
213     {
214         if (_aggregate != null)
215         {
216             _channel.getConnector().getByteBufferPool().release(_aggregate);
217             _aggregate = null;
218         }
219     }
220 
221     public boolean isClosed()
222     {
223         return _state.get()==OutputState.CLOSED;
224     }
225 
226     @Override
227     public void flush() throws IOException
228     {
229         while(true)
230         {
231             switch(_state.get())
232             {
233                 case OPEN:
234                     write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, false);
235                     return;
236 
237                 case ASYNC:
238                     throw new IllegalStateException("isReady() not called");
239 
240                 case READY:
241                     if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
242                         continue;
243                     new AsyncFlush().iterate();
244                     return;
245 
246                 case PENDING:
247                 case UNREADY:
248                     throw new WritePendingException();
249 
250                 case ERROR:
251                     throw new EofException(_onError);
252                     
253                 case CLOSED:
254                     return;
255             }
256             break;
257         }
258     }
259 
260 
        /**
         * Writes {@code len} bytes of {@code b} starting at {@code off}.
         * <p>The written count is increased by {@code len} before the state is
         * examined. In the READY state the write is asynchronous: small,
         * non-final writes are copied into the aggregate buffer, anything else
         * is handed to an AsyncWrite. In the OPEN state the write is blocking:
         * small, non-final writes are aggregated, otherwise the aggregate is
         * flushed and the remaining bytes are written directly in pieces of at
         * most getBufferSize() bytes. When the response reports that all
         * content has been written, closed() is called at the end of the
         * blocking path.</p>
         *
         * @param b the source array
         * @param off offset of the first byte to write
         * @param len number of bytes to write
         * @throws IOException if a blocking write fails, or an EofException if the state is ERROR or CLOSED
         * @throws IllegalStateException if the state is ASYNC
         * @throws WritePendingException if the state is PENDING or UNREADY
         */
261     @Override
262     public void write(byte[] b, int off, int len) throws IOException
263     {
264         _written+=len;
            // true when the response considers this write to be its last content
265         boolean complete=_channel.getResponse().isAllContentWritten(_written);
266 
267         // Async or Blocking ?
268         while(true)
269         {
270             switch(_state.get())
271             {
272                 case OPEN:
273                     // process blocking below
274                     break;
275 
276                 case ASYNC:
277                     throw new IllegalStateException("isReady() not called");
278 
279                 case READY:
                        // claim the write; if another thread changed the state, re-read it
280                     if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
281                         continue;
282 
283                     // Should we aggregate?
284                     if (!complete && len<=_commitSize)
285                     {
286                         if (_aggregate == null)
287                             _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
288 
289                         // YES - fill the aggregate with content from the buffer
290                         int filled = BufferUtil.fill(_aggregate, b, off, len);
291 
292                         // return if we are not complete, not full and filled all the content
293                         if (filled==len && !BufferUtil.isFull(_aggregate))
294                         {
                                // nothing was sent, so go back to ASYNC without an actual write
295                             if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
296                                 throw new IllegalStateException();
297                             return;
298                         }
299 
300                         // adjust offset/length
301                         off+=filled;
302                         len-=filled;
303                     }
304 
305                     // Do the asynchronous writing from the callback
306                     new AsyncWrite(b,off,len,complete).iterate();
307                     return;
308 
309                 case PENDING:
310                 case UNREADY:
311                     throw new WritePendingException();
312 
313                 case ERROR:
314                     throw new EofException(_onError);
315                     
316                 case CLOSED:
317                     throw new EofException("Closed");
318             }
                // only the OPEN case reaches this point: leave the loop for the blocking path
319             break;
320         }
321 
322 
323         // handle blocking write
324 
325         // Should we aggregate?
326         int capacity = getBufferSize();
327         if (!complete && len<=_commitSize)
328         {
329             if (_aggregate == null)
330                 _aggregate = _channel.getByteBufferPool().acquire(capacity, false);
331 
332             // YES - fill the aggregate with content from the buffer
333             int filled = BufferUtil.fill(_aggregate, b, off, len);
334 
335             // return if we are not complete, not full and filled all the content
336             if (filled==len && !BufferUtil.isFull(_aggregate))
337                 return;
338 
339             // adjust offset/length
340             off+=filled;
341             len-=filled;
342         }
343 
344         // flush any content from the aggregate
345         if (BufferUtil.hasContent(_aggregate))
346         {
                // flagged as last only when no bytes of b remain to be written
347             write(_aggregate, complete && len==0);
348 
349             // should we fill aggregate again from the buffer?
350             if (len>0 && !complete && len<=_commitSize)
351             {
352                 BufferUtil.append(_aggregate, b, off, len);
353                 return;
354             }
355         }
356 
357         // write any remaining content in the buffer directly
358         if (len>0)
359         {
360             ByteBuffer wrap = ByteBuffer.wrap(b, off, len);
361             ByteBuffer view = wrap.duplicate();
362 
363             // write a buffer capacity at a time to avoid JVM pooling large direct buffers
364             // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541
365             while (len>getBufferSize())
366             {
                    // p = start of this chunk, l = its end (and the start of the next one)
367                 int p=view.position();
368                 int l=p+getBufferSize();
369                 view.limit(p+getBufferSize());
370                 write(view,false);
371                 len-=getBufferSize();
                    // expose the next chunk: at most one buffer size, or whatever is left
372                 view.limit(l+Math.min(len,getBufferSize()));
373                 view.position(l);
374             }
                // final chunk carries the complete flag
375             write(view,complete);
376         }
377         else if (complete)
378             write(BufferUtil.EMPTY_BUFFER,complete);
379 
            // the last write has been made: bring the state in line
380         if (complete)
381             closed();
382 
383     }
384 
385     public void write(ByteBuffer buffer) throws IOException
386     {
387         _written+=buffer.remaining();
388         boolean complete=_channel.getResponse().isAllContentWritten(_written);
389 
390         // Async or Blocking ?
391         while(true)
392         {
393             switch(_state.get())
394             {
395                 case OPEN:
396                     // process blocking below
397                     break;
398 
399                 case ASYNC:
400                     throw new IllegalStateException("isReady() not called");
401 
402                 case READY:
403                     if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
404                         continue;
405 
406                     // Do the asynchronous writing from the callback
407                     new AsyncWrite(buffer,complete).iterate();
408                     return;
409 
410                 case PENDING:
411                 case UNREADY:
412                     throw new WritePendingException();
413 
414                 case ERROR:
415                     throw new EofException(_onError);
416                     
417                 case CLOSED:
418                     throw new EofException("Closed");
419             }
420             break;
421         }
422 
423 
424         // handle blocking write
425         int len=BufferUtil.length(buffer);
426 
427         // flush any content from the aggregate
428         if (BufferUtil.hasContent(_aggregate))
429             write(_aggregate, complete && len==0);
430 
431         // write any remaining content in the buffer directly
432         if (len>0)
433             write(buffer, complete);
434         else if (complete)
435             write(BufferUtil.EMPTY_BUFFER,complete);
436 
437         if (complete)
438             closed();
439     }
440 
441     @Override
442     public void write(int b) throws IOException
443     {
444         _written+=1;
445         boolean complete=_channel.getResponse().isAllContentWritten(_written);
446 
447         // Async or Blocking ?
448         while(true)
449         {
450             switch(_state.get())
451             {
452                 case OPEN:
453                     if (_aggregate == null)
454                         _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
455                     BufferUtil.append(_aggregate, (byte)b);
456 
457                     // Check if all written or full
458                     if (complete || BufferUtil.isFull(_aggregate))
459                     {
460                         try(Blocker blocker=_writeblock.acquire())
461                         {
462                             write(_aggregate, complete, blocker);
463                             blocker.block();
464                         }
465                         if (complete)
466                             closed();
467                     }
468                     break;
469 
470                 case ASYNC:
471                     throw new IllegalStateException("isReady() not called");
472 
473                 case READY:
474                     if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
475                         continue;
476 
477                     if (_aggregate == null)
478                         _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
479                     BufferUtil.append(_aggregate, (byte)b);
480 
481                     // Check if all written or full
482                     if (!complete && !BufferUtil.isFull(_aggregate))
483                     {
484                         if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
485                             throw new IllegalStateException();
486                         return;
487                     }
488 
489                     // Do the asynchronous writing from the callback
490                     new AsyncFlush().iterate();
491                     return;
492 
493                 case PENDING:
494                 case UNREADY:
495                     throw new WritePendingException();
496 
497                 case ERROR:
498                     throw new EofException(_onError);
499                     
500                 case CLOSED:
501                     throw new EofException("Closed");
502             }
503             break;
504         }
505     }
506 
507     @Override
508     public void print(String s) throws IOException
509     {
510         if (isClosed())
511             throw new IOException("Closed");
512 
513         write(s.getBytes(_channel.getResponse().getCharacterEncoding()));
514     }
515 
516     /* ------------------------------------------------------------ */
517     /** Blocking send of content.
518      * @param content The content to send.
519      * @throws IOException
520      */
521     public void sendContent(ByteBuffer content) throws IOException
522     {
523         try(Blocker blocker=_writeblock.acquire())
524         {
525             write(content,true,blocker);
526             blocker.block();
527         }
528     }
529 
530     /* ------------------------------------------------------------ */
531     /** Blocking send of content.
532      * @param in The content to send
533      * @throws IOException
534      */
535     public void sendContent(InputStream in) throws IOException
536     {
537         try(Blocker blocker=_writeblock.acquire())
538         {
539             new InputStreamWritingCB(in,blocker).iterate();
540             blocker.block();
541         }
542     }
543 
544     /* ------------------------------------------------------------ */
545     /** Blocking send of content.
546      * @param in The content to send
547      * @throws IOException
548      */
549     public void sendContent(ReadableByteChannel in) throws IOException
550     {
551         try(Blocker blocker=_writeblock.acquire())
552         {
553             new ReadableByteChannelWritingCB(in,blocker).iterate();
554             blocker.block();
555         }
556     }
557 
558 
559     /* ------------------------------------------------------------ */
560     /** Blocking send of content.
561      * @param content The content to send
562      * @throws IOException
563      */
564     public void sendContent(HttpContent content) throws IOException
565     {
566         try(Blocker blocker=_writeblock.acquire())
567         {
568             sendContent(content,blocker);
569             blocker.block();
570         }
571     }
572 
573     /* ------------------------------------------------------------ */
574     /** Asynchronous send of content.
575      * @param content The content to send
576      * @param callback The callback to use to notify success or failure
577      */
578     public void sendContent(ByteBuffer content, final Callback callback)
579     {
580         write(content,true,new Callback()
581         {
582             @Override
583             public void succeeded()
584             {
585                 closed();
586                 callback.succeeded();
587             }
588 
589             @Override
590             public void failed(Throwable x)
591             {
592                 callback.failed(x);
593             }
594         });
595     }
596 
597     /* ------------------------------------------------------------ */
598     /** Asynchronous send of content.
599      * @param in The content to send as a stream.  The stream will be closed
600      * after reading all content.
601      * @param callback The callback to use to notify success or failure
602      */
603     public void sendContent(InputStream in, Callback callback)
604     {
605         new InputStreamWritingCB(in,callback).iterate();
606     }
607 
608     /* ------------------------------------------------------------ */
609     /** Asynchronous send of content.
610      * @param in The content to send as a channel.  The channel will be closed
611      * after reading all content.
612      * @param callback The callback to use to notify success or failure
613      */
614     public void sendContent(ReadableByteChannel in, Callback callback)
615     {
616         new ReadableByteChannelWritingCB(in,callback).iterate();
617     }
618 
619     /* ------------------------------------------------------------ */
620     /** Asynchronous send of content.
621      * @param httpContent The content to send
622      * @param callback The callback to use to notify success or failure
623      */
624     public void sendContent(HttpContent httpContent, Callback callback)
625     {
626         if (BufferUtil.hasContent(_aggregate))
627         {
628             callback.failed(new IOException("cannot sendContent() after write()"));
629             return;
630         }
631         if (_channel.isCommitted())
632         {
633             callback.failed(new IOException("committed"));
634             return;
635         }
636 
637         while (true)
638         {
639             switch(_state.get())
640             {
641                 case OPEN:
642                     if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING))
643                         continue;
644                     break;
645                 case ERROR:
646                     callback.failed(new EofException(_onError));
647                     return;
648                     
649                 case CLOSED:
650                     callback.failed(new EofException("Closed"));
651                     return;
652                 default:
653                     throw new IllegalStateException();
654             }
655             break;
656         }
657         ByteBuffer buffer= _channel.useDirectBuffers()?httpContent.getDirectBuffer():null;
658         if (buffer == null)
659             buffer = httpContent.getIndirectBuffer();
660 
661         if (buffer!=null)
662         {
663             if (LOG.isDebugEnabled())
664                 LOG.debug("sendContent({}=={},{},direct={})",httpContent,BufferUtil.toDetailString(buffer),callback,_channel.useDirectBuffers());
665             
666             sendContent(buffer,callback);
667             return;
668         }
669 
670         try
671         {
672             ReadableByteChannel rbc=httpContent.getReadableByteChannel();
673             if (rbc!=null)
674             {
675                 if (LOG.isDebugEnabled())
676                     LOG.debug("sendContent({}=={},{},direct={})",httpContent,rbc,callback,_channel.useDirectBuffers());
677                 // Close of the rbc is done by the async sendContent
678                 sendContent(rbc,callback);
679                 return;
680             }
681 
682             InputStream in = httpContent.getInputStream();
683             if ( in!=null )
684             {
685                 if (LOG.isDebugEnabled())
686                     LOG.debug("sendContent({}=={},{},direct={})",httpContent,in,callback,_channel.useDirectBuffers());
687                 sendContent(in,callback);
688                 return;
689             }
690         }
691         catch(Throwable th)
692         {
693             callback.failed(th);
694             return;
695         }
696 
697         callback.failed(new IllegalArgumentException("unknown content for "+httpContent));
698     }
699 
700     public int getBufferSize()
701     {
702         return _bufferSize;
703     }
704 
705     public void setBufferSize(int size)
706     {
707         _bufferSize = size;
708         _commitSize = size;
709     }
710 
711     public void resetBuffer()
712     {
713         if (BufferUtil.hasContent(_aggregate))
714             BufferUtil.clear(_aggregate);
715     }
716 
717     @Override
718     public void setWriteListener(WriteListener writeListener)
719     {
720         if (!_channel.getState().isAsync())
721             throw new IllegalStateException("!ASYNC");
722 
723         if (_state.compareAndSet(OutputState.OPEN, OutputState.READY))
724         {
725             _writeListener = writeListener;
726             _channel.getState().onWritePossible();
727         }
728         else
729             throw new IllegalStateException();
730     }
731 
732     /**
733      * @see javax.servlet.ServletOutputStream#isReady()
734      */
735     @Override
736     public boolean isReady()
737     {
738         while (true)
739         {
740             switch(_state.get())
741             {
742                 case OPEN:
743                     return true;
744                 case ASYNC:
745                     if (!_state.compareAndSet(OutputState.ASYNC, OutputState.READY))
746                         continue;
747                     return true;
748                 case READY:
749                     return true;
750                 case PENDING:
751                     if (!_state.compareAndSet(OutputState.PENDING, OutputState.UNREADY))
752                         continue;
753                     return false;
754                 case UNREADY:
755                     return false;
756 
757                 case ERROR:
758                     return true;
759                     
760                 case CLOSED:
761                     return true;
762             }
763         }
764     }
765 
        /**
         * Delivers a pending notification to the write listener.
         * <p>If a failure has been recorded in {@code _onError} it takes
         * priority: unless the state is already CLOSED or ERROR, the state is
         * moved to ERROR, the listener's onError is called and the output is
         * closed. Otherwise, in the READY or CLOSED state the listener's
         * onWritePossible is called; anything it throws is recorded and
         * reported on the next pass of the loop. Any other state is recorded
         * as an IllegalStateException and reported the same way.</p>
         */
766     @Override
767     public void run()
768     {
769         loop: while (true)
770         {
771             OutputState state = _state.get();
772 
                // a recorded failure is handled before any write notification
773             if(_onError!=null)
774             {
775                 switch(state)
776                 {
777                     case CLOSED:
778                     case ERROR:
                            // the listener is not called from this branch: drop the failure
779                         _onError=null;
780                         break loop;
781 
782                     default:
783                         if (_state.compareAndSet(state, OutputState.ERROR))
784                         {
                                // take the failure and clear the field before calling the listener
785                             Throwable th=_onError;
786                             _onError=null;
787                             if (LOG.isDebugEnabled())
788                                 LOG.debug("onError",th);
789                             _writeListener.onError(th);
790                             close();
791 
792                             break loop;
793                         }
794 
795                 }
                    // the state changed before the transition to ERROR could be made: retry
796                 continue;
797             }
798             
799             switch(_state.get())
800             {
801                 case CLOSED:
802                     // even though a write is not possible, because a close has 
803                     // occurred, we need to call onWritePossible to tell async
804                     // producer that the last write completed.
805                     // so fall through
806                 case READY:
807                     try
808                     {
809                         _writeListener.onWritePossible();
810                         break loop;
811                     }
812                     catch (Throwable e)
813                     {
                            // record the failure; the next loop iteration reports it
814                         _onError=e;
815                     }
816                     break;
817                     
818                 default:
                        // unexpected state: record it as a failure for the next iteration
819                     _onError=new IllegalStateException("state="+_state.get());
820             }
821         }
822     }
823     
824     @Override
825     public String toString()
826     {
827         return String.format("%s@%x{%s}",this.getClass().getSimpleName(),hashCode(),_state.get());
828     }
829     
830     private abstract class AsyncICB extends IteratingCallback
831     {
832         @Override
833         protected void onCompleteSuccess()
834         {
835             while(true)
836             {
837                 OutputState last=_state.get();
838                 switch(last)
839                 {
840                     case PENDING:
841                         if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
842                             continue;
843                         break;
844 
845                     case UNREADY:
846                         if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY))
847                             continue;
848                         _channel.getState().onWritePossible();
849                         break;
850 
851                     case CLOSED:
852                         break;
853 
854                     default:
855                         throw new IllegalStateException();
856                 }
857                 break;
858             }
859         }
860 
861         @Override
862         public void onCompleteFailure(Throwable e)
863         {
864             _onError=e==null?new IOException():e;
865             _channel.getState().onWritePossible();
866         }
867     }
868     
869     
870     private class AsyncFlush extends AsyncICB
871     {
872         protected volatile boolean _flushed;
873 
874         public AsyncFlush()
875         {
876         }
877 
878         @Override
879         protected Action process()
880         {
881             if (BufferUtil.hasContent(_aggregate))
882             {
883                 _flushed=true;
884                 write(_aggregate, false, this);
885                 return Action.SCHEDULED;
886             }
887 
888             if (!_flushed)
889             {
890                 _flushed=true;
891                 write(BufferUtil.EMPTY_BUFFER,false,this);
892                 return Action.SCHEDULED;
893             }
894 
895             return Action.SUCCEEDED;
896         }
897     }
898 
899 
900 
901     private class AsyncWrite extends AsyncICB
902     {
903         private final ByteBuffer _buffer;
904         private final ByteBuffer _slice;
905         private final boolean _complete;
906         private final int _len;
907         protected volatile boolean _completed;
908 
909         public AsyncWrite(byte[] b, int off, int len, boolean complete)
910         {
911             _buffer=ByteBuffer.wrap(b, off, len);
912             _len=len;
913             // always use a view for large byte arrays to avoid JVM pooling large direct buffers
914             _slice=_len<getBufferSize()?null:_buffer.duplicate();
915             _complete=complete;
916         }
917 
918         public AsyncWrite(ByteBuffer buffer, boolean complete)
919         {
920             _buffer=buffer;
921             _len=buffer.remaining();
922             // Use a slice buffer for large indirect to avoid JVM pooling large direct buffers
923             _slice=_buffer.isDirect()||_len<getBufferSize()?null:_buffer.duplicate();
924             _complete=complete;
925         }
926 
927         @Override
928         protected Action process()
929         {
930             // flush any content from the aggregate
931             if (BufferUtil.hasContent(_aggregate))
932             {
933                 _completed=_len==0;
934                 write(_aggregate, _complete && _completed, this);
935                 return Action.SCHEDULED;
936             }
937 
938             // Can we just aggregate the remainder?
939             if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
940             {
941                 int position = BufferUtil.flipToFill(_aggregate);
942                 BufferUtil.put(_buffer,_aggregate);
943                 BufferUtil.flipToFlush(_aggregate, position);
944                 return Action.SUCCEEDED;
945             }
946             
947             // Is there data left to write?
948             if (_buffer.hasRemaining())
949             {
950                 // if there is no slice, just write it
951                 if (_slice==null)
952                 {
953                     _completed=true;
954                     write(_buffer, _complete, this);
955                     return Action.SCHEDULED;
956                 }
957                 
958                 // otherwise take a slice
959                 int p=_buffer.position();
960                 int l=Math.min(getBufferSize(),_buffer.remaining());
961                 int pl=p+l;
962                 _slice.limit(pl);
963                 _buffer.position(pl);
964                 _slice.position(p);
965                 _completed=!_buffer.hasRemaining();
966                 write(_slice, _complete && _completed, this);
967                 return Action.SCHEDULED;
968             }
969             
970             // all content written, but if we have not yet signal completion, we
971             // need to do so
972             if (_complete && !_completed)
973             {
974                 _completed=true;
975                 write(BufferUtil.EMPTY_BUFFER, _complete, this);
976                 return Action.SCHEDULED;
977             }
978 
979             return Action.SUCCEEDED;
980         }
981 
982         @Override
983         protected void onCompleteSuccess()
984         {
985             super.onCompleteSuccess();
986             if (_complete)
987                 closed();
988         }
989         
990         
991     }
992 
993 
994     /* ------------------------------------------------------------ */
995     /** An iterating callback that will take content from an
996      * InputStream and write it to the associated {@link HttpChannel}.
997      * A non direct buffer of size {@link HttpOutput#getBufferSize()} is used.
998      * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
999      * be notified as each buffer is written and only once all the input is consumed will the
1000      * wrapped {@link Callback#succeeded()} method be called.
1001      */
1002     private class InputStreamWritingCB extends IteratingNestedCallback
1003     {
1004         private final InputStream _in;
1005         private final ByteBuffer _buffer;
1006         private boolean _eof;
1007 
1008         public InputStreamWritingCB(InputStream in, Callback callback)
1009         {
1010             super(callback);
1011             _in=in;
1012             _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
1013         }
1014 
1015         @Override
1016         protected Action process() throws Exception
1017         {
1018             // Only return if EOF has previously been read and thus
1019             // a write done with EOF=true
1020             if (_eof)
1021             {
1022                 // Handle EOF
1023                 _in.close();
1024                 closed();
1025                 _channel.getByteBufferPool().release(_buffer);
1026                 return Action.SUCCEEDED;
1027             }
1028             
1029             // Read until buffer full or EOF
1030             int len=0;
1031             while (len<_buffer.capacity() && !_eof)
1032             {
1033                 int r=_in.read(_buffer.array(),_buffer.arrayOffset()+len,_buffer.capacity()-len);
1034                 if (r<0)
1035                     _eof=true;
1036                 else
1037                     len+=r;
1038             }
1039 
1040             // write what we have
1041             _buffer.position(0);
1042             _buffer.limit(len);
1043             write(_buffer,_eof,this);
1044             return Action.SCHEDULED;
1045         }
1046 
1047         @Override
1048         public void onCompleteFailure(Throwable x)
1049         {
1050             super.onCompleteFailure(x);
1051             _channel.getByteBufferPool().release(_buffer);
1052             try
1053             {
1054                 _in.close();
1055             }
1056             catch (IOException e)
1057             {
1058                 LOG.ignore(e);
1059             }
1060         }
1061 
1062     }
1063 
1064     /* ------------------------------------------------------------ */
1065     /** An iterating callback that will take content from a
1066      * ReadableByteChannel and write it to the {@link HttpChannel}.
1067      * A {@link ByteBuffer} of size {@link HttpOutput#getBufferSize()} is used that will be direct if
1068      * {@link HttpChannel#useDirectBuffers()} is true.
1069      * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
1070      * be notified as each buffer is written and only once all the input is consumed will the
1071      * wrapped {@link Callback#succeeded()} method be called.
1072      */
1073     private class ReadableByteChannelWritingCB extends IteratingNestedCallback
1074     {
1075         private final ReadableByteChannel _in;
1076         private final ByteBuffer _buffer;
1077         private boolean _eof;
1078 
1079         public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
1080         {
1081             super(callback);
1082             _in=in;
1083             _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
1084         }
1085 
1086         @Override
1087         protected Action process() throws Exception
1088         {
1089             // Only return if EOF has previously been read and thus
1090             // a write done with EOF=true
1091             if (_eof)
1092             {
1093                 _in.close();
1094                 closed();
1095                 _channel.getByteBufferPool().release(_buffer);
1096                 return Action.SUCCEEDED;
1097             }
1098             
1099             // Read from stream until buffer full or EOF
1100             _buffer.clear();
1101             while (_buffer.hasRemaining() && !_eof)
1102               _eof = (_in.read(_buffer)) <  0;
1103 
1104             // write what we have
1105             _buffer.flip();
1106             write(_buffer,_eof,this);
1107 
1108             return Action.SCHEDULED;
1109         }
1110 
1111         @Override
1112         public void onCompleteFailure(Throwable x)
1113         {
1114             super.onCompleteFailure(x);
1115             _channel.getByteBufferPool().release(_buffer);
1116             try
1117             {
1118                 _in.close();
1119             }
1120             catch (IOException e)
1121             {
1122                 LOG.ignore(e);
1123             }
1124         }
1125     }
1126 }