1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23
24 import javax.servlet.ServletInputStream;
25
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.RuntimeIOException;
28 import org.eclipse.jetty.util.ArrayQueue;
29 import org.eclipse.jetty.util.log.Log;
30 import org.eclipse.jetty.util.log.Logger;
31
32
33
34
35
36
37
38
39
40
41
42 public abstract class HttpInput<T> extends ServletInputStream
43 {
44 private final static Logger LOG = Log.getLogger(HttpInput.class);
45 private final ArrayQueue<T> _inputQ = new ArrayQueue<>();
46 protected boolean _earlyEOF;
47 protected boolean _inputEOF;
48
49 public Object lock()
50 {
51 return _inputQ.lock();
52 }
53
54 public void recycle()
55 {
56 synchronized (lock())
57 {
58 T item = _inputQ.peekUnsafe();
59 while (item != null)
60 {
61 _inputQ.pollUnsafe();
62 onContentConsumed(item);
63
64 item = _inputQ.peekUnsafe();
65 if (item == null)
66 onAllContentConsumed();
67 }
68 _inputEOF = false;
69 _earlyEOF = false;
70 }
71 }
72
73 @Override
74 public int read() throws IOException
75 {
76 byte[] bytes = new byte[1];
77 int read = read(bytes, 0, 1);
78 return read < 0 ? -1 : 0xff & bytes[0];
79 }
80
81 @Override
82 public int available()
83 {
84 synchronized (lock())
85 {
86 T item = _inputQ.peekUnsafe();
87 if (item == null)
88 return 0;
89 return remaining(item);
90 }
91 }
92
93 @Override
94 public int read(byte[] b, int off, int len) throws IOException
95 {
96 T item = null;
97 synchronized (lock())
98 {
99
100 item = _inputQ.peekUnsafe();
101
102
103 while (item != null && remaining(item) == 0)
104 {
105 _inputQ.pollUnsafe();
106 onContentConsumed(item);
107 LOG.debug("{} consumed {}", this, item);
108 item = _inputQ.peekUnsafe();
109
110
111 if (item==null)
112 onAllContentConsumed();
113 }
114
115
116 if (item == null)
117 {
118
119 if (isEarlyEOF())
120 throw new EofException();
121
122
123 if (isShutdown())
124 {
125 onEOF();
126 return -1;
127 }
128
129
130 blockForContent();
131
132
133 item = _inputQ.peekUnsafe();
134 if (item==null)
135 {
136 if (isEarlyEOF())
137 throw new EofException();
138
139
140
141 if (!isShutdown())
142 LOG.warn("Unexpected !EOF: "+this);
143
144 onEOF();
145 return -1;
146 }
147 }
148 }
149 return get(item, b, off, len);
150 }
151 protected abstract int remaining(T item);
152
153 protected abstract int get(T item, byte[] buffer, int offset, int length);
154
155 protected abstract void onContentConsumed(T item);
156
157 protected void blockForContent() throws IOException
158 {
159 synchronized (lock())
160 {
161 while (_inputQ.isEmpty() && !isShutdown() && !isEarlyEOF())
162 {
163 try
164 {
165 LOG.debug("{} waiting for content", this);
166 lock().wait();
167 }
168 catch (InterruptedException e)
169 {
170 throw (IOException)new InterruptedIOException().initCause(e);
171 }
172 }
173 }
174 }
175
176
177
178
179
180 protected void onContentQueued(T item)
181 {
182 lock().notify();
183 }
184
185
186
187
188 protected void onAllContentConsumed()
189 {
190 }
191
192
193
194
195 protected void onEOF()
196 {
197 }
198
199
200
201
202
203 public void content(T item)
204 {
205 synchronized (lock())
206 {
207
208
209
210 _inputQ.add(item);
211 onContentQueued(item);
212 LOG.debug("{} queued {}", this, item);
213 }
214 }
215
216
217
218
219
220
221
222 public void earlyEOF()
223 {
224 synchronized (lock())
225 {
226 _earlyEOF = true;
227 _inputEOF = true;
228 lock().notify();
229 LOG.debug("{} early EOF", this);
230 }
231 }
232
233
234 public boolean isEarlyEOF()
235 {
236 synchronized (lock())
237 {
238 return _earlyEOF;
239 }
240 }
241
242
243 public void shutdown()
244 {
245 synchronized (lock())
246 {
247 _inputEOF = true;
248 lock().notify();
249 LOG.debug("{} shutdown", this);
250 }
251 }
252
253
254 public boolean isShutdown()
255 {
256 synchronized (lock())
257 {
258 return _inputEOF;
259 }
260 }
261
262
263 public void consumeAll()
264 {
265 synchronized (lock())
266 {
267 T item = _inputQ.peekUnsafe();
268 loop: while (!isShutdown() && !isEarlyEOF())
269 {
270 while (item != null)
271 {
272 _inputQ.pollUnsafe();
273 onContentConsumed(item);
274
275 item = _inputQ.peekUnsafe();
276 if (item == null)
277 onAllContentConsumed();
278 }
279
280 try
281 {
282 blockForContent();
283 item = _inputQ.peekUnsafe();
284 if (item==null)
285 break;
286 }
287 catch (IOException e)
288 {
289 LOG.debug(e);
290 break loop;
291 }
292 }
293 }
294 }
295 }