1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.net.URI;
24 import java.nio.ByteBuffer;
25 import java.util.Collections;
26 import java.util.Enumeration;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.TimeoutException;
31 import java.util.concurrent.atomic.AtomicMarkableReference;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import org.eclipse.jetty.client.api.Response;
35 import org.eclipse.jetty.client.api.Result;
36 import org.eclipse.jetty.http.HttpField;
37 import org.eclipse.jetty.http.HttpHeader;
38 import org.eclipse.jetty.http.HttpMethod;
39 import org.eclipse.jetty.http.HttpParser;
40 import org.eclipse.jetty.http.HttpVersion;
41 import org.eclipse.jetty.io.ByteBufferPool;
42 import org.eclipse.jetty.io.EndPoint;
43 import org.eclipse.jetty.io.EofException;
44 import org.eclipse.jetty.util.log.Log;
45 import org.eclipse.jetty.util.log.Logger;
46
47 public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
48 {
49 private static final Logger LOG = Log.getLogger(HttpReceiver.class);
50
51 private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
52 private final HttpParser parser = new HttpParser(this);
53 private final HttpConnection connection;
54 private ContentDecoder decoder;
55
56 public HttpReceiver(HttpConnection connection)
57 {
58 this.connection = connection;
59 }
60
61 public void receive()
62 {
63 EndPoint endPoint = connection.getEndPoint();
64 HttpClient client = connection.getHttpClient();
65 ByteBufferPool bufferPool = client.getByteBufferPool();
66 ByteBuffer buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
67 try
68 {
69 while (true)
70 {
71
72 if (connection.isClosed())
73 {
74 LOG.debug("{} closed", connection);
75 break;
76 }
77 else
78 {
79 int read = endPoint.fill(buffer);
80 LOG.debug("Read {} bytes from {}", read, connection);
81 if (read > 0)
82 {
83 parse(buffer);
84 }
85 else if (read == 0)
86 {
87 fillInterested();
88 break;
89 }
90 else
91 {
92 shutdown();
93 break;
94 }
95 }
96 }
97 }
98 catch (EofException x)
99 {
100 LOG.ignore(x);
101 failAndClose(x);
102 }
103 catch (Exception x)
104 {
105 LOG.debug(x);
106 failAndClose(x);
107 }
108 finally
109 {
110 bufferPool.release(buffer);
111 }
112 }
113
114 private void parse(ByteBuffer buffer)
115 {
116 while (buffer.hasRemaining())
117 parser.parseNext(buffer);
118 }
119
120 private void fillInterested()
121 {
122 State state = this.state.get();
123 if (state == State.IDLE || state == State.RECEIVE)
124 connection.fillInterested();
125 }
126
127 private void shutdown()
128 {
129
130 parser.shutdownInput();
131 State state = this.state.get();
132 if (state == State.IDLE || state == State.RECEIVE)
133 {
134 if (!fail(new EOFException()))
135 connection.close();
136 }
137 }
138
139
140 @Override
141 public int getHeaderCacheSize()
142 {
143
144 return 256;
145 }
146
147 @Override
148 public boolean startResponse(HttpVersion version, int status, String reason)
149 {
150 if (updateState(State.IDLE, State.RECEIVE))
151 {
152 HttpExchange exchange = connection.getExchange();
153
154 if (exchange != null)
155 {
156 HttpConversation conversation = exchange.getConversation();
157 HttpResponse response = exchange.getResponse();
158
159 String method = exchange.getRequest().method();
160 parser.setHeadResponse(HttpMethod.HEAD.is(method) || HttpMethod.CONNECT.is(method));
161 response.version(version).status(status).reason(reason);
162
163
164 HttpClient client = connection.getHttpClient();
165 ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.getRequest(), response);
166 Response.Listener handlerListener = null;
167 if (protocolHandler != null)
168 {
169 handlerListener = protocolHandler.getResponseListener();
170 LOG.debug("Found protocol handler {}", protocolHandler);
171 }
172 exchange.getConversation().updateResponseListeners(handlerListener);
173
174 LOG.debug("Receiving {}", response);
175 ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
176 notifier.notifyBegin(conversation.getResponseListeners(), response);
177 }
178 }
179 return false;
180 }
181
182 @Override
183 public boolean parsedHeader(HttpField field)
184 {
185 if (updateState(State.RECEIVE, State.RECEIVE))
186 {
187 HttpExchange exchange = connection.getExchange();
188
189 if (exchange != null)
190 {
191 HttpConversation conversation = exchange.getConversation();
192 HttpResponse response = exchange.getResponse();
193 ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
194 boolean process = notifier.notifyHeader(conversation.getResponseListeners(), response, field);
195 if (process)
196 {
197 response.getHeaders().add(field);
198 HttpHeader fieldHeader = field.getHeader();
199 if (fieldHeader != null)
200 {
201 switch (fieldHeader)
202 {
203 case SET_COOKIE:
204 case SET_COOKIE2:
205 {
206 storeCookie(exchange.getRequest().getURI(), field);
207 break;
208 }
209 default:
210 {
211 break;
212 }
213 }
214 }
215 }
216 }
217 }
218 return false;
219 }
220
221 private void storeCookie(URI uri, HttpField field)
222 {
223 try
224 {
225 String value = field.getValue();
226 if (value != null)
227 {
228 Map<String, List<String>> header = new HashMap<>(1);
229 header.put(field.getHeader().asString(), Collections.singletonList(value));
230 connection.getHttpClient().getCookieManager().put(uri, header);
231 }
232 }
233 catch (IOException x)
234 {
235 LOG.debug(x);
236 }
237 }
238
239 @Override
240 public boolean headerComplete()
241 {
242 if (updateState(State.RECEIVE, State.RECEIVE))
243 {
244 HttpExchange exchange = connection.getExchange();
245
246 if (exchange != null)
247 {
248 HttpConversation conversation = exchange.getConversation();
249 HttpResponse response = exchange.getResponse();
250 LOG.debug("Headers {}", response);
251 ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
252 notifier.notifyHeaders(conversation.getResponseListeners(), response);
253
254 Enumeration<String> contentEncodings = response.getHeaders().getValues(HttpHeader.CONTENT_ENCODING.asString(), ",");
255 if (contentEncodings != null)
256 {
257 for (ContentDecoder.Factory factory : connection.getHttpClient().getContentDecoderFactories())
258 {
259 while (contentEncodings.hasMoreElements())
260 {
261 if (factory.getEncoding().equalsIgnoreCase(contentEncodings.nextElement()))
262 {
263 this.decoder = factory.newContentDecoder();
264 break;
265 }
266 }
267 }
268 }
269 }
270 }
271 return false;
272 }
273
274 @Override
275 public boolean content(ByteBuffer buffer)
276 {
277 if (updateState(State.RECEIVE, State.RECEIVE))
278 {
279 HttpExchange exchange = connection.getExchange();
280
281 if (exchange != null)
282 {
283 HttpConversation conversation = exchange.getConversation();
284 HttpResponse response = exchange.getResponse();
285 LOG.debug("Content {}: {} bytes", response, buffer.remaining());
286
287 ContentDecoder decoder = this.decoder;
288 if (decoder != null)
289 {
290 buffer = decoder.decode(buffer);
291 LOG.debug("{} {}: {} bytes", decoder, response, buffer.remaining());
292 }
293
294 ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
295 notifier.notifyContent(conversation.getResponseListeners(), response, buffer);
296 }
297 }
298 return false;
299 }
300
301 @Override
302 public boolean messageComplete()
303 {
304 if (updateState(State.RECEIVE, State.RECEIVE))
305 success();
306 return true;
307 }
308
309 protected boolean success()
310 {
311 HttpExchange exchange = connection.getExchange();
312 if (exchange == null)
313 return false;
314
315 AtomicMarkableReference<Result> completion = exchange.responseComplete(null);
316 if (!completion.isMarked())
317 return false;
318
319 parser.reset();
320 decoder = null;
321
322 if (!updateState(State.RECEIVE, State.IDLE))
323 throw new IllegalStateException();
324
325 exchange.terminateResponse();
326
327 HttpResponse response = exchange.getResponse();
328 List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
329 ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
330 notifier.notifySuccess(listeners, response);
331 LOG.debug("Received {}", response);
332
333 Result result = completion.getReference();
334 if (result != null)
335 {
336 connection.complete(exchange, !result.isFailed());
337 notifier.notifyComplete(listeners, result);
338 }
339
340 return true;
341 }
342
343 protected boolean fail(Throwable failure)
344 {
345 HttpExchange exchange = connection.getExchange();
346
347
348
349
350 if (exchange == null)
351 return false;
352
353 AtomicMarkableReference<Result> completion = exchange.responseComplete(failure);
354 if (!completion.isMarked())
355 return false;
356
357 parser.close();
358 decoder = null;
359
360 while (true)
361 {
362 State current = state.get();
363 if (updateState(current, State.FAILURE))
364 break;
365 }
366
367 exchange.terminateResponse();
368
369 HttpResponse response = exchange.getResponse();
370 HttpConversation conversation = exchange.getConversation();
371 ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
372 notifier.notifyFailure(conversation.getResponseListeners(), response, failure);
373 LOG.debug("Failed {} {}", response, failure);
374
375 Result result = completion.getReference();
376 if (result != null)
377 {
378 connection.complete(exchange, false);
379
380 notifier.notifyComplete(conversation.getResponseListeners(), result);
381 }
382
383 return true;
384 }
385
386 @Override
387 public void earlyEOF()
388 {
389 failAndClose(new EOFException());
390 }
391
392 private void failAndClose(Throwable failure)
393 {
394 fail(failure);
395 connection.close();
396 }
397
398 @Override
399 public void badMessage(int status, String reason)
400 {
401 HttpExchange exchange = connection.getExchange();
402 HttpResponse response = exchange.getResponse();
403 response.status(status).reason(reason);
404 failAndClose(new HttpResponseException("HTTP protocol violation: bad response", response));
405 }
406
407 public void idleTimeout()
408 {
409
410
411 fail(new TimeoutException());
412 }
413
414 public boolean abort(Throwable cause)
415 {
416 return fail(cause);
417 }
418
419 private boolean updateState(State from, State to)
420 {
421 boolean updated = state.compareAndSet(from, to);
422 if (!updated)
423 LOG.debug("State update failed: {} -> {}: {}", from, to, state.get());
424 return updated;
425 }
426
427 @Override
428 public String toString()
429 {
430 return String.format("%s@%x on %s", getClass().getSimpleName(), hashCode(), connection);
431 }
432
433 private enum State
434 {
435 IDLE, RECEIVE, FAILURE
436 }
437 }