View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  
23  import javax.servlet.ReadListener;
24  import javax.servlet.ServletInputStream;
25  
26  import org.eclipse.jetty.io.EofException;
27  import org.eclipse.jetty.io.RuntimeIOException;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  
31  /**
32   * <p>{@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.</p>
33   * <p>{@link HttpInput} holds a queue of items passed to it by calls to {@link #content(T)}.</p>
34   * <p>{@link HttpInput} stores the items directly; if the items contain byte buffers, it does not copy them
35   * but simply holds references to the item, thus the caller must organize for those buffers to valid while
36   * held by this class.</p>
37   * <p>To assist the caller, subclasses may override methods {@link #onContentQueued(T)},
38   * {@link #onContentConsumed(T)} and {@link #onAllContentConsumed()} that can be implemented so that the
39   * caller will know when buffers are queued and consumed.</p>
40   */
41  /**
42   * @author gregw
43   *
44   * @param <T>
45   */
46  /**
47   * @author gregw
48   *
49   * @param <T>
50   */
51  public abstract class HttpInput<T> extends ServletInputStream implements Runnable
52  {
53      private final static Logger LOG = Log.getLogger(HttpInput.class);
54  
55      private final byte[] _oneByteBuffer = new byte[1];
56      private HttpChannelState _channelState;
57      private Throwable _onError;
58      private ReadListener _listener;
59      private boolean _notReady;
60  
61      protected State _state = BLOCKING;
62      private State _eof=null;
63      private final Object _lock;
64      private long _contentRead;
65  
66      protected HttpInput()
67      {
68          this(null);
69      }
70  
71      protected HttpInput(Object lock)
72      {
73          _lock=lock==null?this:lock;
74      }
75  
76      public final Object lock()
77      {
78          return _lock;
79      }
80  
81      public void recycle()
82      {
83          synchronized (lock())
84          {
85              _state = BLOCKING;
86              _eof=null;
87              _onError=null;
88              _contentRead=0;
89          }
90      }
91  
92      /**
93       * Access the next content to be consumed from.   Returning the next item does not consume it
94       * and it may be returned multiple times until it is consumed.   Calls to {@link #get(Object, byte[], int, int)}
95       * or {@link #consume(Object, int)} are required to consume data from the content.
96       * @return Content or null if none available.
97       * @throws IOException
98       */
99      protected abstract T nextContent() throws IOException;
100 
101     /**
102      * A convenience method to call nextContent and to check the return value, which if null then the
103      * a check is made for EOF and the state changed accordingly.
104      * @see #nextContent()
105      * @return Content or null if none available.
106      * @throws IOException
107      */
108     protected T getNextContent() throws IOException
109     {
110         T content=nextContent();
111 
112         if (content==null && _eof!=null)
113         {
114             LOG.debug("{} eof {}",this,_eof);
115             _state=_eof;
116             _eof=null;
117         }
118 
119         return content;
120     }
121 
122     @Override
123     public int read() throws IOException
124     {
125         int read = read(_oneByteBuffer, 0, 1);
126         return read < 0 ? -1 : 0xff & _oneByteBuffer[0];
127     }
128 
129     @Override
130     public int available()
131     {
132         try
133         {
134             synchronized (lock())
135             {
136                 T item = getNextContent();
137                 return item==null?0:remaining(item);
138             }
139         }
140         catch (IOException e)
141         {
142             throw new RuntimeIOException(e);
143         }
144     }
145 
146     @Override
147     public int read(byte[] b, int off, int len) throws IOException
148     {
149         T item = null;
150         int l;
151         synchronized (lock())
152         {
153             // System.err.printf("read s=%s q=%d e=%s%n",_state,_inputQ.size(),_eof);
154 
155             // Get the current head of the input Q
156             item = getNextContent();
157 
158             // If we have no item
159             if (item == null)
160             {
161                 _state.waitForContent(this);
162                 item=getNextContent();
163                 if (item==null)
164                     return _state.noContent();
165             }
166             
167             l=get(item, b, off, len);
168             _contentRead+=l;
169             
170         }
171         return l;
172     }
173     
174     protected abstract int remaining(T item);
175 
176     protected abstract int get(T item, byte[] buffer, int offset, int length);
177 
178     protected abstract void consume(T item, int length);
179 
180     protected abstract void blockForContent() throws IOException;
181 
182     protected boolean onAsyncRead()
183     {
184         if (_listener==null)
185             return false;
186         _channelState.onReadPossible();
187         return true;
188     }
189 
190     public long getContentRead()
191     {
192         synchronized (lock())
193         {
194             return _contentRead;
195         }
196     }
197     
198     /** Add some content to the input stream
199      * @param item
200      */
201     public abstract void content(T item);
202 
203 
204     /** This method should be called to signal to the HttpInput
205      * that an EOF has arrived before all the expected content.
206      * Typically this will result in an EOFException being thrown
207      * from a subsequent read rather than a -1 return.
208      */
209     public void earlyEOF()
210     {
211         synchronized (lock())
212         {
213             if (_eof==null || !_eof.isEOF())
214             {
215                 LOG.debug("{} early EOF", this);
216                 _eof=EARLY_EOF;
217                 if (_listener!=null)
218                     _channelState.onReadPossible();
219             }
220         }
221     }
222 
223     public void messageComplete()
224     {
225         synchronized (lock())
226         {
227             if (_eof==null || !_eof.isEOF())
228             {
229                 LOG.debug("{} EOF", this);
230                 _eof=EOF;
231                 if (_listener!=null)
232                     _channelState.onReadPossible();
233             }
234         }
235     }
236 
237     public void consumeAll()
238     {
239         synchronized (lock())
240         {
241             try
242             {
243                 while (!isFinished())
244                 {
245                     T item = getNextContent();
246                     if (item==null)
247                         _state.waitForContent(this);
248                     else
249                         consume(item,remaining(item));
250                 }
251             }
252             catch (IOException e)
253             {
254                 LOG.debug(e);
255             }
256         }
257     }
258 
259     @Override
260     public boolean isFinished()
261     {
262         synchronized (lock())
263         {
264             return _state.isEOF();
265         }
266     }
267 
268     @Override
269     public boolean isReady()
270     {
271         synchronized (lock())
272         {
273             if (_listener==null)
274                 return true;
275             int available = available();
276             if (available>0)
277                 return true;
278             if (!_notReady)
279             {
280                 _notReady=true;
281                 if (_state.isEOF())
282                     _channelState.onReadPossible();
283                 else
284                     unready();
285             }
286             return false;
287         }
288     }
289 
290     protected void unready()
291     {
292     }
293 
294     @Override
295     public void setReadListener(ReadListener readListener)
296     {
297         if (readListener==null)
298             throw new NullPointerException("readListener==null");
299         synchronized (lock())
300         {
301             if (_state!=BLOCKING)
302                 throw new IllegalStateException("state="+_state);
303             _state=ASYNC;
304             _listener=readListener;
305             _notReady=true;
306 
307             _channelState.onReadPossible();
308         }
309     }
310 
311     public void failed(Throwable x)
312     {
313         synchronized (lock())
314         {
315             if (_onError==null)
316                 LOG.warn(x);
317             else
318                 _onError=x;
319         }
320     }
321 
322     @Override
323     public void run()
324     {
325         final boolean available;
326         final boolean eof;
327         final Throwable x;
328 
329         synchronized (lock())
330         {
331             if (!_notReady || _listener==null)
332                 return;
333 
334             x=_onError;
335             T item;
336             try
337             {
338                 item = getNextContent();
339             }
340             catch(Exception e)
341             {
342                 item=null;
343                 failed(e);
344             }
345             available= item!=null && remaining(item)>0;
346 
347             eof = !available && _state.isEOF();
348             _notReady=!available&&!eof;
349         }
350 
351         try
352         {
353             if (x!=null)
354                 _listener.onError(x);
355             else if (available)
356                 _listener.onDataAvailable();
357             else if (eof)
358                 _listener.onAllDataRead();
359             else
360                 unready();
361         }
362         catch(Throwable e)
363         {
364             LOG.warn(e.toString());
365             LOG.debug(e);
366             _listener.onError(e);
367         }
368     }
369 
370     protected static class State
371     {
372         public void waitForContent(HttpInput<?> in) throws IOException
373         {
374         }
375 
376         public int noContent() throws IOException
377         {
378             return -1;
379         }
380 
381         public boolean isEOF()
382         {
383             return false;
384         }
385     }
386 
387     protected static final State BLOCKING= new State()
388     {
389         @Override
390         public void waitForContent(HttpInput<?> in) throws IOException
391         {
392             in.blockForContent();
393         }
394         public String toString()
395         {
396             return "OPEN";
397         }
398     };
399 
400     protected static final State ASYNC= new State()
401     {
402         @Override
403         public int noContent() throws IOException
404         {
405             return 0;
406         }
407         @Override
408         public String toString()
409         {
410             return "ASYNC";
411         }
412     };
413 
414     protected static final State EARLY_EOF= new State()
415     {
416         @Override
417         public int noContent() throws IOException
418         {
419             throw new EofException();
420         }
421         @Override
422         public boolean isEOF()
423         {
424             return true;
425         }
426         public String toString()
427         {
428             return "EARLY_EOF";
429         }
430     };
431 
432     protected static final State EOF= new State()
433     {
434         @Override
435         public boolean isEOF()
436         {
437             return true;
438         }
439 
440         public String toString()
441         {
442             return "EOF";
443         }
444     };
445 
446     public void init(HttpChannelState state)
447     {
448         synchronized (lock())
449         {
450             _channelState=state;
451         }
452     }
453 
454 }