View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  
24  import javax.servlet.ServletInputStream;
25  
26  import org.eclipse.jetty.io.EofException;
27  import org.eclipse.jetty.io.RuntimeIOException;
28  import org.eclipse.jetty.util.ArrayQueue;
29  import org.eclipse.jetty.util.log.Log;
30  import org.eclipse.jetty.util.log.Logger;
31  
32  /**
33   * <p>{@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.</p>
34   * <p>{@link HttpInput} holds a queue of items passed to it by calls to {@link #content(T)}.</p>
35   * <p>{@link HttpInput} stores the items directly; if the items contain byte buffers, it does not copy them
36   * but simply holds references to the item, thus the caller must organize for those buffers to valid while
37   * held by this class.</p>
38   * <p>To assist the caller, subclasses may override methods {@link #onContentQueued(T)},
39   * {@link #onContentConsumed(T)} and {@link #onAllContentConsumed()} that can be implemented so that the
40   * caller will know when buffers are queued and consumed.</p>
41   */
42  public abstract class HttpInput<T> extends ServletInputStream
43  {
44      private final static Logger LOG = Log.getLogger(HttpInput.class);
45      private final ArrayQueue<T> _inputQ = new ArrayQueue<>();
46      private boolean _earlyEOF;
47      private boolean _inputEOF;
48  
49      public Object lock()
50      {
51          return _inputQ.lock();
52      }
53  
54      public void recycle()
55      {
56          synchronized (lock())
57          {
58              T item = _inputQ.peekUnsafe();
59              while (item != null)
60              {
61                  _inputQ.pollUnsafe();
62                  onContentConsumed(item);
63  
64                  item = _inputQ.peekUnsafe();
65                  if (item == null)
66                      onAllContentConsumed();
67              }
68              _inputEOF = false;
69              _earlyEOF = false;
70          }
71      }
72  
73      @Override
74      public int read() throws IOException
75      {
76          byte[] bytes = new byte[1];
77          int read = read(bytes, 0, 1);
78          return read < 0 ? -1 : 0xff & bytes[0];
79      }
80  
81      @Override
82      public int available()
83      {
84          synchronized (lock())
85          {
86              T item = _inputQ.peekUnsafe();
87              if (item == null)
88                  return 0;
89              return remaining(item);
90          }
91      }
92  
93      @Override
94      public int read(byte[] b, int off, int len) throws IOException
95      {
96          T item = null;
97          synchronized (lock())
98          {
99              while (item == null)
100             {
101                 item = _inputQ.peekUnsafe();
102                 while (item != null && remaining(item) == 0)
103                 {
104                     _inputQ.pollUnsafe();
105                     onContentConsumed(item);
106                     LOG.debug("{} consumed {}", this, item);
107                     item = _inputQ.peekUnsafe();
108                 }
109 
110                 if (item == null)
111                 {
112                     onAllContentConsumed();
113 
114                     if (isEarlyEOF())
115                         throw new EofException();
116 
117                     // check for EOF
118                     if (isShutdown())
119                     {
120                         onEOF();
121                         return -1;
122                     }
123 
124                     blockForContent();
125                 }
126             }
127         }
128         return get(item, b, off, len);
129     }
130     protected abstract int remaining(T item);
131 
132     protected abstract int get(T item, byte[] buffer, int offset, int length);
133 
134     protected abstract void onContentConsumed(T item);
135 
136     protected void blockForContent() throws IOException
137     {
138         synchronized (lock())
139         {
140             while (_inputQ.isEmpty() && !isShutdown() && !isEarlyEOF())
141             {
142                 try
143                 {
144                     LOG.debug("{} waiting for content", this);
145                     lock().wait();
146                 }
147                 catch (InterruptedException e)
148                 {
149                     throw (IOException)new InterruptedIOException().initCause(e);
150                 }
151             }
152         }
153     }
154 
155     protected void onContentQueued(T item)
156     {
157         lock().notify();
158     }
159 
160     protected void onAllContentConsumed()
161     {
162     }
163 
164     protected void onEOF()
165     {
166     }
167 
168     public boolean content(T item)
169     {
170         synchronized (lock())
171         {
172             // The buffer is not copied here.  This relies on the caller not recycling the buffer
173             // until the it is consumed.  The onAllContentConsumed() callback is the signal to the
174             // caller that the buffers can be recycled.
175             _inputQ.add(item);
176             onContentQueued(item);
177             LOG.debug("{} queued {}", this, item);
178         }
179         return true;
180     }
181 
182     public void earlyEOF()
183     {
184         synchronized (lock())
185         {
186             _earlyEOF = true;
187             lock().notify();
188             LOG.debug("{} early EOF", this);
189         }
190     }
191 
192     public boolean isEarlyEOF()
193     {
194         synchronized (lock())
195         {
196             return _earlyEOF;
197         }
198     }
199 
200     public void shutdown()
201     {
202         synchronized (lock())
203         {
204             _inputEOF = true;
205             lock().notify();
206             LOG.debug("{} shutdown", this);
207         }
208     }
209 
210     public boolean isShutdown()
211     {
212         synchronized (lock())
213         {
214             return _inputEOF;
215         }
216     }
217 
218     public void consumeAll()
219     {
220         synchronized (lock())
221         {
222             while (!isShutdown() && !isEarlyEOF())
223             {
224                 T item = _inputQ.peekUnsafe();
225                 while (item != null)
226                 {
227                     _inputQ.pollUnsafe();
228                     onContentConsumed(item);
229 
230                     item = _inputQ.peekUnsafe();
231                     if (item == null)
232                         onAllContentConsumed();
233                 }
234 
235                 try
236                 {
237                     blockForContent();
238                 }
239                 catch (IOException e)
240                 {
241                     throw new RuntimeIOException(e);
242                 }
243             }
244         }
245     }
246 }