1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.util.Objects;
23 import javax.servlet.ReadListener;
24 import javax.servlet.ServletInputStream;
25
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.RuntimeIOException;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30
31
32
33
34
35
36
37
38
39 public abstract class HttpInput<T> extends ServletInputStream implements Runnable
40 {
41 private final static Logger LOG = Log.getLogger(HttpInput.class);
42
43 private final byte[] _oneByteBuffer = new byte[1];
44 private final Object _lock;
45 private HttpChannelState _channelState;
46 private ReadListener _listener;
47 private Throwable _onError;
48 private boolean _notReady;
49 private State _contentState = STREAM;
50 private State _eofState;
51 private long _contentRead;
52
53 protected HttpInput()
54 {
55 this(null);
56 }
57
58 protected HttpInput(Object lock)
59 {
60 _lock = lock == null ? this : lock;
61 }
62
63 public void init(HttpChannelState state)
64 {
65 synchronized (lock())
66 {
67 _channelState = state;
68 }
69 }
70
71 public final Object lock()
72 {
73 return _lock;
74 }
75
76 public void recycle()
77 {
78 synchronized (lock())
79 {
80 _listener = null;
81 _onError = null;
82 _notReady = false;
83 _contentState = STREAM;
84 _eofState = null;
85 _contentRead = 0;
86 }
87 }
88
89 @Override
90 public int available()
91 {
92 try
93 {
94 synchronized (lock())
95 {
96 T item = getNextContent();
97 return item == null ? 0 : remaining(item);
98 }
99 }
100 catch (IOException e)
101 {
102 throw new RuntimeIOException(e);
103 }
104 }
105
106 @Override
107 public int read() throws IOException
108 {
109 int read = read(_oneByteBuffer, 0, 1);
110 return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
111 }
112
113 @Override
114 public int read(byte[] b, int off, int len) throws IOException
115 {
116 synchronized (lock())
117 {
118 T item = getNextContent();
119 if (item == null)
120 {
121 _contentState.waitForContent(this);
122 item = getNextContent();
123 if (item == null)
124 return _contentState.noContent();
125 }
126 int l = get(item, b, off, len);
127 _contentRead += l;
128 return l;
129 }
130 }
131
132
133
134
135
136
137
138
139
140 protected T getNextContent() throws IOException
141 {
142 T content = nextContent();
143 if (content == null)
144 {
145 synchronized (lock())
146 {
147 if (_eofState != null)
148 {
149 LOG.debug("{} eof {}", this, _eofState);
150 _contentState = _eofState;
151 }
152 }
153 }
154 return content;
155 }
156
157
158
159
160
161
162
163
164
165
166
167 protected abstract T nextContent() throws IOException;
168
169
170
171
172
173 protected abstract int remaining(T item);
174
175
176
177
178
179
180
181
182
183
184 protected abstract int get(T item, byte[] buffer, int offset, int length);
185
186
187
188
189
190
191
192 protected abstract void consume(T item, int length);
193
194
195
196
197
198
199 protected abstract void blockForContent() throws IOException;
200
201
202
203
204
205
206 public abstract void content(T item);
207
208 protected boolean onAsyncRead()
209 {
210 synchronized (lock())
211 {
212 if (_listener == null)
213 return false;
214 }
215 _channelState.onReadPossible();
216 return true;
217 }
218
219 public long getContentRead()
220 {
221 synchronized (lock())
222 {
223 return _contentRead;
224 }
225 }
226
227
228
229
230
231
232
233
234 public void earlyEOF()
235 {
236 synchronized (lock())
237 {
238 if (!isEOF())
239 {
240 LOG.debug("{} early EOF", this);
241 _eofState = EARLY_EOF;
242 if (_listener == null)
243 return;
244 }
245 }
246 _channelState.onReadPossible();
247 }
248
249
250
251
252
253 public void messageComplete()
254 {
255 synchronized (lock())
256 {
257 if (!isEOF())
258 {
259 LOG.debug("{} EOF", this);
260 _eofState = EOF;
261 if (_listener == null)
262 return;
263 }
264 }
265 _channelState.onReadPossible();
266 }
267
268 public void consumeAll()
269 {
270 synchronized (lock())
271 {
272 try
273 {
274 while (!isFinished())
275 {
276 T item = getNextContent();
277 if (item == null)
278 _contentState.waitForContent(this);
279 else
280 consume(item, remaining(item));
281 }
282 }
283 catch (IOException e)
284 {
285 LOG.debug(e);
286 }
287 }
288 }
289
290
291
292
293 public boolean isEOF()
294 {
295 synchronized (lock())
296 {
297 return _eofState != null && _eofState.isEOF();
298 }
299 }
300
301 @Override
302 public boolean isFinished()
303 {
304 synchronized (lock())
305 {
306 return _contentState.isEOF();
307 }
308 }
309
310 @Override
311 public boolean isReady()
312 {
313 boolean finished;
314 synchronized (lock())
315 {
316 if (_contentState.isEOF())
317 return true;
318 if (_listener == null )
319 return true;
320 if (available() > 0)
321 return true;
322 if (_notReady)
323 return false;
324 _notReady = true;
325 finished = isFinished();
326 }
327 if (finished)
328 _channelState.onReadPossible();
329 else
330 unready();
331 return false;
332 }
333
334 protected void unready()
335 {
336 }
337
338 @Override
339 public void setReadListener(ReadListener readListener)
340 {
341 readListener = Objects.requireNonNull(readListener);
342 synchronized (lock())
343 {
344 if (_contentState != STREAM)
345 throw new IllegalStateException("state=" + _contentState);
346 _contentState = ASYNC;
347 _listener = readListener;
348 _notReady = true;
349 }
350 _channelState.onReadPossible();
351 }
352
353 public void failed(Throwable x)
354 {
355 synchronized (lock())
356 {
357 if (_onError != null)
358 LOG.warn(x);
359 else
360 _onError = x;
361 }
362 }
363
364 @Override
365 public void run()
366 {
367 final Throwable error;
368 final ReadListener listener;
369 boolean available = false;
370 final boolean eof;
371
372 synchronized (lock())
373 {
374 if (!_notReady || _listener == null)
375 return;
376
377 error = _onError;
378 listener = _listener;
379
380 try
381 {
382 T item = getNextContent();
383 available = item != null && remaining(item) > 0;
384 }
385 catch (Exception e)
386 {
387 failed(e);
388 }
389
390 eof = !available && isFinished();
391 _notReady = !available && !eof;
392 }
393
394 try
395 {
396 if (error != null)
397 listener.onError(error);
398 else if (available)
399 listener.onDataAvailable();
400 else if (eof)
401 listener.onAllDataRead();
402 else
403 unready();
404 }
405 catch (Throwable e)
406 {
407 LOG.warn(e.toString());
408 LOG.debug(e);
409 listener.onError(e);
410 }
411 }
412
413 protected static abstract class State
414 {
415 public void waitForContent(HttpInput<?> in) throws IOException
416 {
417 }
418
419 public int noContent() throws IOException
420 {
421 return -1;
422 }
423
424 public boolean isEOF()
425 {
426 return false;
427 }
428 }
429
430 protected static final State STREAM = new State()
431 {
432 @Override
433 public void waitForContent(HttpInput<?> input) throws IOException
434 {
435 input.blockForContent();
436 }
437
438 public String toString()
439 {
440 return "STREAM";
441 }
442 };
443
444 protected static final State ASYNC = new State()
445 {
446 @Override
447 public int noContent() throws IOException
448 {
449 return 0;
450 }
451
452 @Override
453 public String toString()
454 {
455 return "ASYNC";
456 }
457 };
458
459 protected static final State EARLY_EOF = new State()
460 {
461 @Override
462 public int noContent() throws IOException
463 {
464 throw new EofException();
465 }
466
467 @Override
468 public boolean isEOF()
469 {
470 return true;
471 }
472
473 public String toString()
474 {
475 return "EARLY_EOF";
476 }
477 };
478
479 protected static final State EOF = new State()
480 {
481 @Override
482 public boolean isEOF()
483 {
484 return true;
485 }
486
487 public String toString()
488 {
489 return "EOF";
490 }
491 };
492 }