View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.ReadableByteChannel;
26  import java.nio.channels.WritePendingException;
27  import java.util.concurrent.atomic.AtomicReference;
28  
29  import javax.servlet.RequestDispatcher;
30  import javax.servlet.ServletOutputStream;
31  import javax.servlet.ServletRequest;
32  import javax.servlet.ServletResponse;
33  import javax.servlet.WriteListener;
34  
35  import org.eclipse.jetty.http.HttpContent;
36  import org.eclipse.jetty.io.EofException;
37  import org.eclipse.jetty.util.BufferUtil;
38  import org.eclipse.jetty.util.Callback;
39  import org.eclipse.jetty.util.IteratingCallback;
40  import org.eclipse.jetty.util.IteratingNestedCallback;
41  import org.eclipse.jetty.util.SharedBlockingCallback;
42  import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
43  import org.eclipse.jetty.util.log.Log;
44  import org.eclipse.jetty.util.log.Logger;
45  
46  /**
47   * <p>{@link HttpOutput} implements {@link ServletOutputStream}
48   * as required by the Servlet specification.</p>
49   * <p>{@link HttpOutput} buffers content written by the application until a
50   * further write will overflow the buffer, at which point it triggers a commit
51   * of the response.</p>
52   * <p>{@link HttpOutput} can be closed and reopened, to allow requests included
53   * via {@link RequestDispatcher#include(ServletRequest, ServletResponse)} to
54   * close the stream, to be reopened after the inclusion ends.</p>
55   */
56  public class HttpOutput extends ServletOutputStream implements Runnable
57  {
    /**
     * The target to which {@link HttpOutput} hands every buffer it writes.
     * <p>The output is constructed with its {@link HttpChannel} as the interceptor
     * and a different one may be installed with
     * {@link HttpOutput#setInterceptor(Interceptor)}.</p>
     */
    public interface Interceptor
    {
        /**
         * Writes content, reporting the outcome through the callback.
         *
         * @param content the content to write; {@link HttpOutput} may pass an empty buffer
         * @param complete passed as {@code true} by {@link HttpOutput} for the write it
         *        considers the last of the response (NOTE(review): exact contract for
         *        implementations is not visible here - confirm)
         * @param callback notified of the success or failure of the write
         */
        void write(ByteBuffer content, boolean complete, Callback callback);

        /**
         * @return presumably the interceptor this one delegates to, if any
         *         (NOTE(review): not used in the visible part of this file - confirm)
         */
        Interceptor getNextInterceptor();

        /**
         * @return the value {@link HttpOutput} passes as the "direct" flag when it
         *         acquires its aggregation buffer from the buffer pool
         */
        boolean isOptimizedForDirectBuffers();
    }
64      
65      private static Logger LOG = Log.getLogger(HttpOutput.class);
66  
    /** The channel this output writes through; also the default interceptor. */
    private final HttpChannel _channel;
    /** Shared callback used to block the calling thread until a blocking write completes. */
    private final SharedBlockingCallback _writeBlock;
    /** Current target of all writes; reset to the channel by {@link #recycle()}. */
    private Interceptor _interceptor;
    
    /** Bytes written via the write API (excludes bytes written via sendContent). Used to autocommit once content length is written. */
    private long _written;
    
    /** Buffer in which small writes are accumulated; acquired lazily and handed back by releaseBuffer(). */
    private ByteBuffer _aggregate;
    /** Capacity requested for the aggregate buffer, and the chunk size used for large blocking writes. */
    private int _bufferSize;
    /** Writes of at most this many bytes are candidates for aggregation. */
    private int _commitSize;
    /** Listener registered through {@link #setWriteListener(WriteListener)}. */
    private WriteListener _writeListener;
    /** Failure waiting to be reported to the write listener; cleared once {@link #run()} has handled it. */
    private volatile Throwable _onError;
    /*
    ACTION             OPEN       ASYNC      READY      PENDING       UNREADY       CLOSED
    -------------------------------------------------------------------------------------------
    setWriteListener() READY->owp ise        ise        ise           ise           ise
    write()            OPEN       ise        PENDING    wpe           wpe           eof
    flush()            OPEN       ise        PENDING    wpe           wpe           eof
    close()            CLOSED     CLOSED     CLOSED     CLOSED        wpe           CLOSED
    isReady()          OPEN:true  READY:true READY:true UNREADY:false UNREADY:false CLOSED:true
    write completed    -          -          -          ASYNC         READY->owp    -

    Legend: ise = IllegalStateException, wpe = WritePendingException, eof = EofException,
            owp = onWritePossible.

    NOTE(review): the table has no column for ERROR. In the code of this class, ERROR is
    entered by run() when a failure is pending and by close()/closed() when they find the
    state UNREADY (the write listener's onError is called in both cases). While in ERROR,
    write() and flush() throw EofException, isReady() returns true and close() moves to
    CLOSED. The close()/UNREADY and flush()/CLOSED cells above also do not match the
    current code (close() reports onError and then closes; flush() returns silently).
    */
    private enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED }
    /** Current state of the output; every transition is made with get/set/compareAndSet on this reference. */
    private final AtomicReference<OutputState> _state=new AtomicReference<>(OutputState.OPEN);
91  
92      public HttpOutput(HttpChannel channel)
93      {
94          _channel = channel;
95          _interceptor = channel;
96          _writeBlock = new SharedBlockingCallback()
97          {
98              @Override
99              protected long getIdleTimeout()
100             {
101                 long bto = getHttpChannel().getHttpConfiguration().getBlockingTimeout();
102                 if (bto>0)
103                     return bto;
104                 if (bto<0)
105                     return -1;
106                 return _channel.getIdleTimeout();
107             }
108         };
109         HttpConfiguration config = channel.getHttpConfiguration();
110         _bufferSize = config.getOutputBufferSize();
111         _commitSize = config.getOutputAggregationSize();
112         if (_commitSize>_bufferSize)
113         {
114             LOG.warn("OutputAggregationSize {} exceeds bufferSize {}",_commitSize,_bufferSize);
115             _commitSize=_bufferSize;
116         }
117     }
118     
    /**
     * @return the channel this output was constructed with
     */
    public HttpChannel getHttpChannel()
    {
        return _channel;
    }
123 
    /**
     * @return the interceptor that currently receives all writes
     */
    public Interceptor getInterceptor()
    {
        return _interceptor;
    }
128 
    /**
     * Replaces the interceptor that receives all writes.
     * The value is stored as is; no null check is performed here.
     *
     * @param filter the new interceptor
     */
    public void setInterceptor(Interceptor filter)
    {
        _interceptor=filter;
    }
133     
    /**
     * @return true if at least one byte has been counted by the write API
     *         since the last {@link #resetBuffer()}
     */
    public boolean isWritten()
    {
        return _written > 0;
    }
138 
    /**
     * @return the number of bytes counted by the write API since the last
     *         {@link #resetBuffer()}
     */
    public long getWritten()
    {
        return _written;
    }
143 
    /**
     * Unconditionally puts the output back into the OPEN state,
     * whatever the current state is.
     */
    public void reopen()
    {
        _state.set(OutputState.OPEN);
    }
148 
    /**
     * Asks the response whether the bytes written so far amount to all of the content.
     *
     * @return the response's answer for the current written byte count
     */
    public boolean isAllContentWritten()
    {
        return _channel.getResponse().isAllContentWritten(_written);
    }
153 
    /**
     * Acquires the blocker of the shared blocking callback used by blocking writes.
     *
     * @return the acquired blocker
     * @throws IOException if the blocker cannot be acquired
     */
    protected Blocker acquireWriteBlockingCallback() throws IOException
    {
        return _writeBlock.acquire();
    }
158     
    /**
     * Blocking write: passes the content to the interceptor with a blocker as
     * the callback, then waits on that blocker.
     * <p>Any failure is logged at debug level, aborts the output and the channel,
     * and is then rethrown to the caller.</p>
     *
     * @param content the content to write
     * @param complete the "complete" flag forwarded to the interceptor
     * @throws IOException if the write fails
     */
    private void write(ByteBuffer content, boolean complete) throws IOException
    {
        try (Blocker blocker = _writeBlock.acquire())
        {        
            write(content, complete, blocker);
            blocker.block();
        }
        catch (Throwable failure)
        {
            if (LOG.isDebugEnabled())
                LOG.debug(failure);
            abort(failure);
            throw failure;
        }
    }
174 
    /**
     * Forwards a write to the current interceptor without any state checks.
     *
     * @param content the content to write
     * @param complete the "complete" flag forwarded to the interceptor
     * @param callback the callback forwarded to the interceptor
     */
    protected void write(ByteBuffer content, boolean complete, Callback callback)
    {
        _interceptor.write(content, complete, callback);
    }
179 
    /**
     * Marks this output as closed and then aborts the channel with the given failure.
     *
     * @param failure the reason for the abort
     */
    private void abort(Throwable failure)
    {
        closed();
        _channel.abort(failure);
    }
185 
186     @Override
187     public void close()
188     {
189         while(true)
190         {
191             OutputState state=_state.get();
192             switch (state)
193             {
194                 case CLOSED:
195                 {
196                     return;
197                 }
198                 case UNREADY:
199                 {
200                     if (_state.compareAndSet(state,OutputState.ERROR))
201                         _writeListener.onError(_onError==null?new EofException("Async close"):_onError);
202                     break;
203                 }
204                 default:
205                 {
206                     if (!_state.compareAndSet(state,OutputState.CLOSED))
207                         break;
208 
209                     try
210                     {
211                         write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding());
212                     }
213                     catch (IOException x)
214                     {
215                         // Ignore it, it's been already logged in write().
216                     }
217                     finally
218                     {
219                         releaseBuffer();
220                     }
221                     // Return even if an exception is thrown by write().
222                     return;
223                 }
224             }
225         }
226     }
227 
228     /**
229      * Called to indicate that the last write has been performed.
230      * It updates the state and performs cleanup operations.
231      */
232     void closed()
233     {
234         while(true)
235         {
236             OutputState state=_state.get();
237             switch (state)
238             {
239                 case CLOSED:
240                 {
241                     return;
242                 }
243                 case UNREADY:
244                 {
245                     if (_state.compareAndSet(state,OutputState.ERROR))
246                         _writeListener.onError(_onError==null?new EofException("Async closed"):_onError);
247                     break;
248                 }
249                 default:
250                 {
251                     if (!_state.compareAndSet(state, OutputState.CLOSED))
252                         break;
253 
254                     try
255                     {
256                         _channel.getResponse().closeOutput();
257                     }
258                     catch (Throwable x)
259                     {
260                         if (LOG.isDebugEnabled())
261                             LOG.debug(x);
262                         abort(x);
263                     }
264                     finally
265                     {
266                         releaseBuffer();
267                     }
268                     // Return even if an exception is thrown by closeOutput().
269                     return;
270                 }
271             }
272         }
273     }
274 
275     private void releaseBuffer()
276     {
277         if (_aggregate != null)
278         {
279             _channel.getConnector().getByteBufferPool().release(_aggregate);
280             _aggregate = null;
281         }
282     }
283 
    /**
     * @return true only if the state is exactly CLOSED (ERROR does not count as closed)
     */
    public boolean isClosed()
    {
        return _state.get()==OutputState.CLOSED;
    }
288 
    /**
     * Flushes any aggregated content.
     * <ul>
     * <li>OPEN: blocking write of the aggregate (or an empty buffer), not flagged complete.</li>
     * <li>READY: moves to PENDING and starts an {@code AsyncFlush}.</li>
     * <li>ASYNC: {@link IllegalStateException}, as {@link #isReady()} was not called.</li>
     * <li>PENDING / UNREADY: {@link WritePendingException}.</li>
     * <li>ERROR: {@link EofException} wrapping the pending error.</li>
     * <li>CLOSED: returns without doing anything.</li>
     * </ul>
     *
     * @throws IOException if the blocking write fails or the output is in ERROR
     */
    @Override
    public void flush() throws IOException
    {
        while(true)
        {
            switch(_state.get())
            {
                case OPEN:
                    write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, false);
                    return;

                case ASYNC:
                    throw new IllegalStateException("isReady() not called");

                case READY:
                    // Retry the whole switch if another thread changed the state first
                    if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
                        continue;
                    new AsyncFlush().iterate();
                    return;

                case PENDING:
                case UNREADY:
                    throw new WritePendingException();

                case ERROR:
                    throw new EofException(_onError);
                    
                case CLOSED:
                    return;

                default:
                    throw new IllegalStateException();
            }
        }
    }
324 
    /**
     * Writes {@code len} bytes of {@code b} starting at {@code off}.
     * <p>The written count is increased before the state is examined, and the
     * response is asked whether this write completes the content.</p>
     * <p>In READY (async) state the output moves to PENDING; a small, incomplete
     * write that fits in the aggregate buffer is only aggregated and the state
     * returns to ASYNC, otherwise an {@code AsyncWrite} is started.</p>
     * <p>In OPEN (blocking) state small incomplete writes are aggregated; once
     * the aggregate is full or the write is large or complete, the aggregate is
     * flushed and the remaining bytes are written directly, at most one buffer
     * size per write. When the content is complete the output is marked closed.</p>
     *
     * @param b the source array
     * @param off offset of the first byte to write
     * @param len number of bytes to write
     * @throws IOException if a blocking write fails, or {@link EofException} if
     *         the output is CLOSED or in ERROR
     * @throws IllegalStateException in ASYNC state ({@link #isReady()} not called)
     * @throws WritePendingException in PENDING or UNREADY state
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException
    {
        _written+=len;
        boolean complete=_channel.getResponse().isAllContentWritten(_written);

        // Async or Blocking ?
        while(true)
        {
            switch(_state.get())
            {
                case OPEN:
                    // process blocking below
                    break;

                case ASYNC:
                    throw new IllegalStateException("isReady() not called");

                case READY:
                    if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
                        continue;

                    // Should we aggregate?
                    if (!complete && len<=_commitSize)
                    {
                        if (_aggregate == null)
                            _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());

                        // YES - fill the aggregate with content from the buffer
                        int filled = BufferUtil.fill(_aggregate, b, off, len);

                        // return if we are not complete, not full and filled all the content
                        if (filled==len && !BufferUtil.isFull(_aggregate))
                        {
                            // Nothing was actually sent, so go back to ASYNC: isReady() must be called again
                            if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
                                throw new IllegalStateException();
                            return;
                        }

                        // adjust offset/length
                        off+=filled;
                        len-=filled;
                    }

                    // Do the asynchronous writing from the callback
                    new AsyncWrite(b,off,len,complete).iterate();
                    return;

                case PENDING:
                case UNREADY:
                    throw new WritePendingException();

                case ERROR:
                    throw new EofException(_onError);
                    
                case CLOSED:
                    throw new EofException("Closed");

                default:
                    throw new IllegalStateException();
            }
            // Only the OPEN case reaches this point: leave the loop and write blocking
            break;
        }

        // handle blocking write

        // Should we aggregate?
        int capacity = getBufferSize();
        if (!complete && len<=_commitSize)
        {
            if (_aggregate == null)
                _aggregate = _channel.getByteBufferPool().acquire(capacity, _interceptor.isOptimizedForDirectBuffers());

            // YES - fill the aggregate with content from the buffer
            int filled = BufferUtil.fill(_aggregate, b, off, len);

            // return if we are not complete, not full and filled all the content
            if (filled==len && !BufferUtil.isFull(_aggregate))
                return;

            // adjust offset/length
            off+=filled;
            len-=filled;
        }

        // flush any content from the aggregate
        if (BufferUtil.hasContent(_aggregate))
        {
            write(_aggregate, complete && len==0);

            // should we fill aggregate again from the buffer?
            if (len>0 && !complete && len<=_commitSize && len<=BufferUtil.space(_aggregate))
            {
                BufferUtil.append(_aggregate, b, off, len);
                return;
            }
        }

        // write any remaining content in the buffer directly
        if (len>0)
        {
            ByteBuffer wrap = ByteBuffer.wrap(b, off, len);
            ByteBuffer view = wrap.duplicate();

            // write a buffer capacity at a time to avoid JVM pooling large direct buffers
            // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541
            while (len>getBufferSize())
            {
                // Narrow the view to one buffer-size slice, write it, then
                // advance position/limit to the next slice (the final slice may be shorter).
                int p=view.position();
                int l=p+getBufferSize();
                view.limit(p+getBufferSize());
                write(view,false);
                len-=getBufferSize();
                view.limit(l+Math.min(len,getBufferSize()));
                view.position(l);
            }
            write(view,complete);
        }
        else if (complete)
        {
            write(BufferUtil.EMPTY_BUFFER,true);
        }

        if (complete)
            closed();
    }
451 
    /**
     * Writes the remaining bytes of the given buffer, without aggregating them.
     * <p>The written count is increased before the state is examined. In READY
     * (async) state the output moves to PENDING and an {@code AsyncWrite} is
     * started. In OPEN (blocking) state any aggregated content is flushed first,
     * then the buffer is written directly; when the content is complete the
     * output is marked closed.</p>
     *
     * @param buffer the content to write
     * @throws IOException if a blocking write fails, or {@link EofException} if
     *         the output is CLOSED or in ERROR
     * @throws IllegalStateException in ASYNC state ({@link #isReady()} not called)
     * @throws WritePendingException in PENDING or UNREADY state
     */
    public void write(ByteBuffer buffer) throws IOException
    {
        _written+=buffer.remaining();
        boolean complete=_channel.getResponse().isAllContentWritten(_written);

        // Async or Blocking ?
        while(true)
        {
            switch(_state.get())
            {
                case OPEN:
                    // process blocking below
                    break;

                case ASYNC:
                    throw new IllegalStateException("isReady() not called");

                case READY:
                    if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
                        continue;

                    // Do the asynchronous writing from the callback
                    new AsyncWrite(buffer,complete).iterate();
                    return;

                case PENDING:
                case UNREADY:
                    throw new WritePendingException();

                case ERROR:
                    throw new EofException(_onError);
                    
                case CLOSED:
                    throw new EofException("Closed");

                default:
                    throw new IllegalStateException();
            }
            // Only the OPEN case reaches this point: leave the loop and write blocking
            break;
        }


        // handle blocking write
        int len=BufferUtil.length(buffer);

        // flush any content from the aggregate
        if (BufferUtil.hasContent(_aggregate))
            write(_aggregate, complete && len==0);

        // write any remaining content in the buffer directly
        if (len>0)
            write(buffer, complete);
        else if (complete)
            write(BufferUtil.EMPTY_BUFFER, true);

        if (complete)
            closed();
    }
510 
    /**
     * Writes a single byte, always through the aggregate buffer.
     * <p>In OPEN (blocking) state the byte is appended to the aggregate, which
     * is written (blocking) only when it is full or the content is complete.
     * In READY (async) state the output moves to PENDING, the byte is appended,
     * and either the state returns to ASYNC (nothing to send yet) or an
     * {@code AsyncFlush} is started.</p>
     *
     * @param b the byte to write (cast to {@code byte})
     * @throws IOException if a blocking write fails, or {@link EofException} if
     *         the output is CLOSED or in ERROR
     * @throws IllegalStateException in ASYNC state ({@link #isReady()} not called)
     * @throws WritePendingException in PENDING or UNREADY state
     */
    @Override
    public void write(int b) throws IOException
    {
        _written+=1;
        boolean complete=_channel.getResponse().isAllContentWritten(_written);

        // Async or Blocking ?
        while(true)
        {
            switch(_state.get())
            {
                case OPEN:
                    if (_aggregate == null)
                        _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
                    BufferUtil.append(_aggregate, (byte)b);

                    // Check if all written or full
                    if (complete || BufferUtil.isFull(_aggregate))
                    {
                        write(_aggregate, complete);
                        if (complete)
                            closed();
                    }
                    break;

                case ASYNC:
                    throw new IllegalStateException("isReady() not called");

                case READY:
                    if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
                        continue;

                    if (_aggregate == null)
                        _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
                    BufferUtil.append(_aggregate, (byte)b);

                    // Check if all written or full
                    if (!complete && !BufferUtil.isFull(_aggregate))
                    {
                        // Nothing was actually sent, so go back to ASYNC: isReady() must be called again
                        if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
                            throw new IllegalStateException();
                        return;
                    }

                    // Do the asynchronous writing from the callback
                    new AsyncFlush().iterate();
                    return;

                case PENDING:
                case UNREADY:
                    throw new WritePendingException();

                case ERROR:
                    throw new EofException(_onError);
                    
                case CLOSED:
                    throw new EofException("Closed");

                default:
                    throw new IllegalStateException();
            }
            // Only the OPEN case reaches this point: the blocking write is done
            break;
        }
    }
575 
576     @Override
577     public void print(String s) throws IOException
578     {
579         if (isClosed())
580             throw new IOException("Closed");
581 
582         write(s.getBytes(_channel.getResponse().getCharacterEncoding()));
583     }
584 
    /**
     * Blocking send of whole content.
     * <p>The buffer is written with the complete flag set, bypassing the
     * aggregate buffer and the written byte count, and the output is then
     * marked closed.</p>
     *
     * @param content The whole content to send
     * @throws IOException if the send fails
     */
    public void sendContent(ByteBuffer content) throws IOException
    {
        if (LOG.isDebugEnabled())
            LOG.debug("sendContent({})",BufferUtil.toDetailString(content));
        
        write(content, true);
        closed();
    }
599 
    /**
     * Blocking send of stream content.
     * <p>Starts an {@code InputStreamWritingCB} with a blocker as its callback
     * and waits on that blocker. Any failure is logged at debug level, aborts
     * the output and the channel, and is rethrown.</p>
     *
     * @param in The stream content to send
     * @throws IOException if the send fails
     */
    public void sendContent(InputStream in) throws IOException
    {
        try(Blocker blocker = _writeBlock.acquire())
        {
            new InputStreamWritingCB(in, blocker).iterate();
            blocker.block();
        }
        catch (Throwable failure)
        {
            if (LOG.isDebugEnabled())
                LOG.debug(failure);
            abort(failure);
            throw failure;
        }
    }
621 
    /**
     * Blocking send of channel content.
     * <p>Starts a {@code ReadableByteChannelWritingCB} with a blocker as its
     * callback and waits on that blocker. Any failure is logged at debug level,
     * aborts the output and the channel, and is rethrown.</p>
     *
     * @param in The channel content to send
     * @throws IOException if the send fails
     */
    public void sendContent(ReadableByteChannel in) throws IOException
    {
        try(Blocker blocker = _writeBlock.acquire())
        {
            new ReadableByteChannelWritingCB(in, blocker).iterate();
            blocker.block();
        }
        catch (Throwable failure)
        {
            if (LOG.isDebugEnabled())
                LOG.debug(failure);
            abort(failure);
            throw failure;
        }
    }
643 
    /**
     * Blocking send of HTTP content.
     * <p>Delegates to the asynchronous
     * {@link #sendContent(HttpContent, Callback)} with a blocker as the callback
     * and waits on that blocker. Any failure is logged at debug level, aborts
     * the output and the channel, and is rethrown.</p>
     *
     * @param content The HTTP content to send
     * @throws IOException if the send fails
     */
    public void sendContent(HttpContent content) throws IOException
    {
        try(Blocker blocker = _writeBlock.acquire())
        {
            sendContent(content, blocker);
            blocker.block();
        }
        catch (Throwable failure)
        {
            if (LOG.isDebugEnabled())
                LOG.debug(failure);
            abort(failure);
            throw failure;
        }
    }
665 
666     /**
667      * Asynchronous send of whole content.
668      * @param content The whole content to send
669      * @param callback The callback to use to notify success or failure
670      */
671     public void sendContent(ByteBuffer content, final Callback callback)
672     {
673         if (LOG.isDebugEnabled())
674             LOG.debug("sendContent(buffer={},{})",BufferUtil.toDetailString(content),callback);
675         
676         write(content, true, new Callback()
677         {
678             @Override
679             public void succeeded()
680             {
681                 closed();
682                 callback.succeeded();
683             }
684 
685             @Override
686             public void failed(Throwable x)
687             {
688                 abort(x);
689                 callback.failed(x);
690             }
691         });
692     }
693 
    /**
     * Asynchronous send of stream content.
     * The stream will be closed after reading all content.
     * <p>The work is carried out by an {@code InputStreamWritingCB}, which is
     * started here; no state check is made by this method itself.</p>
     *
     * @param in The stream content to send
     * @param callback The callback to use to notify success or failure
     */
    public void sendContent(InputStream in, Callback callback)
    {
        if (LOG.isDebugEnabled())
            LOG.debug("sendContent(stream={},{})",in,callback);
        
        new InputStreamWritingCB(in, callback).iterate();
    }
708 
    /**
     * Asynchronous send of channel content.
     * The channel will be closed after reading all content.
     * <p>The work is carried out by a {@code ReadableByteChannelWritingCB},
     * which is started here; no state check is made by this method itself.</p>
     *
     * @param in The channel content to send
     * @param callback The callback to use to notify success or failure
     */
    public void sendContent(ReadableByteChannel in, Callback callback)
    {
        if (LOG.isDebugEnabled())
            LOG.debug("sendContent(channel={},{})",in,callback);
        
        new ReadableByteChannelWritingCB(in, callback).iterate();
    }
723 
    /**
     * Asynchronous send of HTTP content.
     * <p>The callback is failed immediately if content has already been
     * aggregated by a previous write, if the channel is already committed, or
     * if the output is in ERROR or CLOSED state. From OPEN the state moves to
     * PENDING; any other state results in an {@link IllegalStateException}.</p>
     * <p>The content is then sent from the first available source, in this
     * order: direct buffer (only if the channel uses direct buffers), indirect
     * buffer, readable byte channel, input stream.</p>
     *
     * @param httpContent The HTTP content to send
     * @param callback The callback to use to notify success or failure
     * @throws IllegalStateException if the output is in ASYNC, READY, PENDING or UNREADY state
     */
    public void sendContent(HttpContent httpContent, Callback callback)
    {
        if (LOG.isDebugEnabled())
            LOG.debug("sendContent(http={},{})",httpContent,callback);
        
        if (BufferUtil.hasContent(_aggregate))
        {
            callback.failed(new IOException("cannot sendContent() after write()"));
            return;
        }
        if (_channel.isCommitted())
        {
            callback.failed(new IOException("committed"));
            return;
        }

        while (true)
        {
            switch(_state.get())
            {
                case OPEN:
                    // Retry the whole switch if another thread changed the state first
                    if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING))
                        continue;
                    break;

                case ERROR:
                    callback.failed(new EofException(_onError));
                    return;
                    
                case CLOSED:
                    callback.failed(new EofException("Closed"));
                    return;

                default:
                    throw new IllegalStateException();
            }
            // Only a successful OPEN->PENDING transition reaches this point
            break;
        }
        

        // Prefer an in-memory buffer: direct if the channel uses direct buffers, else indirect
        ByteBuffer buffer = _channel.useDirectBuffers() ? httpContent.getDirectBuffer() : null;
        if (buffer == null)
            buffer = httpContent.getIndirectBuffer();

        if (buffer!=null)
        {
            sendContent(buffer,callback);
            return;
        }

        try
        {
            ReadableByteChannel rbc=httpContent.getReadableByteChannel();
            if (rbc!=null)
            {
                // Close of the rbc is done by the async sendContent
                sendContent(rbc,callback);
                return;
            }

            InputStream in = httpContent.getInputStream();
            if (in!=null)
            {
                sendContent(in,callback);
                return;
            }

            throw new IllegalArgumentException("unknown content for "+httpContent);
        }
        catch(Throwable th)
        {
            // Covers failures obtaining the channel/stream as well as the "unknown content" case above
            abort(th);
            callback.failed(th);
        }
    }
805 
    /**
     * @return the size used when acquiring the aggregate buffer and when
     *         slicing large blocking writes
     */
    public int getBufferSize()
    {
        return _bufferSize;
    }
810 
    /**
     * Sets both the buffer size and the aggregation (commit) size to the given value.
     * An aggregate buffer that has already been acquired is not resized by this method.
     *
     * @param size the new buffer and aggregation size
     */
    public void setBufferSize(int size)
    {
        _bufferSize = size;
        _commitSize = size;
    }
816 
    /**
     * Prepares this output for reuse: resets the buffer and state via
     * {@link #resetBuffer()} and restores the channel as the interceptor.
     */
    public void recycle()
    {
        resetBuffer();
        _interceptor=_channel;
    }
822     
    /**
     * Discards buffered content: zeroes the written byte count, clears the
     * aggregate buffer if it holds content, and reopens the output.
     */
    public void resetBuffer()
    {
        _written = 0;
        if (BufferUtil.hasContent(_aggregate))
            BufferUtil.clear(_aggregate);
        reopen();
    }
830 
831     @Override
832     public void setWriteListener(WriteListener writeListener)
833     {
834         if (!_channel.getState().isAsync())
835             throw new IllegalStateException("!ASYNC");
836 
837         if (_state.compareAndSet(OutputState.OPEN, OutputState.READY))
838         {
839             _writeListener = writeListener;
840             if (_channel.getState().onWritePossible())
841                 _channel.execute(_channel);
842         }
843         else
844             throw new IllegalStateException();
845     }
846 
847     /**
848      * @see javax.servlet.ServletOutputStream#isReady()
849      */
850     @Override
851     public boolean isReady()
852     {
853         while (true)
854         {
855             switch(_state.get())
856             {
857                 case OPEN:
858                     return true;
859 
860                 case ASYNC:
861                     if (!_state.compareAndSet(OutputState.ASYNC, OutputState.READY))
862                         continue;
863                     return true;
864 
865                 case READY:
866                     return true;
867 
868                 case PENDING:
869                     if (!_state.compareAndSet(OutputState.PENDING, OutputState.UNREADY))
870                         continue;
871                     return false;
872 
873                 case UNREADY:
874                     return false;
875 
876                 case ERROR:
877                     return true;
878                     
879                 case CLOSED:
880                     return true;
881 
882                 default:
883                     throw new IllegalStateException();
884             }
885         }
886     }
887 
    /**
     * Delivers a notification to the write listener: {@code onError} if a
     * failure is pending, otherwise {@code onWritePossible}.
     * <p>The loop repeats when a state transition loses a race, and when
     * {@code onWritePossible} throws or the state does not allow a
     * notification; in those last two cases the failure is stored as the
     * pending error and reported on the next pass.</p>
     * <p>NOTE(review): the caller of this method is not visible in this part of
     * the file; presumably the channel invokes it when handling a
     * write-possible event - confirm.</p>
     */
    @Override
    public void run()
    {
        loop: while (true)
        {
            OutputState state = _state.get();

            if(_onError!=null)
            {
                switch(state)
                {
                    case CLOSED:
                    case ERROR:
                    {
                        // Already terminal: drop the pending error without notifying the listener
                        _onError=null;
                        break loop;
                    }
                    default:
                    {
                        if (_state.compareAndSet(state, OutputState.ERROR))
                        {
                            Throwable th=_onError;
                            _onError=null;
                            if (LOG.isDebugEnabled())
                                LOG.debug("onError",th);
                            _writeListener.onError(th);
                            close();
                            break loop;
                        }
                    }
                }
                // The compareAndSet failed: re-read the state and try again
                continue;
            }
            
            switch(_state.get())
            {
                case CLOSED:
                    // Even though a write is not possible, because a close has
                    // occurred, we need to call onWritePossible to tell async
                    // producer that the last write completed.
                    // So fall through
                case PENDING:
                case UNREADY:
                case READY:
                    try
                    {
                        _writeListener.onWritePossible();
                        break loop;
                    }
                    catch (Throwable e)
                    {
                        // Remember the failure; the next loop pass reports it via onError
                        _onError = e;
                    }
                    break;
                    
                default:
                    // OPEN, ASYNC or ERROR: no notification is allowed, report it as an error on the next pass
                    _onError=new IllegalStateException("state="+_state.get());
            }
        }
    }
948 
949     private void close(Closeable resource)
950     {
951         try
952         {
953             resource.close();
954         }
955         catch (Throwable x)
956         {
957             LOG.ignore(x);
958         }
959     }
960 
961     @Override
962     public String toString()
963     {
964         return String.format("%s@%x{%s}",this.getClass().getSimpleName(),hashCode(),_state.get());
965     }
966 
967     private abstract class AsyncICB extends IteratingCallback
968     {
969         @Override
970         protected void onCompleteSuccess()
971         {
972             while(true)
973             {
974                 OutputState last=_state.get();
975                 switch(last)
976                 {
977                     case PENDING:
978                         if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
979                             continue;
980                         break;
981 
982                     case UNREADY:
983                         if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY))
984                             continue;
985                         if (_channel.getState().onWritePossible())
986                             _channel.execute(_channel);
987                         break;
988 
989                     case CLOSED:
990                         break;
991 
992                     default:
993                         throw new IllegalStateException();
994                 }
995                 break;
996             }
997         }
998 
999         @Override
1000         public void onCompleteFailure(Throwable e)
1001         {
1002             _onError=e==null?new IOException():e;
1003             if (_channel.getState().onWritePossible())
1004                 _channel.execute(_channel);
1005         }
1006     }
1007 
1008     private class AsyncFlush extends AsyncICB
1009     {
1010         protected volatile boolean _flushed;
1011 
1012         public AsyncFlush()
1013         {
1014         }
1015 
1016         @Override
1017         protected Action process()
1018         {
1019             if (BufferUtil.hasContent(_aggregate))
1020             {
1021                 _flushed=true;
1022                 write(_aggregate, false, this);
1023                 return Action.SCHEDULED;
1024             }
1025 
1026             if (!_flushed)
1027             {
1028                 _flushed=true;
1029                 write(BufferUtil.EMPTY_BUFFER,false,this);
1030                 return Action.SCHEDULED;
1031             }
1032 
1033             return Action.SUCCEEDED;
1034         }
1035     }
1036 
1037     private class AsyncWrite extends AsyncICB
1038     {
1039         private final ByteBuffer _buffer;
1040         private final ByteBuffer _slice;
1041         private final boolean _complete;
1042         private final int _len;
1043         protected volatile boolean _completed;
1044 
1045         public AsyncWrite(byte[] b, int off, int len, boolean complete)
1046         {
1047             _buffer=ByteBuffer.wrap(b, off, len);
1048             _len=len;
1049             // always use a view for large byte arrays to avoid JVM pooling large direct buffers
1050             _slice=_len<getBufferSize()?null:_buffer.duplicate();
1051             _complete=complete;
1052         }
1053 
1054         public AsyncWrite(ByteBuffer buffer, boolean complete)
1055         {
1056             _buffer=buffer;
1057             _len=buffer.remaining();
1058             // Use a slice buffer for large indirect to avoid JVM pooling large direct buffers
1059             if (_buffer.isDirect()||_len<getBufferSize())
1060                 _slice=null;
1061             else
1062             {
1063                 _slice=_buffer.duplicate();
1064                 _buffer.position(_buffer.limit());
1065             }                
1066             _complete=complete;
1067         }
1068 
1069         @Override
1070         protected Action process()
1071         {
1072             // flush any content from the aggregate
1073             if (BufferUtil.hasContent(_aggregate))
1074             {
1075                 _completed=_len==0;
1076                 write(_aggregate, _complete && _completed, this);
1077                 return Action.SCHEDULED;
1078             }
1079 
1080             // Can we just aggregate the remainder?
1081             if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
1082             {
1083                 int position = BufferUtil.flipToFill(_aggregate);
1084                 BufferUtil.put(_buffer,_aggregate);
1085                 BufferUtil.flipToFlush(_aggregate, position);
1086                 return Action.SUCCEEDED;
1087             }
1088             
1089             // Is there data left to write?
1090             if (_buffer.hasRemaining())
1091             {
1092                 // if there is no slice, just write it
1093                 if (_slice==null)
1094                 {
1095                     _completed=true;
1096                     write(_buffer, _complete, this);
1097                     return Action.SCHEDULED;
1098                 }
1099                 
1100                 // otherwise take a slice
1101                 int p=_buffer.position();
1102                 int l=Math.min(getBufferSize(),_buffer.remaining());
1103                 int pl=p+l;
1104                 _slice.limit(pl);
1105                 _buffer.position(pl);
1106                 _slice.position(p);
1107                 _completed=!_buffer.hasRemaining();
1108                 write(_slice, _complete && _completed, this);
1109                 return Action.SCHEDULED;
1110             }
1111             
1112             // all content written, but if we have not yet signal completion, we
1113             // need to do so
1114             if (_complete && !_completed)
1115             {
1116                 _completed=true;
1117                 write(BufferUtil.EMPTY_BUFFER, true, this);
1118                 return Action.SCHEDULED;
1119             }
1120 
1121             if (LOG.isDebugEnabled() && _completed)
1122                 LOG.debug("EOF of {}",this);
1123             return Action.SUCCEEDED;
1124         }
1125 
1126         @Override
1127         protected void onCompleteSuccess()
1128         {
1129             super.onCompleteSuccess();
1130             if (_complete)
1131                 closed();
1132         }
1133     }
1134 
1135     /**
1136      * An iterating callback that will take content from an
1137      * InputStream and write it to the associated {@link HttpChannel}.
1138      * A non direct buffer of size {@link HttpOutput#getBufferSize()} is used.
1139      * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
1140      * be notified as each buffer is written and only once all the input is consumed will the
1141      * wrapped {@link Callback#succeeded()} method be called.
1142      */
1143     private class InputStreamWritingCB extends IteratingNestedCallback
1144     {
1145         private final InputStream _in;
1146         private final ByteBuffer _buffer;
1147         private boolean _eof;
1148 
1149         public InputStreamWritingCB(InputStream in, Callback callback)
1150         {
1151             super(callback);
1152             _in=in;
1153             _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
1154         }
1155 
1156         @Override
1157         protected Action process() throws Exception
1158         {
1159             // Only return if EOF has previously been read and thus
1160             // a write done with EOF=true
1161             if (_eof)
1162             {
1163                 if (LOG.isDebugEnabled())
1164                     LOG.debug("EOF of {}",this);
1165                 // Handle EOF
1166                 _in.close();
1167                 closed();
1168                 _channel.getByteBufferPool().release(_buffer);
1169                 return Action.SUCCEEDED;
1170             }
1171             
1172             // Read until buffer full or EOF
1173             int len=0;
1174             while (len<_buffer.capacity() && !_eof)
1175             {
1176                 int r=_in.read(_buffer.array(),_buffer.arrayOffset()+len,_buffer.capacity()-len);
1177                 if (r<0)
1178                     _eof=true;
1179                 else
1180                     len+=r;
1181             }
1182 
1183             // write what we have
1184             _buffer.position(0);
1185             _buffer.limit(len);
1186             write(_buffer,_eof,this);
1187             return Action.SCHEDULED;
1188         }
1189 
1190         @Override
1191         public void onCompleteFailure(Throwable x)
1192         {
1193             abort(x);
1194             _channel.getByteBufferPool().release(_buffer);
1195             HttpOutput.this.close(_in);
1196             super.onCompleteFailure(x);
1197         }
1198     }
1199 
1200     /* ------------------------------------------------------------ */
1201     /** An iterating callback that will take content from a
1202      * ReadableByteChannel and write it to the {@link HttpChannel}.
1203      * A {@link ByteBuffer} of size {@link HttpOutput#getBufferSize()} is used that will be direct if
1204      * {@link HttpChannel#useDirectBuffers()} is true.
1205      * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
1206      * be notified as each buffer is written and only once all the input is consumed will the
1207      * wrapped {@link Callback#succeeded()} method be called.
1208      */
1209     private class ReadableByteChannelWritingCB extends IteratingNestedCallback
1210     {
1211         private final ReadableByteChannel _in;
1212         private final ByteBuffer _buffer;
1213         private boolean _eof;
1214 
1215         public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
1216         {
1217             super(callback);
1218             _in=in;
1219             _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
1220         }
1221         
1222         @Override
1223         protected Action process() throws Exception
1224         {
1225             // Only return if EOF has previously been read and thus
1226             // a write done with EOF=true
1227             if (_eof)
1228             {
1229                 if (LOG.isDebugEnabled())
1230                     LOG.debug("EOF of {}",this);
1231                 _in.close();
1232                 closed();
1233                 _channel.getByteBufferPool().release(_buffer);
1234                 return Action.SUCCEEDED;
1235             }
1236             
1237             // Read from stream until buffer full or EOF
1238             _buffer.clear();
1239             while (_buffer.hasRemaining() && !_eof)
1240               _eof = (_in.read(_buffer)) <  0;
1241 
1242             // write what we have
1243             _buffer.flip();
1244             write(_buffer,_eof,this);
1245             
1246             return Action.SCHEDULED;
1247         }
1248 
1249         @Override
1250         public void onCompleteFailure(Throwable x)
1251         {
1252             abort(x);
1253             _channel.getByteBufferPool().release(_buffer);
1254             HttpOutput.this.close(_in);
1255             super.onCompleteFailure(x);
1256         }
1257     }
1258 }