1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client.http;
20
21 import java.io.EOFException;
22 import java.nio.ByteBuffer;
23
24 import org.eclipse.jetty.client.HttpClient;
25 import org.eclipse.jetty.client.HttpExchange;
26 import org.eclipse.jetty.client.HttpReceiver;
27 import org.eclipse.jetty.client.HttpResponse;
28 import org.eclipse.jetty.client.HttpResponseException;
29 import org.eclipse.jetty.http.HttpField;
30 import org.eclipse.jetty.http.HttpMethod;
31 import org.eclipse.jetty.http.HttpParser;
32 import org.eclipse.jetty.http.HttpStatus;
33 import org.eclipse.jetty.http.HttpVersion;
34 import org.eclipse.jetty.io.ByteBufferPool;
35 import org.eclipse.jetty.io.EndPoint;
36 import org.eclipse.jetty.util.BufferUtil;
37 import org.eclipse.jetty.util.CompletableCallback;
38
39 public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.ResponseHandler
40 {
41 private final HttpParser parser = new HttpParser(this);
42 private ByteBuffer buffer;
43 private boolean shutdown;
44
45 public HttpReceiverOverHTTP(HttpChannelOverHTTP channel)
46 {
47 super(channel);
48 }
49
50 @Override
51 public HttpChannelOverHTTP getHttpChannel()
52 {
53 return (HttpChannelOverHTTP)super.getHttpChannel();
54 }
55
56 private HttpConnectionOverHTTP getHttpConnection()
57 {
58 return getHttpChannel().getHttpConnection();
59 }
60
61 protected ByteBuffer getResponseBuffer()
62 {
63 return buffer;
64 }
65
66 public void receive()
67 {
68 if (buffer == null)
69 acquireBuffer();
70 process();
71 }
72
73 private void acquireBuffer()
74 {
75 HttpClient client = getHttpDestination().getHttpClient();
76 ByteBufferPool bufferPool = client.getByteBufferPool();
77 buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
78 }
79
80 private void releaseBuffer()
81 {
82 if (buffer == null)
83 throw new IllegalStateException();
84 if (BufferUtil.hasContent(buffer))
85 throw new IllegalStateException();
86 HttpClient client = getHttpDestination().getHttpClient();
87 ByteBufferPool bufferPool = client.getByteBufferPool();
88 bufferPool.release(buffer);
89 buffer = null;
90 }
91
92 private void process()
93 {
94 try
95 {
96 HttpConnectionOverHTTP connection = getHttpConnection();
97 EndPoint endPoint = connection.getEndPoint();
98 while (true)
99 {
100 boolean upgraded = connection != endPoint.getConnection();
101
102
103 if (connection.isClosed() || upgraded)
104 {
105 if (LOG.isDebugEnabled())
106 LOG.debug("{} {}", connection, upgraded ? "upgraded" : "closed");
107 releaseBuffer();
108 return;
109 }
110
111 if (parse())
112 return;
113
114 int read = endPoint.fill(buffer);
115 if (LOG.isDebugEnabled())
116 LOG.debug("Read {} bytes {} from {}", read, BufferUtil.toDetailString(buffer), endPoint);
117
118 if (read > 0)
119 {
120 if (parse())
121 return;
122 }
123 else if (read == 0)
124 {
125 releaseBuffer();
126 fillInterested();
127 return;
128 }
129 else
130 {
131 releaseBuffer();
132 shutdown();
133 return;
134 }
135 }
136 }
137 catch (Throwable x)
138 {
139 if (LOG.isDebugEnabled())
140 LOG.debug(x);
141 BufferUtil.clear(buffer);
142 if (buffer != null)
143 releaseBuffer();
144 failAndClose(x);
145 }
146 }
147
148
149
150
151
152
153 private boolean parse()
154 {
155 while (true)
156 {
157
158
159 boolean handle = parser.parseNext(buffer);
160 if (LOG.isDebugEnabled())
161 LOG.debug("Parsed {}, remaining {} {}", handle, buffer.remaining(), parser);
162 if (handle || !buffer.hasRemaining())
163 return handle;
164 }
165 }
166
167 protected void fillInterested()
168 {
169 getHttpConnection().fillInterested();
170 }
171
172 private void shutdown()
173 {
174
175
176
177
178 shutdown = true;
179
180
181
182
183
184 parser.atEOF();
185 parser.parseNext(BufferUtil.EMPTY_BUFFER);
186 }
187
188 protected boolean isShutdown()
189 {
190 return shutdown;
191 }
192
193 @Override
194 public int getHeaderCacheSize()
195 {
196
197 return 256;
198 }
199
200 @Override
201 public boolean startResponse(HttpVersion version, int status, String reason)
202 {
203 HttpExchange exchange = getHttpExchange();
204 if (exchange == null)
205 return false;
206
207 String method = exchange.getRequest().getMethod();
208 parser.setHeadResponse(HttpMethod.HEAD.is(method) || HttpMethod.CONNECT.is(method));
209 exchange.getResponse().version(version).status(status).reason(reason);
210
211 return !responseBegin(exchange);
212 }
213
214 @Override
215 public void parsedHeader(HttpField field)
216 {
217 HttpExchange exchange = getHttpExchange();
218 if (exchange == null)
219 return;
220
221 responseHeader(exchange, field);
222 }
223
224 @Override
225 public boolean headerComplete()
226 {
227 HttpExchange exchange = getHttpExchange();
228 if (exchange == null)
229 return false;
230
231 return !responseHeaders(exchange);
232 }
233
234 @Override
235 public boolean content(ByteBuffer buffer)
236 {
237 HttpExchange exchange = getHttpExchange();
238 if (exchange == null)
239 return false;
240
241 CompletableCallback callback = new CompletableCallback()
242 {
243 @Override
244 public void resume()
245 {
246 if (LOG.isDebugEnabled())
247 LOG.debug("Content consumed asynchronously, resuming processing");
248 process();
249 }
250
251 public void abort(Throwable x)
252 {
253 failAndClose(x);
254 }
255 };
256
257 boolean proceed = responseContent(exchange, buffer, callback);
258 boolean async = callback.tryComplete();
259 return !proceed || async;
260 }
261
262 @Override
263 public boolean messageComplete()
264 {
265 HttpExchange exchange = getHttpExchange();
266 if (exchange == null)
267 return false;
268
269 boolean proceed = responseSuccess(exchange);
270 if (!proceed)
271 return true;
272
273 int status = exchange.getResponse().getStatus();
274 if (status == HttpStatus.SWITCHING_PROTOCOLS_101)
275 return true;
276
277 if (HttpMethod.CONNECT.is(exchange.getRequest().getMethod()) &&
278 status == HttpStatus.OK_200)
279 return true;
280
281 return false;
282 }
283
284 @Override
285 public void earlyEOF()
286 {
287 HttpExchange exchange = getHttpExchange();
288 HttpConnectionOverHTTP connection = getHttpConnection();
289 if (exchange == null)
290 connection.close();
291 else
292 failAndClose(new EOFException(String.valueOf(connection)));
293 }
294
295 @Override
296 public void badMessage(int status, String reason)
297 {
298 HttpExchange exchange = getHttpExchange();
299 if (exchange != null)
300 {
301 HttpResponse response = exchange.getResponse();
302 response.status(status).reason(reason);
303 failAndClose(new HttpResponseException("HTTP protocol violation: bad response on " + getHttpConnection(), response));
304 }
305 }
306
307 @Override
308 protected void reset()
309 {
310 super.reset();
311 parser.reset();
312 }
313
314 @Override
315 protected void dispose()
316 {
317 super.dispose();
318 parser.close();
319 }
320
321 private void failAndClose(Throwable failure)
322 {
323 if (responseFailure(failure))
324 getHttpConnection().close(failure);
325 }
326
327 @Override
328 public String toString()
329 {
330 return String.format("%s[%s]", super.toString(), parser);
331 }
332 }