1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23
24 import javax.servlet.ServletInputStream;
25
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.RuntimeIOException;
28 import org.eclipse.jetty.util.ArrayQueue;
29 import org.eclipse.jetty.util.log.Log;
30 import org.eclipse.jetty.util.log.Logger;
31
32
33
34
35
36
37
38
39
40
41
42 public abstract class HttpInput<T> extends ServletInputStream
43 {
44 private final static Logger LOG = Log.getLogger(HttpInput.class);
45 private final ArrayQueue<T> _inputQ = new ArrayQueue<>();
46 private boolean _earlyEOF;
47 private boolean _inputEOF;
48
49 public Object lock()
50 {
51 return _inputQ.lock();
52 }
53
54 public void recycle()
55 {
56 synchronized (lock())
57 {
58 T item = _inputQ.peekUnsafe();
59 while (item != null)
60 {
61 _inputQ.pollUnsafe();
62 onContentConsumed(item);
63
64 item = _inputQ.peekUnsafe();
65 if (item == null)
66 onAllContentConsumed();
67 }
68 _inputEOF = false;
69 _earlyEOF = false;
70 }
71 }
72
73 @Override
74 public int read() throws IOException
75 {
76 byte[] bytes = new byte[1];
77 int read = read(bytes, 0, 1);
78 return read < 0 ? -1 : 0xff & bytes[0];
79 }
80
81 @Override
82 public int available()
83 {
84 synchronized (lock())
85 {
86 T item = _inputQ.peekUnsafe();
87 if (item == null)
88 return 0;
89 return remaining(item);
90 }
91 }
92
93 @Override
94 public int read(byte[] b, int off, int len) throws IOException
95 {
96 T item = null;
97 synchronized (lock())
98 {
99 while (item == null)
100 {
101 item = _inputQ.peekUnsafe();
102 while (item != null && remaining(item) == 0)
103 {
104 _inputQ.pollUnsafe();
105 onContentConsumed(item);
106 LOG.debug("{} consumed {}", this, item);
107 item = _inputQ.peekUnsafe();
108 }
109
110 if (item == null)
111 {
112 onAllContentConsumed();
113
114 if (isEarlyEOF())
115 throw new EofException();
116
117
118 if (isShutdown())
119 {
120 onEOF();
121 return -1;
122 }
123
124 blockForContent();
125 }
126 }
127 }
128 return get(item, b, off, len);
129 }
130 protected abstract int remaining(T item);
131
132 protected abstract int get(T item, byte[] buffer, int offset, int length);
133
134 protected abstract void onContentConsumed(T item);
135
136 protected void blockForContent() throws IOException
137 {
138 synchronized (lock())
139 {
140 while (_inputQ.isEmpty() && !isShutdown() && !isEarlyEOF())
141 {
142 try
143 {
144 LOG.debug("{} waiting for content", this);
145 lock().wait();
146 }
147 catch (InterruptedException e)
148 {
149 throw (IOException)new InterruptedIOException().initCause(e);
150 }
151 }
152 }
153 }
154
155 protected void onContentQueued(T item)
156 {
157 lock().notify();
158 }
159
160 protected void onAllContentConsumed()
161 {
162 }
163
164 protected void onEOF()
165 {
166 }
167
168 public boolean content(T item)
169 {
170 synchronized (lock())
171 {
172
173
174
175 _inputQ.add(item);
176 onContentQueued(item);
177 LOG.debug("{} queued {}", this, item);
178 }
179 return true;
180 }
181
182 public void earlyEOF()
183 {
184 synchronized (lock())
185 {
186 _earlyEOF = true;
187 lock().notify();
188 LOG.debug("{} early EOF", this);
189 }
190 }
191
192 public boolean isEarlyEOF()
193 {
194 synchronized (lock())
195 {
196 return _earlyEOF;
197 }
198 }
199
200 public void shutdown()
201 {
202 synchronized (lock())
203 {
204 _inputEOF = true;
205 lock().notify();
206 LOG.debug("{} shutdown", this);
207 }
208 }
209
210 public boolean isShutdown()
211 {
212 synchronized (lock())
213 {
214 return _inputEOF;
215 }
216 }
217
218 public void consumeAll()
219 {
220 synchronized (lock())
221 {
222 while (!isShutdown() && !isEarlyEOF())
223 {
224 T item = _inputQ.peekUnsafe();
225 while (item != null)
226 {
227 _inputQ.pollUnsafe();
228 onContentConsumed(item);
229
230 item = _inputQ.peekUnsafe();
231 if (item == null)
232 onAllContentConsumed();
233 }
234
235 try
236 {
237 blockForContent();
238 }
239 catch (IOException e)
240 {
241 throw new RuntimeIOException(e);
242 }
243 }
244 }
245 }
246 }