View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  
24  import javax.servlet.ServletInputStream;
25  
26  import org.eclipse.jetty.io.EofException;
27  import org.eclipse.jetty.io.RuntimeIOException;
28  import org.eclipse.jetty.util.ArrayQueue;
29  import org.eclipse.jetty.util.log.Log;
30  import org.eclipse.jetty.util.log.Logger;
31  
32  /**
33   * <p>{@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.</p>
34   * <p>{@link HttpInput} holds a queue of items passed to it by calls to {@link #content(T)}.</p>
35   * <p>{@link HttpInput} stores the items directly; if the items contain byte buffers, it does not copy them
36   * but simply holds references to the item, thus the caller must organize for those buffers to valid while
37   * held by this class.</p>
38   * <p>To assist the caller, subclasses may override methods {@link #onContentQueued(T)},
39   * {@link #onContentConsumed(T)} and {@link #onAllContentConsumed()} that can be implemented so that the
40   * caller will know when buffers are queued and consumed.</p>
41   */
42  public abstract class HttpInput<T> extends ServletInputStream
43  {
44      private final static Logger LOG = Log.getLogger(HttpInput.class);
45      private final ArrayQueue<T> _inputQ = new ArrayQueue<>();
46      protected boolean _earlyEOF;
47      protected boolean _inputEOF;
48  
49      public Object lock()
50      {
51          return _inputQ.lock();
52      }
53  
54      public void recycle()
55      {
56          synchronized (lock())
57          {
58              T item = _inputQ.peekUnsafe();
59              while (item != null)
60              {
61                  _inputQ.pollUnsafe();
62                  onContentConsumed(item);
63  
64                  item = _inputQ.peekUnsafe();
65                  if (item == null)
66                      onAllContentConsumed();
67              }
68              _inputEOF = false;
69              _earlyEOF = false;
70          }
71      }
72  
73      @Override
74      public int read() throws IOException
75      {
76          byte[] bytes = new byte[1];
77          int read = read(bytes, 0, 1);
78          return read < 0 ? -1 : 0xff & bytes[0];
79      }
80  
81      @Override
82      public int available()
83      {
84          synchronized (lock())
85          {
86              T item = _inputQ.peekUnsafe();
87              if (item == null)
88                  return 0;
89              return remaining(item);
90          }
91      }
92  
93      @Override
94      public int read(byte[] b, int off, int len) throws IOException
95      {
96          T item = null;
97          synchronized (lock())
98          {
99              // Get the current head of the input Q
100             item = _inputQ.peekUnsafe();
101             
102             // Skip empty items at the head of the queue
103             while (item != null && remaining(item) == 0)
104             {
105                 _inputQ.pollUnsafe();
106                 onContentConsumed(item);
107                 LOG.debug("{} consumed {}", this, item);
108                 item = _inputQ.peekUnsafe();
109                 
110                 // If that was the last item then notify
111                 if (item==null)
112                     onAllContentConsumed();
113             }
114 
115             // If we have no item
116             if (item == null)
117             {
118                 // Was it unexpectedly EOF'd?
119                 if (isEarlyEOF())
120                     throw new EofException();
121 
122                 // check for EOF
123                 if (isShutdown())
124                 {
125                     onEOF();
126                     return -1;
127                 }
128 
129                 // OK then block for some more content
130                 blockForContent();
131                 
132                 // If still not content, we must be closed
133                 item = _inputQ.peekUnsafe();
134                 if (item==null)
135                 {
136                     if (isEarlyEOF())
137                         throw new EofException();
138                     
139                     // blockForContent will only return with no 
140                     // content if it is closed.
141                     if (!isShutdown())
142                         LOG.warn("Unexpected !EOF: "+this);
143 
144                     onEOF();
145                     return -1;
146                 }
147             }
148         }
149         return get(item, b, off, len);
150     }
151     protected abstract int remaining(T item);
152 
153     protected abstract int get(T item, byte[] buffer, int offset, int length);
154 
155     protected abstract void onContentConsumed(T item);
156 
157     protected void blockForContent() throws IOException
158     {
159         synchronized (lock())
160         {
161             while (_inputQ.isEmpty() && !isShutdown() && !isEarlyEOF())
162             {
163                 try
164                 {
165                     LOG.debug("{} waiting for content", this);
166                     lock().wait();
167                 }
168                 catch (InterruptedException e)
169                 {
170                     throw (IOException)new InterruptedIOException().initCause(e);
171                 }
172             }
173         }
174     }
175 
176     /* ------------------------------------------------------------ */
177     /** Called by this HttpInput to signal new content has been queued
178      * @param item
179      */
180     protected void onContentQueued(T item)
181     {
182         lock().notify();
183     }
184 
185     /* ------------------------------------------------------------ */
186     /** Called by this HttpInput to signal all available content has been consumed
187      */
188     protected void onAllContentConsumed()
189     {
190     }
191 
192     /* ------------------------------------------------------------ */
193     /** Called by this HttpInput to signal it has reached EOF
194      */
195     protected void onEOF()
196     {
197     }
198 
199     /* ------------------------------------------------------------ */
200     /** Add some content to the input stream
201      * @param item
202      */
203     public void content(T item)
204     {
205         synchronized (lock())
206         {
207             // The buffer is not copied here.  This relies on the caller not recycling the buffer
208             // until the it is consumed.  The onAllContentConsumed() callback is the signal to the
209             // caller that the buffers can be recycled.
210             _inputQ.add(item);
211             onContentQueued(item);
212             LOG.debug("{} queued {}", this, item);
213         }
214     }
215 
216     /* ------------------------------------------------------------ */
217     /** This method should be called to signal to the HttpInput
218      * that an EOF has arrived before all the expected content.
219      * Typically this will result in an EOFException being thrown
220      * from a subsequent read rather than a -1 return.
221      */
222     public void earlyEOF()
223     {
224         synchronized (lock())
225         {
226             _earlyEOF = true;
227             _inputEOF = true;
228             lock().notify();
229             LOG.debug("{} early EOF", this);
230         }
231     }
232 
233     /* ------------------------------------------------------------ */
234     public boolean isEarlyEOF()
235     {
236         synchronized (lock())
237         {
238             return _earlyEOF;
239         }
240     }
241 
242     /* ------------------------------------------------------------ */
243     public void shutdown()
244     {
245         synchronized (lock())
246         {
247             _inputEOF = true;
248             lock().notify();
249             LOG.debug("{} shutdown", this);
250         }
251     }
252 
253     /* ------------------------------------------------------------ */
254     public boolean isShutdown()
255     {
256         synchronized (lock())
257         {
258             return _inputEOF;
259         }
260     }
261 
262     /* ------------------------------------------------------------ */
263     public void consumeAll()
264     {
265         synchronized (lock())
266         {
267             T item = _inputQ.peekUnsafe();
268             loop: while (!isShutdown() && !isEarlyEOF())
269             {
270                 while (item != null)
271                 {
272                     _inputQ.pollUnsafe();
273                     onContentConsumed(item);
274 
275                     item = _inputQ.peekUnsafe();
276                     if (item == null)
277                         onAllContentConsumed();
278                 }
279 
280                 try
281                 {
282                     blockForContent();
283                     item = _inputQ.peekUnsafe();
284                     if (item==null)
285                         break;
286                 }
287                 catch (IOException e)
288                 {
289                     LOG.debug(e);
290                     break loop;
291                 }
292             }
293         }
294     }
295 }