1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.util.Objects;
23 import javax.servlet.ReadListener;
24 import javax.servlet.ServletInputStream;
25
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.RuntimeIOException;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30
31
32
33
34
35
36
37
38
39 public abstract class HttpInput<T> extends ServletInputStream implements Runnable
40 {
41 private final static Logger LOG = Log.getLogger(HttpInput.class);
42
43 private final byte[] _oneByteBuffer = new byte[1];
44 private final Object _lock;
45 private HttpChannelState _channelState;
46 private ReadListener _listener;
47 private Throwable _onError;
48 private boolean _notReady;
49 private State _contentState = STREAM;
50 private State _eofState;
51 private long _contentRead;
52
53 protected HttpInput()
54 {
55 this(null);
56 }
57
58 protected HttpInput(Object lock)
59 {
60 _lock = lock == null ? this : lock;
61 }
62
63 public void init(HttpChannelState state)
64 {
65 synchronized (lock())
66 {
67 _channelState = state;
68 }
69 }
70
71 public final Object lock()
72 {
73 return _lock;
74 }
75
76 public void recycle()
77 {
78 synchronized (lock())
79 {
80 _listener = null;
81 _onError = null;
82 _notReady = false;
83 _contentState = STREAM;
84 _eofState = null;
85 _contentRead = 0;
86 }
87 }
88
89 @Override
90 public int available()
91 {
92 try
93 {
94 synchronized (lock())
95 {
96 T item = getNextContent();
97 return item == null ? 0 : remaining(item);
98 }
99 }
100 catch (IOException e)
101 {
102 throw new RuntimeIOException(e);
103 }
104 }
105
106 @Override
107 public int read() throws IOException
108 {
109 int read = read(_oneByteBuffer, 0, 1);
110 return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
111 }
112
113 @Override
114 public int read(byte[] b, int off, int len) throws IOException
115 {
116 synchronized (lock())
117 {
118 T item = getNextContent();
119 if (item == null)
120 {
121 _contentState.waitForContent(this);
122 item = getNextContent();
123 if (item == null)
124 return _contentState.noContent();
125 }
126 int l = get(item, b, off, len);
127 _contentRead += l;
128 return l;
129 }
130 }
131
132
133
134
135
136
137
138
139
140 protected T getNextContent() throws IOException
141 {
142 T content = nextContent();
143 if (content == null)
144 {
145 synchronized (lock())
146 {
147 if (_eofState != null)
148 {
149 if (LOG.isDebugEnabled())
150 LOG.debug("{} eof {}", this, _eofState);
151 _contentState = _eofState;
152 }
153 }
154 }
155 return content;
156 }
157
158
159
160
161
162
163
164
165
166
167
168 protected abstract T nextContent() throws IOException;
169
170
171
172
173
174 protected abstract int remaining(T item);
175
176
177
178
179
180
181
182
183
184
185 protected abstract int get(T item, byte[] buffer, int offset, int length);
186
187
188
189
190
191
192
193 protected abstract void consume(T item, int length);
194
195
196
197
198
199
200 protected abstract void blockForContent() throws IOException;
201
202
203
204
205
206
207 public abstract void content(T item);
208
209 protected boolean onAsyncRead()
210 {
211 synchronized (lock())
212 {
213 if (_listener == null)
214 return false;
215 }
216 _channelState.onReadPossible();
217 return true;
218 }
219
220 public long getContentRead()
221 {
222 synchronized (lock())
223 {
224 return _contentRead;
225 }
226 }
227
228
229
230
231
232
233
234
235 public void earlyEOF()
236 {
237 synchronized (lock())
238 {
239 if (!isEOF())
240 {
241 if (LOG.isDebugEnabled())
242 LOG.debug("{} early EOF", this);
243 _eofState = EARLY_EOF;
244 if (_listener == null)
245 return;
246 }
247 }
248 _channelState.onReadPossible();
249 }
250
251
252 public boolean isEarlyEOF()
253 {
254 synchronized (lock())
255 {
256 return _contentState==EARLY_EOF;
257 }
258 }
259
260
261
262
263
264 public void messageComplete()
265 {
266 synchronized (lock())
267 {
268 if (!isEOF())
269 {
270 if (LOG.isDebugEnabled())
271 LOG.debug("{} EOF", this);
272 _eofState = EOF;
273 if (_listener == null)
274 return;
275 }
276 }
277 _channelState.onReadPossible();
278 }
279
280 public void consumeAll()
281 {
282 synchronized (lock())
283 {
284 try
285 {
286 while (!isFinished())
287 {
288 T item = getNextContent();
289 if (item == null)
290 _contentState.waitForContent(this);
291 else
292 consume(item, remaining(item));
293 }
294 }
295 catch (IOException e)
296 {
297 LOG.debug(e);
298 }
299 }
300 }
301
302 public boolean isAsync()
303 {
304 synchronized (lock())
305 {
306 return _contentState==ASYNC;
307 }
308 }
309
310
311
312
313 public boolean isEOF()
314 {
315 synchronized (lock())
316 {
317 return _eofState != null && _eofState.isEOF();
318 }
319 }
320
321 @Override
322 public boolean isFinished()
323 {
324 synchronized (lock())
325 {
326 return _contentState.isEOF();
327 }
328 }
329
330
331 @Override
332 public boolean isReady()
333 {
334 boolean finished;
335 synchronized (lock())
336 {
337 if (_contentState.isEOF())
338 return true;
339 if (_listener == null )
340 return true;
341 if (available() > 0)
342 return true;
343 if (_notReady)
344 return false;
345 _notReady = true;
346 finished = isFinished();
347 }
348 if (finished)
349 _channelState.onReadPossible();
350 else
351 unready();
352 return false;
353 }
354
355 protected void unready()
356 {
357 }
358
359 @Override
360 public void setReadListener(ReadListener readListener)
361 {
362 readListener = Objects.requireNonNull(readListener);
363 synchronized (lock())
364 {
365 if (_contentState != STREAM)
366 throw new IllegalStateException("state=" + _contentState);
367 _contentState = ASYNC;
368 _listener = readListener;
369 _notReady = true;
370 }
371 _channelState.onReadPossible();
372 }
373
374 public void failed(Throwable x)
375 {
376 synchronized (lock())
377 {
378 if (_onError != null)
379 LOG.warn(x);
380 else
381 _onError = x;
382 }
383 }
384
385 @Override
386 public void run()
387 {
388 final Throwable error;
389 final ReadListener listener;
390 boolean available = false;
391 final boolean eof;
392
393 synchronized (lock())
394 {
395 if (!_notReady || _listener == null)
396 return;
397
398 error = _onError;
399 listener = _listener;
400
401 try
402 {
403 T item = getNextContent();
404 available = item != null && remaining(item) > 0;
405 }
406 catch (Exception e)
407 {
408 failed(e);
409 }
410
411 eof = !available && isFinished();
412 _notReady = !available && !eof;
413 }
414
415 try
416 {
417 if (error != null)
418 listener.onError(error);
419 else if (available)
420 listener.onDataAvailable();
421 else if (eof)
422 listener.onAllDataRead();
423 else
424 unready();
425 }
426 catch (Throwable e)
427 {
428 LOG.warn(e.toString());
429 LOG.debug(e);
430 listener.onError(e);
431 }
432 }
433
434 protected static abstract class State
435 {
436 public void waitForContent(HttpInput<?> in) throws IOException
437 {
438 }
439
440 public int noContent() throws IOException
441 {
442 return -1;
443 }
444
445 public boolean isEOF()
446 {
447 return false;
448 }
449 }
450
451 protected static final State STREAM = new State()
452 {
453 @Override
454 public void waitForContent(HttpInput<?> input) throws IOException
455 {
456 input.blockForContent();
457 }
458
459 @Override
460 public String toString()
461 {
462 return "STREAM";
463 }
464 };
465
466 protected static final State ASYNC = new State()
467 {
468 @Override
469 public int noContent() throws IOException
470 {
471 return 0;
472 }
473
474 @Override
475 public String toString()
476 {
477 return "ASYNC";
478 }
479 };
480
481 protected static final State EARLY_EOF = new State()
482 {
483 @Override
484 public int noContent() throws IOException
485 {
486 throw new EofException("Early EOF");
487 }
488
489 @Override
490 public boolean isEOF()
491 {
492 return true;
493 }
494
495 @Override
496 public String toString()
497 {
498 return "EARLY_EOF";
499 }
500 };
501
502 protected static final State EOF = new State()
503 {
504 @Override
505 public boolean isEOF()
506 {
507 return true;
508 }
509
510 @Override
511 public String toString()
512 {
513 return "EOF";
514 }
515 };
516 }