1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22
23 import javax.servlet.ReadListener;
24 import javax.servlet.ServletInputStream;
25
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.RuntimeIOException;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 public abstract class HttpInput<T> extends ServletInputStream implements Runnable
52 {
53 private final static Logger LOG = Log.getLogger(HttpInput.class);
54
55 private final byte[] _oneByteBuffer = new byte[1];
56 private HttpChannelState _channelState;
57 private Throwable _onError;
58 private ReadListener _listener;
59 private boolean _notReady;
60
61 protected State _state = BLOCKING;
62 private State _eof=null;
63 private final Object _lock;
64 private long _contentRead;
65
66 protected HttpInput()
67 {
68 this(null);
69 }
70
71 protected HttpInput(Object lock)
72 {
73 _lock=lock==null?this:lock;
74 }
75
76 public final Object lock()
77 {
78 return _lock;
79 }
80
81 public void recycle()
82 {
83 synchronized (lock())
84 {
85 _state = BLOCKING;
86 _eof=null;
87 _onError=null;
88 _contentRead=0;
89 }
90 }
91
92
93
94
95
96
97
98
99 protected abstract T nextContent() throws IOException;
100
101
102
103
104
105
106
107
108 protected T getNextContent() throws IOException
109 {
110 T content=nextContent();
111
112 if (content==null && _eof!=null)
113 {
114 LOG.debug("{} eof {}",this,_eof);
115 _state=_eof;
116 _eof=null;
117 }
118
119 return content;
120 }
121
122 @Override
123 public int read() throws IOException
124 {
125 int read = read(_oneByteBuffer, 0, 1);
126 return read < 0 ? -1 : 0xff & _oneByteBuffer[0];
127 }
128
129 @Override
130 public int available()
131 {
132 try
133 {
134 synchronized (lock())
135 {
136 T item = getNextContent();
137 return item==null?0:remaining(item);
138 }
139 }
140 catch (IOException e)
141 {
142 throw new RuntimeIOException(e);
143 }
144 }
145
146 @Override
147 public int read(byte[] b, int off, int len) throws IOException
148 {
149 T item = null;
150 int l;
151 synchronized (lock())
152 {
153
154
155
156 item = getNextContent();
157
158
159 if (item == null)
160 {
161 _state.waitForContent(this);
162 item=getNextContent();
163 if (item==null)
164 return _state.noContent();
165 }
166
167 l=get(item, b, off, len);
168 _contentRead+=l;
169
170 }
171 return l;
172 }
173
174 protected abstract int remaining(T item);
175
176 protected abstract int get(T item, byte[] buffer, int offset, int length);
177
178 protected abstract void consume(T item, int length);
179
180 protected abstract void blockForContent() throws IOException;
181
182 protected boolean onAsyncRead()
183 {
184 if (_listener==null)
185 return false;
186 _channelState.onReadPossible();
187 return true;
188 }
189
190 public long getContentRead()
191 {
192 synchronized (lock())
193 {
194 return _contentRead;
195 }
196 }
197
198
199
200
201 public abstract void content(T item);
202
203
204
205
206
207
208
209 public void earlyEOF()
210 {
211 synchronized (lock())
212 {
213 if (_eof==null || !_eof.isEOF())
214 {
215 LOG.debug("{} early EOF", this);
216 _eof=EARLY_EOF;
217 if (_listener!=null)
218 _channelState.onReadPossible();
219 }
220 }
221 }
222
223 public void messageComplete()
224 {
225 synchronized (lock())
226 {
227 if (_eof==null || !_eof.isEOF())
228 {
229 LOG.debug("{} EOF", this);
230 _eof=EOF;
231 if (_listener!=null)
232 _channelState.onReadPossible();
233 }
234 }
235 }
236
237 public void consumeAll()
238 {
239 synchronized (lock())
240 {
241 try
242 {
243 while (!isFinished())
244 {
245 T item = getNextContent();
246 if (item==null)
247 _state.waitForContent(this);
248 else
249 consume(item,remaining(item));
250 }
251 }
252 catch (IOException e)
253 {
254 LOG.debug(e);
255 }
256 }
257 }
258
259 @Override
260 public boolean isFinished()
261 {
262 synchronized (lock())
263 {
264 return _state.isEOF();
265 }
266 }
267
268 @Override
269 public boolean isReady()
270 {
271 synchronized (lock())
272 {
273 if (_listener==null)
274 return true;
275 int available = available();
276 if (available>0)
277 return true;
278 if (!_notReady)
279 {
280 _notReady=true;
281 if (_state.isEOF())
282 _channelState.onReadPossible();
283 else
284 unready();
285 }
286 return false;
287 }
288 }
289
290 protected void unready()
291 {
292 }
293
294 @Override
295 public void setReadListener(ReadListener readListener)
296 {
297 if (readListener==null)
298 throw new NullPointerException("readListener==null");
299 synchronized (lock())
300 {
301 if (_state!=BLOCKING)
302 throw new IllegalStateException("state="+_state);
303 _state=ASYNC;
304 _listener=readListener;
305 _notReady=true;
306
307 _channelState.onReadPossible();
308 }
309 }
310
311 public void failed(Throwable x)
312 {
313 synchronized (lock())
314 {
315 if (_onError==null)
316 LOG.warn(x);
317 else
318 _onError=x;
319 }
320 }
321
322 @Override
323 public void run()
324 {
325 final boolean available;
326 final boolean eof;
327 final Throwable x;
328
329 synchronized (lock())
330 {
331 if (!_notReady || _listener==null)
332 return;
333
334 x=_onError;
335 T item;
336 try
337 {
338 item = getNextContent();
339 }
340 catch(Exception e)
341 {
342 item=null;
343 failed(e);
344 }
345 available= item!=null && remaining(item)>0;
346
347 eof = !available && _state.isEOF();
348 _notReady=!available&&!eof;
349 }
350
351 try
352 {
353 if (x!=null)
354 _listener.onError(x);
355 else if (available)
356 _listener.onDataAvailable();
357 else if (eof)
358 _listener.onAllDataRead();
359 else
360 unready();
361 }
362 catch(Throwable e)
363 {
364 LOG.warn(e.toString());
365 LOG.debug(e);
366 _listener.onError(e);
367 }
368 }
369
370 protected static class State
371 {
372 public void waitForContent(HttpInput<?> in) throws IOException
373 {
374 }
375
376 public int noContent() throws IOException
377 {
378 return -1;
379 }
380
381 public boolean isEOF()
382 {
383 return false;
384 }
385 }
386
387 protected static final State BLOCKING= new State()
388 {
389 @Override
390 public void waitForContent(HttpInput<?> in) throws IOException
391 {
392 in.blockForContent();
393 }
394 public String toString()
395 {
396 return "OPEN";
397 }
398 };
399
400 protected static final State ASYNC= new State()
401 {
402 @Override
403 public int noContent() throws IOException
404 {
405 return 0;
406 }
407 @Override
408 public String toString()
409 {
410 return "ASYNC";
411 }
412 };
413
414 protected static final State EARLY_EOF= new State()
415 {
416 @Override
417 public int noContent() throws IOException
418 {
419 throw new EofException();
420 }
421 @Override
422 public boolean isEOF()
423 {
424 return true;
425 }
426 public String toString()
427 {
428 return "EARLY_EOF";
429 }
430 };
431
432 protected static final State EOF= new State()
433 {
434 @Override
435 public boolean isEOF()
436 {
437 return true;
438 }
439
440 public String toString()
441 {
442 return "EOF";
443 }
444 };
445
446 public void init(HttpChannelState state)
447 {
448 synchronized (lock())
449 {
450 _channelState=state;
451 }
452 }
453
454 }