View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.ReadableByteChannel;
26  import javax.servlet.RequestDispatcher;
27  import javax.servlet.ServletOutputStream;
28  import javax.servlet.ServletRequest;
29  import javax.servlet.ServletResponse;
30  
31  import org.eclipse.jetty.http.HttpContent;
32  import org.eclipse.jetty.http.HttpHeader;
33  import org.eclipse.jetty.io.EofException;
34  import org.eclipse.jetty.util.BlockingCallback;
35  import org.eclipse.jetty.util.BufferUtil;
36  import org.eclipse.jetty.util.Callback;
37  import org.eclipse.jetty.util.IteratingCallback;
38  import org.eclipse.jetty.util.log.Log;
39  import org.eclipse.jetty.util.log.Logger;
40  import org.eclipse.jetty.util.resource.Resource;
41  
42  /**
43   * <p>{@link HttpOutput} implements {@link ServletOutputStream}
44   * as required by the Servlet specification.</p>
45   * <p>{@link HttpOutput} buffers content written by the application until a
46   * further write will overflow the buffer, at which point it triggers a commit
47   * of the response.</p>
48   * <p>{@link HttpOutput} can be closed and reopened, to allow requests included
49   * via {@link RequestDispatcher#include(ServletRequest, ServletResponse)} to
50   * close the stream, to be reopened after the inclusion ends.</p>
51   */
52  public class HttpOutput extends ServletOutputStream
53  {
54      private static Logger LOG = Log.getLogger(HttpOutput.class);
55      private final HttpChannel<?> _channel;
56      private boolean _closed;
57      private long _written;
58      private ByteBuffer _aggregate;
59      private int _bufferSize;
60  
/**
 * Creates the output for the given channel, taking the initial buffer size
 * from the channel's HTTP configuration.
 * @param channel the channel that writes are delegated to
 */
public HttpOutput(HttpChannel<?> channel)
{
    this._channel = channel;
    this._bufferSize = this._channel.getHttpConfiguration().getOutputBufferSize();
}
66  
67      public boolean isWritten()
68      {
69          return _written > 0;
70      }
71  
72      public long getWritten()
73      {
74          return _written;
75      }
76  
77      public void reset()
78      {
79          _written = 0;
80          reopen();
81      }
82  
83      public void reopen()
84      {
85          _closed = false;
86      }
87  
88      /** Called by the HttpChannel if the output was closed
89       * externally (eg by a 500 exception handling).
90       */
91      void closed()
92      {
93          if (!_closed)
94          {
95              _closed = true;
96              try
97              {
98                  _channel.getResponse().closeOutput(); 
99              }
100             catch(IOException e)
101             {
102                 _channel.failed();
103                 LOG.ignore(e);
104             }
105             releaseBuffer();
106         }
107     }
108 
109     @Override
110     public void close()
111     {
112         if (!isClosed())
113         {
114             try
115             {
116                 if (BufferUtil.hasContent(_aggregate))
117                     _channel.write(_aggregate, !_channel.getResponse().isIncluding());
118                 else
119                     _channel.write(BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding());
120             }
121             catch(IOException e)
122             {
123                 _channel.failed();
124                 LOG.ignore(e);
125             }
126         }
127         closed();
128     }
129 
130     private void releaseBuffer()
131     {
132         if (_aggregate != null)
133         {
134             _channel.getConnector().getByteBufferPool().release(_aggregate);
135             _aggregate = null;
136         }
137     }
138 
139     public boolean isClosed()
140     {
141         return _closed;
142     }
143 
144     @Override
145     public void flush() throws IOException
146     {
147         if (isClosed())
148             return;
149 
150         if (BufferUtil.hasContent(_aggregate))
151             _channel.write(_aggregate, false);
152         else
153             _channel.write(BufferUtil.EMPTY_BUFFER, false);
154     }
155 
156     public boolean isAllContentWritten()
157     {
158         Response response=_channel.getResponse();
159         return response.isAllContentWritten(_written);
160     }
161     
162     public void closeOutput() throws IOException
163     {
164         _channel.getResponse().closeOutput();
165     }
166 
167     @Override
168     public void write(byte[] b, int off, int len) throws IOException
169     {  
170         if (isClosed())
171             throw new EofException("Closed");
172 
173         _written+=len;
174         boolean complete=_channel.getResponse().isAllContentWritten(_written);
175         int capacity = getBufferSize();
176 
177         // Should we aggregate?
178         if (!complete && len<=capacity/4)
179         {
180             if (_aggregate == null)
181                 _aggregate = _channel.getByteBufferPool().acquire(capacity, false);
182 
183             // YES - fill the aggregate with content from the buffer
184             int filled = BufferUtil.fill(_aggregate, b, off, len);
185 
186             // return if we are not complete, not full and filled all the content
187             if (!complete && filled == len && !BufferUtil.isFull(_aggregate))
188                 return;
189 
190             // adjust offset/length
191             off += filled;
192             len -= filled;
193         }
194 
195         // flush any content from the aggregate
196         if (BufferUtil.hasContent(_aggregate))
197         {
198             _channel.write(_aggregate, complete && len==0);
199 
200             // should we fill aggregate again from the buffer?
201             if (len>0 && !complete && len<=_aggregate.capacity()/4)
202             {
203                 BufferUtil.append(_aggregate, b, off, len);
204                 return;
205             }
206         }
207 
208         // write any remaining content in the buffer directly
209         if (len>0)
210             // pass as readonly to avoid space stealing optimisation in HttpConnection 
211             _channel.write(ByteBuffer.wrap(b, off, len).asReadOnlyBuffer(), complete);
212         else if (complete)
213             _channel.write(BufferUtil.EMPTY_BUFFER,complete);
214 
215         if (complete)
216             closed();
217     }
218 
219 
220     @Override
221     public void write(int b) throws IOException
222     {
223         if (isClosed())
224             throw new EOFException("Closed");
225 
226         // Allocate an aggregate buffer.
227         // Never direct as it is slow to do little writes to a direct buffer.
228         if (_aggregate == null)
229             _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
230 
231         BufferUtil.append(_aggregate, (byte)b);
232         _written++;
233 
234         boolean complete=_channel.getResponse().isAllContentWritten(_written);
235         
236         // Check if all written or full
237         if (complete ||  BufferUtil.isFull(_aggregate))
238         {
239             BlockingCallback callback = _channel.getWriteBlockingCallback();
240             _channel.write(_aggregate, false, callback);
241             callback.block();
242             if (complete)
243                 closed();
244         }
245     }
246 
247     @Override
248     public void print(String s) throws IOException
249     {
250         if (isClosed())
251             throw new IOException("Closed");
252 
253         write(s.getBytes(_channel.getResponse().getCharacterEncoding()));
254     }
255 
256     /* ------------------------------------------------------------ */
257     /** Set headers and send content.
258      * @deprecated Use {@link Response#setHeaders(HttpContent)} and {@link #sendContent(HttpContent)} instead.
259      * @param content
260      * @throws IOException
261      */
262     @Deprecated
263     public void sendContent(Object content) throws IOException
264     {
265         final BlockingCallback callback =_channel.getWriteBlockingCallback();
266 
267         if (content instanceof HttpContent)
268         {
269             _channel.getResponse().setHeaders((HttpContent)content);
270             sendContent((HttpContent)content,callback);
271         }
272         else if (content instanceof Resource)
273         {
274             Resource resource = (Resource)content;
275             _channel.getResponse().getHttpFields().putDateField(HttpHeader.LAST_MODIFIED, resource.lastModified());
276             
277             ReadableByteChannel in=((Resource)content).getReadableByteChannel();
278             if (in!=null)
279                 sendContent(in,callback);
280             else
281                 sendContent(resource.getInputStream(),callback);
282         }
283         else if (content instanceof ByteBuffer)
284         {
285             sendContent((ByteBuffer)content,callback);
286         }
287         else if (content instanceof ReadableByteChannel)
288         {
289             sendContent((ReadableByteChannel)content,callback);
290         }
291         else if (content instanceof InputStream)
292         {
293             sendContent((InputStream)content,callback);
294         }
295         else
296             callback.failed(new IllegalArgumentException("unknown content type "+content.getClass()));
297 
298         callback.block();
299     }
300 
301     /* ------------------------------------------------------------ */
302     /** Blocking send of content.
303      * @param content The content to send.
304      * @throws IOException
305      */
306     public void sendContent(ByteBuffer content) throws IOException
307     {
308         final BlockingCallback callback =_channel.getWriteBlockingCallback();
309         if (content.hasArray()&&content.limit()<content.capacity())
310             content=content.asReadOnlyBuffer();
311         _channel.write(content,true,callback);
312         callback.block();
313     }
314 
315     /* ------------------------------------------------------------ */
316     /** Blocking send of content.
317      * @param in The content to send
318      * @throws IOException
319      */
320     public void sendContent(InputStream in) throws IOException
321     {
322         final BlockingCallback callback =_channel.getWriteBlockingCallback();
323         new InputStreamWritingCB(in,callback).iterate();
324         callback.block();
325     }
326 
327     /* ------------------------------------------------------------ */
328     /** Blocking send of content.
329      * @param in The content to send
330      * @throws IOException
331      */
332     public void sendContent(ReadableByteChannel in) throws IOException
333     {
334         final BlockingCallback callback =_channel.getWriteBlockingCallback();
335         new ReadableByteChannelWritingCB(in,callback).iterate();
336         callback.block();
337     }
338     
339 
340     /* ------------------------------------------------------------ */
341     /** Blocking send of content.
342      * @param content The content to send
343      * @throws IOException
344      */
345     public void sendContent(HttpContent content) throws IOException
346     {
347         final BlockingCallback callback =_channel.getWriteBlockingCallback();
348         sendContent(content,callback);
349         callback.block();
350     }
351    
352     /* ------------------------------------------------------------ */
353     /** Asynchronous send of content.
354      * @param content The content to send
355      * @param callback The callback to use to notify success or failure
356      */
357     public void sendContent(ByteBuffer content, final Callback callback)
358     {
359         if (content.hasArray()&&content.limit()<content.capacity())
360             content=content.asReadOnlyBuffer();
361         _channel.write(content,true,new Callback()
362         {
363             @Override
364             public void succeeded()
365             {
366                 closed();
367                 callback.succeeded();
368             }
369 
370             @Override
371             public void failed(Throwable x)
372             {
373                 callback.failed(x);
374             }            
375         });
376     }
377 
378     /* ------------------------------------------------------------ */
379     /** Asynchronous send of content.
380      * @param in The content to send as a stream.  The stream will be closed 
381      * after reading all content.
382      * @param callback The callback to use to notify success or failure
383      */
384     public void sendContent(InputStream in, Callback callback)
385     {
386         new InputStreamWritingCB(in,callback).iterate();
387     }
388 
389     /* ------------------------------------------------------------ */
390     /** Asynchronous send of content.
391      * @param in The content to send as a channel.  The channel will be closed 
392      * after reading all content.
393      * @param callback The callback to use to notify success or failure
394      */
395     public void sendContent(ReadableByteChannel in, Callback callback)
396     {
397         new ReadableByteChannelWritingCB(in,callback).iterate();
398     }
399 
400     /* ------------------------------------------------------------ */
401     /** Asynchronous send of content.
402      * @param httpContent The content to send
403      * @param callback The callback to use to notify success or failure
404      */
405     public void sendContent(HttpContent httpContent, Callback callback) throws IOException
406     {
407         if (isClosed())
408             throw new IOException("Closed");
409         if (BufferUtil.hasContent(_aggregate))
410             throw new IOException("written");
411         if (_channel.isCommitted())
412             throw new IOException("committed");
413             
414         ByteBuffer buffer= _channel.useDirectBuffers()?httpContent.getDirectBuffer():null;
415         if (buffer == null)
416             buffer = httpContent.getIndirectBuffer();
417         
418         if (buffer!=null)
419         {
420             sendContent(buffer,callback);
421             return;
422         }
423         
424         ReadableByteChannel rbc=httpContent.getReadableByteChannel();
425         if (rbc!=null)
426         {
427             // Close of the rbc is done by the async sendContent
428             sendContent(rbc,callback);
429             return;
430         }
431            
432         InputStream in = httpContent.getInputStream();
433         if ( in!=null )
434         {
435             sendContent(in,callback);
436             return;
437         }
438 
439         callback.failed(new IllegalArgumentException("unknown content for "+httpContent));
440     }
441 
442     public int getBufferSize()
443     {
444         return _bufferSize;
445     }
446 
447     public void setBufferSize(int size)
448     {
449         this._bufferSize = size;
450     }
451 
452     public void resetBuffer()
453     {
454         if (BufferUtil.hasContent(_aggregate))
455             BufferUtil.clear(_aggregate);
456     }
457     
458     
459     /* ------------------------------------------------------------ */
460     /** An iterating callback that will take content from an 
461      * InputStream and write it to the associated {@link HttpChannel}.
462      * A non direct buffer of size {@link HttpOutput#getBufferSize()} is used. 
463      * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
464      * be notified as each buffer is written and only once all the input is consumed will the 
465      * wrapped {@link Callback#succeeded()} method be called. 
466      */
467     private class InputStreamWritingCB extends IteratingCallback
468     {
469         private final InputStream _in;
470         private final ByteBuffer _buffer;
471         private boolean _eof;
472         
473         public InputStreamWritingCB(InputStream in, Callback callback)
474         {          
475             super(callback);
476             _in=in;
477             _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
478         }
479 
480         @Override
481         protected boolean process() throws Exception
482         {
483             // Only return if EOF has previously been read and thus
484             // a write done with EOF=true
485             if (_eof)
486             {
487                 // Handle EOF
488                 _in.close();
489                 closed();
490                 _channel.getByteBufferPool().release(_buffer);
491                 return true;
492             }
493             
494             // Read until buffer full or EOF
495             int len=0;
496             while (len<_buffer.capacity() && !_eof)
497             {
498                 int r=_in.read(_buffer.array(),_buffer.arrayOffset()+len,_buffer.capacity()-len);
499                 if (r<0)
500                     _eof=true;
501                 else
502                     len+=r;
503             }
504 
505             // write what we have
506             _buffer.position(0);
507             _buffer.limit(len);
508             _channel.write(_buffer,_eof,this);
509             return false;
510         }
511 
512         @Override
513         public void failed(Throwable x)
514         {
515             super.failed(x);
516             _channel.getByteBufferPool().release(_buffer);
517             try
518             {
519                 _in.close();
520             }
521             catch (IOException e)
522             {
523                 LOG.ignore(e);
524             }
525         }
526         
527     }
528 
529     /* ------------------------------------------------------------ */
530     /** An iterating callback that will take content from a 
531      * ReadableByteChannel and write it to the {@link HttpChannel}.
532      * A {@link ByteBuffer} of size {@link HttpOutput#getBufferSize()} is used that will be direct if
533      * {@link HttpChannel#useDirectBuffers()} is true.
534      * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
535      * be notified as each buffer is written and only once all the input is consumed will the 
536      * wrapped {@link Callback#succeeded()} method be called. 
537      */
538     private class ReadableByteChannelWritingCB extends IteratingCallback
539     {
540         private final ReadableByteChannel _in;
541         private final ByteBuffer _buffer;
542         private boolean _eof;
543         
544         public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
545         {          
546             super(callback);
547             _in=in;
548             _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
549         }
550 
551         @Override
552         protected boolean process() throws Exception
553         {
554             // Only return if EOF has previously been read and thus
555             // a write done with EOF=true
556             if (_eof)
557             {
558                 _in.close();
559                 closed();
560                 _channel.getByteBufferPool().release(_buffer);
561                 return true;
562             }
563             
564             // Read from stream until buffer full or EOF
565             _buffer.clear();
566             while (_buffer.hasRemaining() && !_eof)
567               _eof = (_in.read(_buffer)) <  0;
568             
569             // write what we have
570             _buffer.flip();
571             _channel.write(_buffer,_eof,this);
572             return false;
573         }
574 
575         @Override
576         public void failed(Throwable x)
577         {
578             super.failed(x);
579             _channel.getByteBufferPool().release(_buffer);
580             try
581             {
582                 _in.close();
583             }
584             catch (IOException e)
585             {
586                 LOG.ignore(e);
587             }
588         }
589     }
590 }