1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.util.Objects;
23 import javax.servlet.ReadListener;
24 import javax.servlet.ServletInputStream;
25
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.RuntimeIOException;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30
31
32
33
34
35
36
37
38
39 public abstract class HttpInput<T> extends ServletInputStream implements Runnable
40 {
41 private final static Logger LOG = Log.getLogger(HttpInput.class);
42
43 private final byte[] _oneByteBuffer = new byte[1];
44 private final Object _lock;
45 private HttpChannelState _channelState;
46 private ReadListener _listener;
47 private Throwable _onError;
48 private boolean _notReady;
49 private State _contentState = STREAM;
50 private State _eofState;
51 private long _contentRead;
52
53 protected HttpInput()
54 {
55 this(null);
56 }
57
58 protected HttpInput(Object lock)
59 {
60 _lock = lock == null ? this : lock;
61 }
62
63 public void init(HttpChannelState state)
64 {
65 synchronized (lock())
66 {
67 _channelState = state;
68 }
69 }
70
71 public final Object lock()
72 {
73 return _lock;
74 }
75
76 public void recycle()
77 {
78 synchronized (lock())
79 {
80 _listener = null;
81 _onError = null;
82 _notReady = false;
83 _contentState = STREAM;
84 _eofState = null;
85 _contentRead = 0;
86 }
87 }
88
89 @Override
90 public int available()
91 {
92 try
93 {
94 synchronized (lock())
95 {
96 T item = getNextContent();
97 return item == null ? 0 : remaining(item);
98 }
99 }
100 catch (IOException e)
101 {
102 throw new RuntimeIOException(e);
103 }
104 }
105
106 @Override
107 public int read() throws IOException
108 {
109 int read = read(_oneByteBuffer, 0, 1);
110 return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
111 }
112
113 @Override
114 public int read(byte[] b, int off, int len) throws IOException
115 {
116 synchronized (lock())
117 {
118 T item = getNextContent();
119 if (item == null)
120 {
121 _contentState.waitForContent(this);
122 item = getNextContent();
123 if (item == null)
124 return _contentState.noContent();
125 }
126 int l = get(item, b, off, len);
127 _contentRead += l;
128 return l;
129 }
130 }
131
132
133
134
135
136
137
138
139
140 protected T getNextContent() throws IOException
141 {
142 T content = nextContent();
143 if (content == null)
144 {
145 synchronized (lock())
146 {
147 if (_eofState != null)
148 {
149 if (LOG.isDebugEnabled())
150 LOG.debug("{} eof {}", this, _eofState);
151 _contentState = _eofState;
152 }
153 }
154 }
155 return content;
156 }
157
158
159
160
161
162
163
164
165
166
167
168 protected abstract T nextContent() throws IOException;
169
170
171
172
173
174 protected abstract int remaining(T item);
175
176
177
178
179
180
181
182
183
184
185 protected abstract int get(T item, byte[] buffer, int offset, int length);
186
187
188
189
190
191
192
193 protected abstract void consume(T item, int length);
194
195
196
197
198
199
200 protected abstract void blockForContent() throws IOException;
201
202
203
204
205
206
207 public abstract void content(T item);
208
209 protected boolean onAsyncRead()
210 {
211 synchronized (lock())
212 {
213 if (_listener == null)
214 return false;
215 }
216 _channelState.onReadPossible();
217 return true;
218 }
219
220 public long getContentRead()
221 {
222 synchronized (lock())
223 {
224 return _contentRead;
225 }
226 }
227
228
229
230
231
232
233
234
235 public void earlyEOF()
236 {
237 synchronized (lock())
238 {
239 if (!isEOF())
240 {
241 if (LOG.isDebugEnabled())
242 LOG.debug("{} early EOF", this);
243 _eofState = EARLY_EOF;
244 if (_listener == null)
245 return;
246 }
247 }
248 _channelState.onReadPossible();
249 }
250
251
252 public boolean isEarlyEOF()
253 {
254 synchronized (lock())
255 {
256 return _contentState==EARLY_EOF;
257 }
258 }
259
260
261
262
263
264 public void messageComplete()
265 {
266 synchronized (lock())
267 {
268 if (!isEOF())
269 {
270 if (LOG.isDebugEnabled())
271 LOG.debug("{} EOF", this);
272 _eofState = EOF;
273 if (_listener == null)
274 return;
275 }
276 }
277 _channelState.onReadPossible();
278 }
279
280 public boolean consumeAll()
281 {
282 synchronized (lock())
283 {
284
285 if (_onError != null)
286 return false;
287
288 try
289 {
290 while (!isFinished())
291 {
292 T item = getNextContent();
293 if (item == null)
294 _contentState.waitForContent(this);
295 else
296 consume(item, remaining(item));
297 }
298 return true;
299 }
300 catch (IOException e)
301 {
302 LOG.debug(e);
303 return false;
304 }
305 }
306 }
307
308 public boolean isAsync()
309 {
310 synchronized (lock())
311 {
312 return _contentState==ASYNC;
313 }
314 }
315
316
317
318
319 public boolean isEOF()
320 {
321 synchronized (lock())
322 {
323 return _eofState != null && _eofState.isEOF();
324 }
325 }
326
327 @Override
328 public boolean isFinished()
329 {
330 synchronized (lock())
331 {
332 return _contentState.isEOF();
333 }
334 }
335
336
337 @Override
338 public boolean isReady()
339 {
340 boolean finished;
341 synchronized (lock())
342 {
343 if (_contentState.isEOF())
344 return true;
345 if (_listener == null )
346 return true;
347 if (available() > 0)
348 return true;
349 if (_notReady)
350 return false;
351 _notReady = true;
352 finished = isFinished();
353 }
354 if (finished)
355 _channelState.onReadPossible();
356 else
357 unready();
358 return false;
359 }
360
361 protected void unready()
362 {
363 }
364
365 @Override
366 public void setReadListener(ReadListener readListener)
367 {
368 readListener = Objects.requireNonNull(readListener);
369 synchronized (lock())
370 {
371 if (_contentState != STREAM)
372 throw new IllegalStateException("state=" + _contentState);
373 _contentState = ASYNC;
374 _listener = readListener;
375 _notReady = true;
376 }
377 _channelState.onReadPossible();
378 }
379
380 public void failed(Throwable x)
381 {
382 synchronized (lock())
383 {
384 if (_onError != null)
385 LOG.warn(x);
386 else
387 _onError = x;
388 }
389 }
390
391 @Override
392 public void run()
393 {
394 final Throwable error;
395 final ReadListener listener;
396 boolean available = false;
397 final boolean eof;
398
399 synchronized (lock())
400 {
401 if (!_notReady || _listener == null)
402 return;
403
404 error = _onError;
405 listener = _listener;
406
407 try
408 {
409 T item = getNextContent();
410 available = item != null && remaining(item) > 0;
411 }
412 catch (Exception e)
413 {
414 failed(e);
415 }
416
417 eof = !available && isFinished();
418 _notReady = !available && !eof;
419 }
420
421 try
422 {
423 if (error != null)
424 listener.onError(error);
425 else if (available)
426 listener.onDataAvailable();
427 else if (eof)
428 listener.onAllDataRead();
429 else
430 unready();
431 }
432 catch (Throwable e)
433 {
434 LOG.warn(e.toString());
435 LOG.debug(e);
436 listener.onError(e);
437 }
438 }
439
440 protected static abstract class State
441 {
442 public void waitForContent(HttpInput<?> in) throws IOException
443 {
444 }
445
446 public int noContent() throws IOException
447 {
448 return -1;
449 }
450
451 public boolean isEOF()
452 {
453 return false;
454 }
455 }
456
457 protected static final State STREAM = new State()
458 {
459 @Override
460 public void waitForContent(HttpInput<?> input) throws IOException
461 {
462 input.blockForContent();
463 }
464
465 @Override
466 public String toString()
467 {
468 return "STREAM";
469 }
470 };
471
472 protected static final State ASYNC = new State()
473 {
474 @Override
475 public int noContent() throws IOException
476 {
477 return 0;
478 }
479
480 @Override
481 public String toString()
482 {
483 return "ASYNC";
484 }
485 };
486
487 protected static final State EARLY_EOF = new State()
488 {
489 @Override
490 public int noContent() throws IOException
491 {
492 throw new EofException("Early EOF");
493 }
494
495 @Override
496 public boolean isEOF()
497 {
498 return true;
499 }
500
501 @Override
502 public String toString()
503 {
504 return "EARLY_EOF";
505 }
506 };
507
508 protected static final State EOF = new State()
509 {
510 @Override
511 public boolean isEOF()
512 {
513 return true;
514 }
515
516 @Override
517 public String toString()
518 {
519 return "EOF";
520 }
521 };
522 }