1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.net.URI;
24 import java.nio.ByteBuffer;
25 import java.util.Collections;
26 import java.util.Enumeration;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.TimeoutException;
31 import java.util.concurrent.atomic.AtomicMarkableReference;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import org.eclipse.jetty.client.api.Response;
35 import org.eclipse.jetty.client.api.Result;
36 import org.eclipse.jetty.http.HttpField;
37 import org.eclipse.jetty.http.HttpHeader;
38 import org.eclipse.jetty.http.HttpMethod;
39 import org.eclipse.jetty.http.HttpParser;
40 import org.eclipse.jetty.http.HttpVersion;
41 import org.eclipse.jetty.io.ByteBufferPool;
42 import org.eclipse.jetty.io.EndPoint;
43 import org.eclipse.jetty.io.EofException;
44 import org.eclipse.jetty.util.log.Log;
45 import org.eclipse.jetty.util.log.Logger;
46
47 public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
48 {
49 private static final Logger LOG = Log.getLogger(HttpReceiver.class);
50
51 private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
52 private final HttpParser parser = new HttpParser(this);
53 private final HttpConnection connection;
54 private ContentDecoder decoder;
55
56 public HttpReceiver(HttpConnection connection)
57 {
58 this.connection = connection;
59 }
60
61 public void receive()
62 {
63 EndPoint endPoint = connection.getEndPoint();
64 HttpClient client = connection.getHttpClient();
65 ByteBufferPool bufferPool = client.getByteBufferPool();
66 ByteBuffer buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
67 try
68 {
69 while (true)
70 {
71 int read = endPoint.fill(buffer);
72 LOG.debug("Read {} bytes from {}", read, connection);
73 if (read > 0)
74 {
75 parse(buffer);
76 }
77 else if (read == 0)
78 {
79 fillInterested();
80 break;
81 }
82 else
83 {
84 shutdown();
85 break;
86 }
87 }
88 }
89 catch (EofException x)
90 {
91 LOG.ignore(x);
92 failAndClose(x);
93 }
94 catch (Exception x)
95 {
96 LOG.debug(x);
97 failAndClose(x);
98 }
99 finally
100 {
101 bufferPool.release(buffer);
102 }
103 }
104
105 private void parse(ByteBuffer buffer)
106 {
107 while (buffer.hasRemaining())
108 parser.parseNext(buffer);
109 }
110
111 private void fillInterested()
112 {
113 State state = this.state.get();
114 if (state == State.IDLE || state == State.RECEIVE)
115 connection.fillInterested();
116 }
117
118 private void shutdown()
119 {
120
121 parser.shutdownInput();
122 State state = this.state.get();
123 if (state == State.IDLE || state == State.RECEIVE)
124 {
125 if (!fail(new EOFException()))
126 connection.close();
127 }
128 }
129
130 @Override
131 public boolean startResponse(HttpVersion version, int status, String reason)
132 {
133 if (updateState(State.IDLE, State.RECEIVE))
134 {
135 HttpExchange exchange = connection.getExchange();
136
137 if (exchange != null)
138 {
139 HttpConversation conversation = exchange.getConversation();
140 HttpResponse response = exchange.getResponse();
141
142 parser.setHeadResponse(exchange.getRequest().getMethod() == HttpMethod.HEAD);
143 response.version(version).status(status).reason(reason);
144
145
146 HttpClient client = connection.getHttpClient();
147 ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.getRequest(), response);
148 Response.Listener handlerListener = null;
149 if (protocolHandler != null)
150 {
151 handlerListener = protocolHandler.getResponseListener();
152 LOG.debug("Found protocol handler {}", protocolHandler);
153 }
154 exchange.getConversation().updateResponseListeners(handlerListener);
155
156 LOG.debug("Receiving {}", response);
157 ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
158 notifier.notifyBegin(conversation.getResponseListeners(), response);
159 }
160 }
161 return false;
162 }
163
164 @Override
165 public boolean parsedHeader(HttpField field)
166 {
167 if (updateState(State.RECEIVE, State.RECEIVE))
168 {
169 HttpExchange exchange = connection.getExchange();
170
171 if (exchange != null)
172 {
173 HttpConversation conversation = exchange.getConversation();
174 HttpResponse response = exchange.getResponse();
175 ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
176 boolean process = notifier.notifyHeader(conversation.getResponseListeners(), response, field);
177 if (process)
178 {
179 response.getHeaders().add(field);
180 HttpHeader fieldHeader = field.getHeader();
181 if (fieldHeader != null)
182 {
183 switch (fieldHeader)
184 {
185 case SET_COOKIE:
186 case SET_COOKIE2:
187 {
188 storeCookie(exchange.getRequest().getURI(), field);
189 break;
190 }
191 default:
192 {
193 break;
194 }
195 }
196 }
197 }
198 }
199 }
200 return false;
201 }
202
203 private void storeCookie(URI uri, HttpField field)
204 {
205 try
206 {
207 Map<String, List<String>> header = new HashMap<>(1);
208 header.put(field.getHeader().asString(), Collections.singletonList(field.getValue()));
209 connection.getHttpClient().getCookieManager().put(uri, header);
210 }
211 catch (IOException x)
212 {
213 LOG.debug(x);
214 }
215 }
216
217 @Override
218 public boolean headerComplete()
219 {
220 if (updateState(State.RECEIVE, State.RECEIVE))
221 {
222 HttpExchange exchange = connection.getExchange();
223
224 if (exchange != null)
225 {
226 HttpConversation conversation = exchange.getConversation();
227 HttpResponse response = exchange.getResponse();
228 LOG.debug("Headers {}", response);
229 ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
230 notifier.notifyHeaders(conversation.getResponseListeners(), response);
231
232 Enumeration<String> contentEncodings = response.getHeaders().getValues(HttpHeader.CONTENT_ENCODING.asString(), ",");
233 if (contentEncodings != null)
234 {
235 for (ContentDecoder.Factory factory : connection.getHttpClient().getContentDecoderFactories())
236 {
237 while (contentEncodings.hasMoreElements())
238 {
239 if (factory.getEncoding().equalsIgnoreCase(contentEncodings.nextElement()))
240 {
241 this.decoder = factory.newContentDecoder();
242 break;
243 }
244 }
245 }
246 }
247 }
248 }
249 return false;
250 }
251
252 @Override
253 public boolean content(ByteBuffer buffer)
254 {
255 if (updateState(State.RECEIVE, State.RECEIVE))
256 {
257 HttpExchange exchange = connection.getExchange();
258
259 if (exchange != null)
260 {
261 HttpConversation conversation = exchange.getConversation();
262 HttpResponse response = exchange.getResponse();
263 LOG.debug("Content {}: {} bytes", response, buffer.remaining());
264
265 ContentDecoder decoder = this.decoder;
266 if (decoder != null)
267 {
268 buffer = decoder.decode(buffer);
269 LOG.debug("{} {}: {} bytes", decoder, response, buffer.remaining());
270 }
271
272 ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
273 notifier.notifyContent(conversation.getResponseListeners(), response, buffer);
274 }
275 }
276 return false;
277 }
278
279 @Override
280 public boolean messageComplete()
281 {
282 if (updateState(State.RECEIVE, State.RECEIVE))
283 success();
284 return true;
285 }
286
287 protected boolean success()
288 {
289 HttpExchange exchange = connection.getExchange();
290 if (exchange == null)
291 return false;
292
293 AtomicMarkableReference<Result> completion = exchange.responseComplete(null);
294 if (!completion.isMarked())
295 return false;
296
297 parser.reset();
298 decoder = null;
299
300 if (!updateState(State.RECEIVE, State.IDLE))
301 throw new IllegalStateException();
302
303 exchange.terminateResponse();
304
305 HttpResponse response = exchange.getResponse();
306 List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
307 ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
308 notifier.notifySuccess(listeners, response);
309 LOG.debug("Received {}", response);
310
311 Result result = completion.getReference();
312 if (result != null)
313 {
314 connection.complete(exchange, !result.isFailed());
315 notifier.notifyComplete(listeners, result);
316 }
317
318 return true;
319 }
320
321 protected boolean fail(Throwable failure)
322 {
323 HttpExchange exchange = connection.getExchange();
324
325
326
327
328 if (exchange == null)
329 return false;
330
331 AtomicMarkableReference<Result> completion = exchange.responseComplete(failure);
332 if (!completion.isMarked())
333 return false;
334
335 parser.close();
336 decoder = null;
337
338 while (true)
339 {
340 State current = state.get();
341 if (updateState(current, State.FAILURE))
342 break;
343 }
344
345 exchange.terminateResponse();
346
347 HttpResponse response = exchange.getResponse();
348 HttpConversation conversation = exchange.getConversation();
349 ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
350 notifier.notifyFailure(conversation.getResponseListeners(), response, failure);
351 LOG.debug("Failed {} {}", response, failure);
352
353 Result result = completion.getReference();
354 if (result != null)
355 {
356 connection.complete(exchange, false);
357
358 notifier.notifyComplete(conversation.getResponseListeners(), result);
359 }
360
361 return true;
362 }
363
364 @Override
365 public boolean earlyEOF()
366 {
367 failAndClose(new EOFException());
368 return false;
369 }
370
371 private void failAndClose(Throwable failure)
372 {
373 fail(failure);
374 connection.close();
375 }
376
377 @Override
378 public void badMessage(int status, String reason)
379 {
380 HttpExchange exchange = connection.getExchange();
381 HttpResponse response = exchange.getResponse();
382 response.status(status).reason(reason);
383 failAndClose(new HttpResponseException("HTTP protocol violation: bad response", response));
384 }
385
386 public void idleTimeout()
387 {
388
389
390 fail(new TimeoutException());
391 }
392
393 public boolean abort(Throwable cause)
394 {
395 return fail(cause);
396 }
397
398 private boolean updateState(State from, State to)
399 {
400 boolean updated = state.compareAndSet(from, to);
401 if (!updated)
402 LOG.debug("State update failed: {} -> {}: {}", from, to, state.get());
403 return updated;
404 }
405
406 private enum State
407 {
408 IDLE, RECEIVE, FAILURE
409 }
410 }