View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.ReadableByteChannel;
26  import java.nio.channels.WritePendingException;
27  import java.util.concurrent.atomic.AtomicReference;
28  
29  import javax.servlet.RequestDispatcher;
30  import javax.servlet.ServletOutputStream;
31  import javax.servlet.ServletRequest;
32  import javax.servlet.ServletResponse;
33  import javax.servlet.WriteListener;
34  
35  import org.eclipse.jetty.http.HttpContent;
36  import org.eclipse.jetty.io.EofException;
37  import org.eclipse.jetty.util.BufferUtil;
38  import org.eclipse.jetty.util.Callback;
39  import org.eclipse.jetty.util.IteratingCallback;
40  import org.eclipse.jetty.util.IteratingNestedCallback;
41  import org.eclipse.jetty.util.SharedBlockingCallback;
42  import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
43  import org.eclipse.jetty.util.log.Log;
44  import org.eclipse.jetty.util.log.Logger;
45  
46  /**
47   * <p>{@link HttpOutput} implements {@link ServletOutputStream}
48   * as required by the Servlet specification.</p>
49   * <p>{@link HttpOutput} buffers content written by the application until a
50   * further write will overflow the buffer, at which point it triggers a commit
51   * of the response.</p>
52   * <p>{@link HttpOutput} can be closed and reopened, to allow requests included
53   * via {@link RequestDispatcher#include(ServletRequest, ServletResponse)} to
54   * close the stream, to be reopened after the inclusion ends.</p>
55   */
56  public class HttpOutput extends ServletOutputStream implements Runnable
57  {
58      public interface Interceptor
59      {
60          void write(ByteBuffer content, boolean complete, Callback callback);
61          Interceptor getNextInterceptor();
62          boolean isOptimizedForDirectBuffers();
63      }
64      
65      private static Logger LOG = Log.getLogger(HttpOutput.class);
66  
67      private final HttpChannel _channel;
68      private final SharedBlockingCallback _writeBlock;
69      private Interceptor _interceptor;
70      
71      /** Bytes written via the write API (excludes bytes written via sendContent). Used to autocommit once content length is written. */
72      private long _written;
73      
74      private ByteBuffer _aggregate;
75      private int _bufferSize;
76      private int _commitSize;
77      private WriteListener _writeListener;
78      private volatile Throwable _onError;
79      /*
80      ACTION             OPEN       ASYNC      READY      PENDING       UNREADY       CLOSED
81      -------------------------------------------------------------------------------------------
82      setWriteListener() READY->owp ise        ise        ise           ise           ise
83      write()            OPEN       ise        PENDING    wpe           wpe           eof
84      flush()            OPEN       ise        PENDING    wpe           wpe           eof
85      close()            CLOSED     CLOSED     CLOSED     CLOSED        wpe           CLOSED
86      isReady()          OPEN:true  READY:true READY:true UNREADY:false UNREADY:false CLOSED:true
87      write completed    -          -          -          ASYNC         READY->owp    -
88      */
89      private enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED }
90      private final AtomicReference<OutputState> _state=new AtomicReference<>(OutputState.OPEN);
91  
92      public HttpOutput(HttpChannel channel)
93      {
94          _channel = channel;
95          _interceptor = channel;
96          _writeBlock = new SharedBlockingCallback()
97          {
98              @Override
99              protected long getIdleTimeout()
100             {
101                 long bto = getHttpChannel().getHttpConfiguration().getBlockingTimeout();
102                 if (bto>0)
103                     return bto;
104                 if (bto<0)
105                     return -1;
106                 return _channel.getIdleTimeout();
107             }
108         };
109         HttpConfiguration config = channel.getHttpConfiguration();
110         _bufferSize = config.getOutputBufferSize();
111         _commitSize = config.getOutputAggregationSize();
112         if (_commitSize>_bufferSize)
113         {
114             LOG.warn("OutputAggregationSize {} exceeds bufferSize {}",_commitSize,_bufferSize);
115             _commitSize=_bufferSize;
116         }
117     }
118     
119     public HttpChannel getHttpChannel()
120     {
121         return _channel;
122     }
123 
124     public Interceptor getInterceptor()
125     {
126         return _interceptor;
127     }
128 
129     public void setInterceptor(Interceptor filter)
130     {
131         _interceptor=filter;
132     }
133     
134     public boolean isWritten()
135     {
136         return _written > 0;
137     }
138 
139     public long getWritten()
140     {
141         return _written;
142     }
143 
144     public void reopen()
145     {
146         _state.set(OutputState.OPEN);
147     }
148 
149     public boolean isAllContentWritten()
150     {
151         return _channel.getResponse().isAllContentWritten(_written);
152     }
153 
154     protected Blocker acquireWriteBlockingCallback() throws IOException
155     {
156         return _writeBlock.acquire();
157     }
158     
159     private void write(ByteBuffer content, boolean complete) throws IOException
160     {
161         try (Blocker blocker = _writeBlock.acquire())
162         {        
163             write(content, complete, blocker);
164             blocker.block();
165         }
166         catch (Throwable failure)
167         {
168             if (LOG.isDebugEnabled())
169                 LOG.debug(failure);
170             abort(failure);
171             throw failure;
172         }
173     }
174 
175     protected void write(ByteBuffer content, boolean complete, Callback callback)
176     {
177         _interceptor.write(content, complete, callback);
178     }
179 
180     private void abort(Throwable failure)
181     {
182         closed();
183         _channel.abort(failure);
184     }
185 
186     @Override
187     public void close()
188     {
189         while(true)
190         {
191             OutputState state=_state.get();
192             switch (state)
193             {
194                 case CLOSED:
195                 {
196                     return;
197                 }
198                 case UNREADY:
199                 {
200                     if (_state.compareAndSet(state,OutputState.ERROR))
201                         _writeListener.onError(_onError==null?new EofException("Async close"):_onError);
202                     break;
203                 }
204                 default:
205                 {
206                     if (!_state.compareAndSet(state,OutputState.CLOSED))
207                         break;
208 
209                     try
210                     {
211                         write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding());
212                     }
213                     catch (IOException x)
214                     {
215                         // Ignore it, it's been already logged in write().
216                     }
217                     finally
218                     {
219                         releaseBuffer();
220                     }
221                     // Return even if an exception is thrown by write().
222                     return;
223                 }
224             }
225         }
226     }
227 
228     /**
229      * Called to indicate that the last write has been performed.
230      * It updates the state and performs cleanup operations.
231      */
232     void closed()
233     {
234         while(true)
235         {
236             OutputState state=_state.get();
237             switch (state)
238             {
239                 case CLOSED:
240                 {
241                     return;
242                 }
243                 case UNREADY:
244                 {
245                     if (_state.compareAndSet(state,OutputState.ERROR))
246                         _writeListener.onError(_onError==null?new EofException("Async closed"):_onError);
247                     break;
248                 }
249                 default:
250                 {
251                     if (!_state.compareAndSet(state, OutputState.CLOSED))
252                         break;
253 
254                     try
255                     {
256                         _channel.getResponse().closeOutput();
257                     }
258                     catch (Throwable x)
259                     {
260                         if (LOG.isDebugEnabled())
261                             LOG.debug(x);
262                         abort(x);
263                     }
264                     finally
265                     {
266                         releaseBuffer();
267                     }
268                     // Return even if an exception is thrown by closeOutput().
269                     return;
270                 }
271             }
272         }
273     }
274 
275     private void releaseBuffer()
276     {
277         if (_aggregate != null)
278         {
279             _channel.getConnector().getByteBufferPool().release(_aggregate);
280             _aggregate = null;
281         }
282     }
283 
284     public boolean isClosed()
285     {
286         return _state.get()==OutputState.CLOSED;
287     }
288 
289     @Override
290     public void flush() throws IOException
291     {
292         while(true)
293         {
294             switch(_state.get())
295             {
296                 case OPEN:
297                     write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, false);
298                     return;
299 
300                 case ASYNC:
301                     throw new IllegalStateException("isReady() not called");
302 
303                 case READY:
304                     if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
305                         continue;
306                     new AsyncFlush().iterate();
307                     return;
308 
309                 case PENDING:
310                 case UNREADY:
311                     throw new WritePendingException();
312 
313                 case ERROR:
314                     throw new EofException(_onError);
315                     
316                 case CLOSED:
317                     return;
318 
319                 default:
320                     throw new IllegalStateException();
321             }
322         }
323     }
324 
325     @Override
326     public void write(byte[] b, int off, int len) throws IOException
327     {
328         _written+=len;
329         boolean complete=_channel.getResponse().isAllContentWritten(_written);
330 
331         // Async or Blocking ?
332         while(true)
333         {
334             switch(_state.get())
335             {
336                 case OPEN:
337                     // process blocking below
338                     break;
339 
340                 case ASYNC:
341                     throw new IllegalStateException("isReady() not called");
342 
343                 case READY:
344                     if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
345                         continue;
346 
347                     // Should we aggregate?
348                     if (!complete && len<=_commitSize)
349                     {
350                         if (_aggregate == null)
351                             _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
352 
353                         // YES - fill the aggregate with content from the buffer
354                         int filled = BufferUtil.fill(_aggregate, b, off, len);
355 
356                         // return if we are not complete, not full and filled all the content
357                         if (filled==len && !BufferUtil.isFull(_aggregate))
358                         {
359                             if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
360                                 throw new IllegalStateException();
361                             return;
362                         }
363 
364                         // adjust offset/length
365                         off+=filled;
366                         len-=filled;
367                     }
368 
369                     // Do the asynchronous writing from the callback
370                     new AsyncWrite(b,off,len,complete).iterate();
371                     return;
372 
373                 case PENDING:
374                 case UNREADY:
375                     throw new WritePendingException();
376 
377                 case ERROR:
378                     throw new EofException(_onError);
379                     
380                 case CLOSED:
381                     throw new EofException("Closed");
382 
383                 default:
384                     throw new IllegalStateException();
385             }
386             break;
387         }
388 
389         // handle blocking write
390 
391         // Should we aggregate?
392         int capacity = getBufferSize();
393         if (!complete && len<=_commitSize)
394         {
395             if (_aggregate == null)
396                 _aggregate = _channel.getByteBufferPool().acquire(capacity, _interceptor.isOptimizedForDirectBuffers());
397 
398             // YES - fill the aggregate with content from the buffer
399             int filled = BufferUtil.fill(_aggregate, b, off, len);
400 
401             // return if we are not complete, not full and filled all the content
402             if (filled==len && !BufferUtil.isFull(_aggregate))
403                 return;
404 
405             // adjust offset/length
406             off+=filled;
407             len-=filled;
408         }
409 
410         // flush any content from the aggregate
411         if (BufferUtil.hasContent(_aggregate))
412         {
413             write(_aggregate, complete && len==0);
414 
415             // should we fill aggregate again from the buffer?
416             if (len>0 && !complete && len<=_commitSize && len<=BufferUtil.space(_aggregate))
417             {
418                 BufferUtil.append(_aggregate, b, off, len);
419                 return;
420             }
421         }
422 
423         // write any remaining content in the buffer directly
424         if (len>0)
425         {
426             ByteBuffer wrap = ByteBuffer.wrap(b, off, len);
427             ByteBuffer view = wrap.duplicate();
428 
429             // write a buffer capacity at a time to avoid JVM pooling large direct buffers
430             // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541
431             while (len>getBufferSize())
432             {
433                 int p=view.position();
434                 int l=p+getBufferSize();
435                 view.limit(p+getBufferSize());
436                 write(view,false);
437                 len-=getBufferSize();
438                 view.limit(l+Math.min(len,getBufferSize()));
439                 view.position(l);
440             }
441             write(view,complete);
442         }
443         else if (complete)
444         {
445             write(BufferUtil.EMPTY_BUFFER,true);
446         }
447 
448         if (complete)
449             closed();
450     }
451 
452     public void write(ByteBuffer buffer) throws IOException
453     {
454         _written+=buffer.remaining();
455         boolean complete=_channel.getResponse().isAllContentWritten(_written);
456 
457         // Async or Blocking ?
458         while(true)
459         {
460             switch(_state.get())
461             {
462                 case OPEN:
463                     // process blocking below
464                     break;
465 
466                 case ASYNC:
467                     throw new IllegalStateException("isReady() not called");
468 
469                 case READY:
470                     if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
471                         continue;
472 
473                     // Do the asynchronous writing from the callback
474                     new AsyncWrite(buffer,complete).iterate();
475                     return;
476 
477                 case PENDING:
478                 case UNREADY:
479                     throw new WritePendingException();
480 
481                 case ERROR:
482                     throw new EofException(_onError);
483                     
484                 case CLOSED:
485                     throw new EofException("Closed");
486 
487                 default:
488                     throw new IllegalStateException();
489             }
490             break;
491         }
492 
493 
494         // handle blocking write
495         int len=BufferUtil.length(buffer);
496 
497         // flush any content from the aggregate
498         if (BufferUtil.hasContent(_aggregate))
499             write(_aggregate, complete && len==0);
500 
501         // write any remaining content in the buffer directly
502         if (len>0)
503             write(buffer, complete);
504         else if (complete)
505             write(BufferUtil.EMPTY_BUFFER, true);
506 
507         if (complete)
508             closed();
509     }
510 
511     @Override
512     public void write(int b) throws IOException
513     {
514         _written+=1;
515         boolean complete=_channel.getResponse().isAllContentWritten(_written);
516 
517         // Async or Blocking ?
518         while(true)
519         {
520             switch(_state.get())
521             {
522                 case OPEN:
523                     if (_aggregate == null)
524                         _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
525                     BufferUtil.append(_aggregate, (byte)b);
526 
527                     // Check if all written or full
528                     if (complete || BufferUtil.isFull(_aggregate))
529                     {
530                         write(_aggregate, complete);
531                         if (complete)
532                             closed();
533                     }
534                     break;
535 
536                 case ASYNC:
537                     throw new IllegalStateException("isReady() not called");
538 
539                 case READY:
540                     if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
541                         continue;
542 
543                     if (_aggregate == null)
544                         _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
545                     BufferUtil.append(_aggregate, (byte)b);
546 
547                     // Check if all written or full
548                     if (!complete && !BufferUtil.isFull(_aggregate))
549                     {
550                         if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
551                             throw new IllegalStateException();
552                         return;
553                     }
554 
555                     // Do the asynchronous writing from the callback
556                     new AsyncFlush().iterate();
557                     return;
558 
559                 case PENDING:
560                 case UNREADY:
561                     throw new WritePendingException();
562 
563                 case ERROR:
564                     throw new EofException(_onError);
565                     
566                 case CLOSED:
567                     throw new EofException("Closed");
568 
569                 default:
570                     throw new IllegalStateException();
571             }
572             break;
573         }
574     }
575 
576     @Override
577     public void print(String s) throws IOException
578     {
579         if (isClosed())
580             throw new IOException("Closed");
581 
582         write(s.getBytes(_channel.getResponse().getCharacterEncoding()));
583     }
584 
585     /**
586      * Blocking send of whole content.
587      *
588      * @param content The whole content to send
589      * @throws IOException if the send fails
590      */
591     public void sendContent(ByteBuffer content) throws IOException
592     {
593         if (LOG.isDebugEnabled())
594             LOG.debug("sendContent({})",BufferUtil.toDetailString(content));
595         
596         write(content, true);
597         closed();
598     }
599 
600     /**
601      * Blocking send of stream content.
602      *
603      * @param in The stream content to send
604      * @throws IOException if the send fails
605      */
606     public void sendContent(InputStream in) throws IOException
607     {
608         try(Blocker blocker = _writeBlock.acquire())
609         {
610             new InputStreamWritingCB(in, blocker).iterate();
611             blocker.block();
612         }
613         catch (Throwable failure)
614         {
615             if (LOG.isDebugEnabled())
616                 LOG.debug(failure);
617             abort(failure);
618             throw failure;
619         }
620     }
621 
622     /**
623      * Blocking send of channel content.
624      *
625      * @param in The channel content to send
626      * @throws IOException if the send fails
627      */
628     public void sendContent(ReadableByteChannel in) throws IOException
629     {
630         try(Blocker blocker = _writeBlock.acquire())
631         {
632             new ReadableByteChannelWritingCB(in, blocker).iterate();
633             blocker.block();
634         }
635         catch (Throwable failure)
636         {
637             if (LOG.isDebugEnabled())
638                 LOG.debug(failure);
639             abort(failure);
640             throw failure;
641         }
642     }
643 
644     /**
645      * Blocking send of HTTP content.
646      *
647      * @param content The HTTP content to send
648      * @throws IOException if the send fails
649      */
650     public void sendContent(HttpContent content) throws IOException
651     {
652         try(Blocker blocker = _writeBlock.acquire())
653         {
654             sendContent(content, blocker);
655             blocker.block();
656         }
657         catch (Throwable failure)
658         {
659             if (LOG.isDebugEnabled())
660                 LOG.debug(failure);
661             abort(failure);
662             throw failure;
663         }
664     }
665 
666     /**
667      * Asynchronous send of whole content.
668      * @param content The whole content to send
669      * @param callback The callback to use to notify success or failure
670      */
671     public void sendContent(ByteBuffer content, final Callback callback)
672     {
673         if (LOG.isDebugEnabled())
674             LOG.debug("sendContent(buffer={},{})",BufferUtil.toDetailString(content),callback);
675         
676         write(content, true, new Callback()
677         {
678             @Override
679             public void succeeded()
680             {
681                 closed();
682                 callback.succeeded();
683             }
684 
685             @Override
686             public void failed(Throwable x)
687             {
688                 abort(x);
689                 callback.failed(x);
690             }
691         });
692     }
693 
694     /**
695      * Asynchronous send of stream content.
696      * The stream will be closed after reading all content.
697      *
698      * @param in The stream content to send
699      * @param callback The callback to use to notify success or failure
700      */
701     public void sendContent(InputStream in, Callback callback)
702     {
703         if (LOG.isDebugEnabled())
704             LOG.debug("sendContent(stream={},{})",in,callback);
705         
706         new InputStreamWritingCB(in, callback).iterate();
707     }
708 
709     /**
710      * Asynchronous send of channel content.
711      * The channel will be closed after reading all content.
712      *
713      * @param in The channel content to send
714      * @param callback The callback to use to notify success or failure
715      */
716     public void sendContent(ReadableByteChannel in, Callback callback)
717     {
718         if (LOG.isDebugEnabled())
719             LOG.debug("sendContent(channel={},{})",in,callback);
720         
721         new ReadableByteChannelWritingCB(in, callback).iterate();
722     }
723 
724     /**
725      * Asynchronous send of HTTP content.
726      *
727      * @param httpContent The HTTP content to send
728      * @param callback The callback to use to notify success or failure
729      */
730     public void sendContent(HttpContent httpContent, Callback callback)
731     {
732         if (LOG.isDebugEnabled())
733             LOG.debug("sendContent(http={},{})",httpContent,callback);
734         
735         if (BufferUtil.hasContent(_aggregate))
736         {
737             callback.failed(new IOException("cannot sendContent() after write()"));
738             return;
739         }
740         if (_channel.isCommitted())
741         {
742             callback.failed(new IOException("committed"));
743             return;
744         }
745 
746         while (true)
747         {
748             switch(_state.get())
749             {
750                 case OPEN:
751                     if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING))
752                         continue;
753                     break;
754 
755                 case ERROR:
756                     callback.failed(new EofException(_onError));
757                     return;
758                     
759                 case CLOSED:
760                     callback.failed(new EofException("Closed"));
761                     return;
762 
763                 default:
764                     throw new IllegalStateException();
765             }
766             break;
767         }
768 
769         ByteBuffer buffer = _channel.useDirectBuffers() ? httpContent.getDirectBuffer() : null;
770         if (buffer == null)
771             buffer = httpContent.getIndirectBuffer();
772 
773         if (buffer!=null)
774         {
775             sendContent(buffer,callback);
776             return;
777         }
778 
779         try
780         {
781             ReadableByteChannel rbc=httpContent.getReadableByteChannel();
782             if (rbc!=null)
783             {
784                 // Close of the rbc is done by the async sendContent
785                 sendContent(rbc,callback);
786                 return;
787             }
788 
789             InputStream in = httpContent.getInputStream();
790             if (in!=null)
791             {
792                 sendContent(in,callback);
793                 return;
794             }
795 
796             throw new IllegalArgumentException("unknown content for "+httpContent);
797         }
798         catch(Throwable th)
799         {
800             abort(th);
801             callback.failed(th);
802         }
803     }
804 
805     public int getBufferSize()
806     {
807         return _bufferSize;
808     }
809 
810     public void setBufferSize(int size)
811     {
812         _bufferSize = size;
813         _commitSize = size;
814     }
815 
816     public void recycle()
817     {
818         resetBuffer();
819         _interceptor=_channel;
820     }
821     
822     public void resetBuffer()
823     {
824         _written = 0;
825         if (BufferUtil.hasContent(_aggregate))
826             BufferUtil.clear(_aggregate);
827         reopen();
828     }
829 
830     @Override
831     public void setWriteListener(WriteListener writeListener)
832     {
833         if (!_channel.getState().isAsync())
834             throw new IllegalStateException("!ASYNC");
835 
836         if (_state.compareAndSet(OutputState.OPEN, OutputState.READY))
837         {
838             _writeListener = writeListener;
839             if (_channel.getState().onWritePossible())
840                 _channel.execute(_channel);
841         }
842         else
843             throw new IllegalStateException();
844     }
845 
846     /**
847      * @see javax.servlet.ServletOutputStream#isReady()
848      */
849     @Override
850     public boolean isReady()
851     {
852         while (true)
853         {
854             switch(_state.get())
855             {
856                 case OPEN:
857                     return true;
858 
859                 case ASYNC:
860                     if (!_state.compareAndSet(OutputState.ASYNC, OutputState.READY))
861                         continue;
862                     return true;
863 
864                 case READY:
865                     return true;
866 
867                 case PENDING:
868                     if (!_state.compareAndSet(OutputState.PENDING, OutputState.UNREADY))
869                         continue;
870                     return false;
871 
872                 case UNREADY:
873                     return false;
874 
875                 case ERROR:
876                     return true;
877                     
878                 case CLOSED:
879                     return true;
880 
881                 default:
882                     throw new IllegalStateException();
883             }
884         }
885     }
886 
887     @Override
888     public void run()
889     {
890         loop: while (true)
891         {
892             OutputState state = _state.get();
893 
894             if(_onError!=null)
895             {
896                 switch(state)
897                 {
898                     case CLOSED:
899                     case ERROR:
900                     {
901                         _onError=null;
902                         break loop;
903                     }
904                     default:
905                     {
906                         if (_state.compareAndSet(state, OutputState.ERROR))
907                         {
908                             Throwable th=_onError;
909                             _onError=null;
910                             if (LOG.isDebugEnabled())
911                                 LOG.debug("onError",th);
912                             _writeListener.onError(th);
913                             close();
914                             break loop;
915                         }
916                     }
917                 }
918                 continue;
919             }
920             
921             switch(_state.get())
922             {
923                 case CLOSED:
924                     // Even though a write is not possible, because a close has
925                     // occurred, we need to call onWritePossible to tell async
926                     // producer that the last write completed.
927                     // So fall through
928                 case READY:
929                     try
930                     {
931                         _writeListener.onWritePossible();
932                         break loop;
933                     }
934                     catch (Throwable e)
935                     {
936                         _onError = e;
937                     }
938                     break;
939                     
940                 default:
941                     _onError=new IllegalStateException("state="+_state.get());
942             }
943         }
944     }
945 
946     private void close(Closeable resource)
947     {
948         try
949         {
950             resource.close();
951         }
952         catch (Throwable x)
953         {
954             LOG.ignore(x);
955         }
956     }
957 
958     @Override
959     public String toString()
960     {
961         return String.format("%s@%x{%s}",this.getClass().getSimpleName(),hashCode(),_state.get());
962     }
963 
964     private abstract class AsyncICB extends IteratingCallback
965     {
966         @Override
967         protected void onCompleteSuccess()
968         {
969             while(true)
970             {
971                 OutputState last=_state.get();
972                 switch(last)
973                 {
974                     case PENDING:
975                         if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
976                             continue;
977                         break;
978 
979                     case UNREADY:
980                         if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY))
981                             continue;
982                         if (_channel.getState().onWritePossible())
983                             _channel.execute(_channel);
984                         break;
985 
986                     case CLOSED:
987                         break;
988 
989                     default:
990                         throw new IllegalStateException();
991                 }
992                 break;
993             }
994         }
995 
996         @Override
997         public void onCompleteFailure(Throwable e)
998         {
999             _onError=e==null?new IOException():e;
1000             if (_channel.getState().onWritePossible())
1001                 _channel.execute(_channel);
1002         }
1003     }
1004 
1005     private class AsyncFlush extends AsyncICB
1006     {
1007         protected volatile boolean _flushed;
1008 
1009         public AsyncFlush()
1010         {
1011         }
1012 
1013         @Override
1014         protected Action process()
1015         {
1016             if (BufferUtil.hasContent(_aggregate))
1017             {
1018                 _flushed=true;
1019                 write(_aggregate, false, this);
1020                 return Action.SCHEDULED;
1021             }
1022 
1023             if (!_flushed)
1024             {
1025                 _flushed=true;
1026                 write(BufferUtil.EMPTY_BUFFER,false,this);
1027                 return Action.SCHEDULED;
1028             }
1029 
1030             return Action.SUCCEEDED;
1031         }
1032     }
1033 
1034     private class AsyncWrite extends AsyncICB
1035     {
1036         private final ByteBuffer _buffer;
1037         private final ByteBuffer _slice;
1038         private final boolean _complete;
1039         private final int _len;
1040         protected volatile boolean _completed;
1041 
1042         public AsyncWrite(byte[] b, int off, int len, boolean complete)
1043         {
1044             _buffer=ByteBuffer.wrap(b, off, len);
1045             _len=len;
1046             // always use a view for large byte arrays to avoid JVM pooling large direct buffers
1047             _slice=_len<getBufferSize()?null:_buffer.duplicate();
1048             _complete=complete;
1049         }
1050 
1051         public AsyncWrite(ByteBuffer buffer, boolean complete)
1052         {
1053             _buffer=buffer;
1054             _len=buffer.remaining();
1055             // Use a slice buffer for large indirect to avoid JVM pooling large direct buffers
1056             if (_buffer.isDirect()||_len<getBufferSize())
1057                 _slice=null;
1058             else
1059             {
1060                 _slice=_buffer.duplicate();
1061                 _buffer.position(_buffer.limit());
1062             }                
1063             _complete=complete;
1064         }
1065 
1066         @Override
1067         protected Action process()
1068         {
1069             // flush any content from the aggregate
1070             if (BufferUtil.hasContent(_aggregate))
1071             {
1072                 _completed=_len==0;
1073                 write(_aggregate, _complete && _completed, this);
1074                 return Action.SCHEDULED;
1075             }
1076 
1077             // Can we just aggregate the remainder?
1078             if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
1079             {
1080                 int position = BufferUtil.flipToFill(_aggregate);
1081                 BufferUtil.put(_buffer,_aggregate);
1082                 BufferUtil.flipToFlush(_aggregate, position);
1083                 return Action.SUCCEEDED;
1084             }
1085             
1086             // Is there data left to write?
1087             if (_buffer.hasRemaining())
1088             {
1089                 // if there is no slice, just write it
1090                 if (_slice==null)
1091                 {
1092                     _completed=true;
1093                     write(_buffer, _complete, this);
1094                     return Action.SCHEDULED;
1095                 }
1096                 
1097                 // otherwise take a slice
1098                 int p=_buffer.position();
1099                 int l=Math.min(getBufferSize(),_buffer.remaining());
1100                 int pl=p+l;
1101                 _slice.limit(pl);
1102                 _buffer.position(pl);
1103                 _slice.position(p);
1104                 _completed=!_buffer.hasRemaining();
1105                 write(_slice, _complete && _completed, this);
1106                 return Action.SCHEDULED;
1107             }
1108             
1109             // all content written, but if we have not yet signal completion, we
1110             // need to do so
1111             if (_complete && !_completed)
1112             {
1113                 _completed=true;
1114                 write(BufferUtil.EMPTY_BUFFER, true, this);
1115                 return Action.SCHEDULED;
1116             }
1117 
1118             if (LOG.isDebugEnabled() && _completed)
1119                 LOG.debug("EOF of {}",this);
1120             return Action.SUCCEEDED;
1121         }
1122 
1123         @Override
1124         protected void onCompleteSuccess()
1125         {
1126             super.onCompleteSuccess();
1127             if (_complete)
1128                 closed();
1129         }
1130     }
1131 
1132     /**
1133      * An iterating callback that will take content from an
1134      * InputStream and write it to the associated {@link HttpChannel}.
1135      * A non direct buffer of size {@link HttpOutput#getBufferSize()} is used.
1136      * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
1137      * be notified as each buffer is written and only once all the input is consumed will the
1138      * wrapped {@link Callback#succeeded()} method be called.
1139      */
1140     private class InputStreamWritingCB extends IteratingNestedCallback
1141     {
1142         private final InputStream _in;
1143         private final ByteBuffer _buffer;
1144         private boolean _eof;
1145 
1146         public InputStreamWritingCB(InputStream in, Callback callback)
1147         {
1148             super(callback);
1149             _in=in;
1150             _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
1151         }
1152 
1153         @Override
1154         protected Action process() throws Exception
1155         {
1156             // Only return if EOF has previously been read and thus
1157             // a write done with EOF=true
1158             if (_eof)
1159             {
1160                 if (LOG.isDebugEnabled())
1161                     LOG.debug("EOF of {}",this);
1162                 // Handle EOF
1163                 _in.close();
1164                 closed();
1165                 _channel.getByteBufferPool().release(_buffer);
1166                 return Action.SUCCEEDED;
1167             }
1168             
1169             // Read until buffer full or EOF
1170             int len=0;
1171             while (len<_buffer.capacity() && !_eof)
1172             {
1173                 int r=_in.read(_buffer.array(),_buffer.arrayOffset()+len,_buffer.capacity()-len);
1174                 if (r<0)
1175                     _eof=true;
1176                 else
1177                     len+=r;
1178             }
1179 
1180             // write what we have
1181             _buffer.position(0);
1182             _buffer.limit(len);
1183             write(_buffer,_eof,this);
1184             return Action.SCHEDULED;
1185         }
1186 
1187         @Override
1188         public void onCompleteFailure(Throwable x)
1189         {
1190             abort(x);
1191             _channel.getByteBufferPool().release(_buffer);
1192             HttpOutput.this.close(_in);
1193             super.onCompleteFailure(x);
1194         }
1195     }
1196 
1197     /* ------------------------------------------------------------ */
1198     /** An iterating callback that will take content from a
1199      * ReadableByteChannel and write it to the {@link HttpChannel}.
1200      * A {@link ByteBuffer} of size {@link HttpOutput#getBufferSize()} is used that will be direct if
1201      * {@link HttpChannel#useDirectBuffers()} is true.
1202      * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
1203      * be notified as each buffer is written and only once all the input is consumed will the
1204      * wrapped {@link Callback#succeeded()} method be called.
1205      */
1206     private class ReadableByteChannelWritingCB extends IteratingNestedCallback
1207     {
1208         private final ReadableByteChannel _in;
1209         private final ByteBuffer _buffer;
1210         private boolean _eof;
1211 
1212         public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
1213         {
1214             super(callback);
1215             _in=in;
1216             _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
1217         }
1218         
1219         @Override
1220         protected Action process() throws Exception
1221         {
1222             // Only return if EOF has previously been read and thus
1223             // a write done with EOF=true
1224             if (_eof)
1225             {
1226                 if (LOG.isDebugEnabled())
1227                     LOG.debug("EOF of {}",this);
1228                 _in.close();
1229                 closed();
1230                 _channel.getByteBufferPool().release(_buffer);
1231                 return Action.SUCCEEDED;
1232             }
1233             
1234             // Read from stream until buffer full or EOF
1235             _buffer.clear();
1236             while (_buffer.hasRemaining() && !_eof)
1237               _eof = (_in.read(_buffer)) <  0;
1238 
1239             // write what we have
1240             _buffer.flip();
1241             write(_buffer,_eof,this);
1242             
1243             return Action.SCHEDULED;
1244         }
1245 
1246         @Override
1247         public void onCompleteFailure(Throwable x)
1248         {
1249             abort(x);
1250             _channel.getByteBufferPool().release(_buffer);
1251             HttpOutput.this.close(_in);
1252             super.onCompleteFailure(x);
1253         }
1254     }
1255 }