1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.IOException;
22 import java.net.URI;
23 import java.nio.ByteBuffer;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.Enumeration;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.atomic.AtomicReference;
31
32 import org.eclipse.jetty.client.api.Response;
33 import org.eclipse.jetty.client.api.Result;
34 import org.eclipse.jetty.http.HttpField;
35 import org.eclipse.jetty.http.HttpHeader;
36 import org.eclipse.jetty.http.HttpStatus;
37 import org.eclipse.jetty.util.BufferUtil;
38 import org.eclipse.jetty.util.Callback;
39 import org.eclipse.jetty.util.CountingCallback;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 public abstract class HttpReceiver
69 {
70 protected static final Logger LOG = Log.getLogger(HttpReceiver.class);
71
72 private final AtomicReference<ResponseState> responseState = new AtomicReference<>(ResponseState.IDLE);
73 private final HttpChannel channel;
74 private ContentDecoder decoder;
75 private Throwable failure;
76
/**
 * Creates a receiver bound to the given channel.
 *
 * @param channel the channel this receiver reads its exchange and destination from
 */
protected HttpReceiver(HttpChannel channel)
{
    // Stored as-is; no validation is performed on the argument.
    this.channel = channel;
}
81
82 protected HttpChannel getHttpChannel()
83 {
84 return channel;
85 }
86
87 protected HttpExchange getHttpExchange()
88 {
89 return channel.getHttpExchange();
90 }
91
92 protected HttpDestination getHttpDestination()
93 {
94 return channel.getHttpDestination();
95 }
96
97
98
99
100
101
102
103
104
105
106
107
108 protected boolean responseBegin(HttpExchange exchange)
109 {
110 if (!updateResponseState(ResponseState.IDLE, ResponseState.TRANSIENT))
111 return false;
112
113 HttpConversation conversation = exchange.getConversation();
114 HttpResponse response = exchange.getResponse();
115
116 HttpDestination destination = getHttpDestination();
117 HttpClient client = destination.getHttpClient();
118 ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.getRequest(), response);
119 Response.Listener handlerListener = null;
120 if (protocolHandler != null)
121 {
122 handlerListener = protocolHandler.getResponseListener();
123 if (LOG.isDebugEnabled())
124 LOG.debug("Found protocol handler {}", protocolHandler);
125 }
126 exchange.getConversation().updateResponseListeners(handlerListener);
127
128 if (LOG.isDebugEnabled())
129 LOG.debug("Response begin {}", response);
130 ResponseNotifier notifier = destination.getResponseNotifier();
131 notifier.notifyBegin(conversation.getResponseListeners(), response);
132
133 if (updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN))
134 return true;
135
136 terminateResponse(exchange);
137 return false;
138 }
139
140
141
142
143
144
145
146
147
148
149
150
151
152 protected boolean responseHeader(HttpExchange exchange, HttpField field)
153 {
154 out: while (true)
155 {
156 ResponseState current = responseState.get();
157 switch (current)
158 {
159 case BEGIN:
160 case HEADER:
161 {
162 if (updateResponseState(current, ResponseState.TRANSIENT))
163 break out;
164 break;
165 }
166 default:
167 {
168 return false;
169 }
170 }
171 }
172
173 HttpResponse response = exchange.getResponse();
174 ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
175 boolean process = notifier.notifyHeader(exchange.getConversation().getResponseListeners(), response, field);
176 if (process)
177 {
178 response.getHeaders().add(field);
179 HttpHeader fieldHeader = field.getHeader();
180 if (fieldHeader != null)
181 {
182 switch (fieldHeader)
183 {
184 case SET_COOKIE:
185 case SET_COOKIE2:
186 {
187 URI uri = exchange.getRequest().getURI();
188 if (uri != null)
189 storeCookie(uri, field);
190 break;
191 }
192 default:
193 {
194 break;
195 }
196 }
197 }
198 }
199
200 if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER))
201 return true;
202
203 terminateResponse(exchange);
204 return false;
205 }
206
207 protected void storeCookie(URI uri, HttpField field)
208 {
209 try
210 {
211 String value = field.getValue();
212 if (value != null)
213 {
214 Map<String, List<String>> header = new HashMap<>(1);
215 header.put(field.getHeader().asString(), Collections.singletonList(value));
216 getHttpDestination().getHttpClient().getCookieManager().put(uri, header);
217 }
218 }
219 catch (IOException x)
220 {
221 if (LOG.isDebugEnabled())
222 LOG.debug(x);
223 }
224 }
225
226
227
228
229
230
231
232
233
234 protected boolean responseHeaders(HttpExchange exchange)
235 {
236 out: while (true)
237 {
238 ResponseState current = responseState.get();
239 switch (current)
240 {
241 case BEGIN:
242 case HEADER:
243 {
244 if (updateResponseState(current, ResponseState.TRANSIENT))
245 break out;
246 break;
247 }
248 default:
249 {
250 return false;
251 }
252 }
253 }
254
255 HttpResponse response = exchange.getResponse();
256 if (LOG.isDebugEnabled())
257 LOG.debug("Response headers {}{}{}", response, System.lineSeparator(), response.getHeaders().toString().trim());
258 ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
259 notifier.notifyHeaders(exchange.getConversation().getResponseListeners(), response);
260
261 Enumeration<String> contentEncodings = response.getHeaders().getValues(HttpHeader.CONTENT_ENCODING.asString(), ",");
262 if (contentEncodings != null)
263 {
264 for (ContentDecoder.Factory factory : getHttpDestination().getHttpClient().getContentDecoderFactories())
265 {
266 while (contentEncodings.hasMoreElements())
267 {
268 if (factory.getEncoding().equalsIgnoreCase(contentEncodings.nextElement()))
269 {
270 this.decoder = factory.newContentDecoder();
271 break;
272 }
273 }
274 }
275 }
276
277 if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADERS))
278 return true;
279
280 terminateResponse(exchange);
281 return false;
282 }
283
284
285
286
287
288
289
290
291
292
293
294 protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, final Callback callback)
295 {
296 out: while (true)
297 {
298 ResponseState current = responseState.get();
299 switch (current)
300 {
301 case HEADERS:
302 case CONTENT:
303 {
304 if (updateResponseState(current, ResponseState.TRANSIENT))
305 break out;
306 break;
307 }
308 default:
309 {
310 return false;
311 }
312 }
313 }
314
315 HttpResponse response = exchange.getResponse();
316 if (LOG.isDebugEnabled())
317 LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
318
319 ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
320 List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
321
322 ContentDecoder decoder = this.decoder;
323 if (decoder == null)
324 {
325 notifier.notifyContent(listeners, response, buffer, callback);
326 }
327 else
328 {
329 try
330 {
331 List<ByteBuffer> decodeds = new ArrayList<>(2);
332 while (buffer.hasRemaining())
333 {
334 ByteBuffer decoded = decoder.decode(buffer);
335 if (!decoded.hasRemaining())
336 continue;
337 decodeds.add(decoded);
338 if (LOG.isDebugEnabled())
339 LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded));
340 }
341
342 if (decodeds.isEmpty())
343 {
344 callback.succeeded();
345 }
346 else
347 {
348 int size = decodeds.size();
349 CountingCallback counter = new CountingCallback(callback, size);
350 for (int i = 0; i < size; ++i)
351 notifier.notifyContent(listeners, response, decodeds.get(i), counter);
352 }
353 }
354 catch (Throwable x)
355 {
356 callback.failed(x);
357 }
358 }
359
360 if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
361 return true;
362
363 terminateResponse(exchange);
364 return false;
365 }
366
367
368
369
370
371
372
373
374
375
376 protected boolean responseSuccess(HttpExchange exchange)
377 {
378
379
380 if (!exchange.responseComplete(null))
381 return false;
382
383 responseState.set(ResponseState.IDLE);
384
385
386 reset();
387
388 HttpResponse response = exchange.getResponse();
389 if (LOG.isDebugEnabled())
390 LOG.debug("Response success {}", response);
391 List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
392 ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
393 notifier.notifySuccess(listeners, response);
394
395
396
397 if (exchange.getResponse().getStatus() == HttpStatus.CONTINUE_100)
398 return true;
399
400
401
402 Result result = exchange.terminateResponse();
403 terminateResponse(exchange, result);
404
405 return true;
406 }
407
408
409
410
411
412
413
414
415
416 protected boolean responseFailure(Throwable failure)
417 {
418 HttpExchange exchange = getHttpExchange();
419
420
421
422
423 if (exchange == null)
424 return false;
425
426
427
428 if (exchange.responseComplete(failure))
429 return abort(exchange, failure);
430
431 return false;
432 }
433
434 private void terminateResponse(HttpExchange exchange)
435 {
436 Result result = exchange.terminateResponse();
437 terminateResponse(exchange, result);
438 }
439
440 private void terminateResponse(HttpExchange exchange, Result result)
441 {
442 HttpResponse response = exchange.getResponse();
443
444 if (LOG.isDebugEnabled())
445 LOG.debug("Response complete {}", response);
446
447 if (result != null)
448 {
449 boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
450 if (!ordered)
451 channel.exchangeTerminated(exchange, result);
452 if (LOG.isDebugEnabled())
453 LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result);
454 List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
455 ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
456 notifier.notifyComplete(listeners, result);
457 if (ordered)
458 channel.exchangeTerminated(exchange, result);
459 }
460 }
461
462
463
464
465
466
467
468
469 protected void reset()
470 {
471 decoder = null;
472 }
473
474
475
476
477
478
479
480
481 protected void dispose()
482 {
483 decoder = null;
484 }
485
486 public boolean abort(HttpExchange exchange, Throwable failure)
487 {
488
489 boolean terminate;
490 out: while (true)
491 {
492 ResponseState current = responseState.get();
493 switch (current)
494 {
495 case FAILURE:
496 {
497 return false;
498 }
499 default:
500 {
501 if (updateResponseState(current, ResponseState.FAILURE))
502 {
503 terminate = current != ResponseState.TRANSIENT;
504 break out;
505 }
506 break;
507 }
508 }
509 }
510
511 this.failure = failure;
512
513 dispose();
514
515 HttpResponse response = exchange.getResponse();
516 if (LOG.isDebugEnabled())
517 LOG.debug("Response failure {} {} on {}: {}", response, exchange, getHttpChannel(), failure);
518 List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
519 ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
520 notifier.notifyFailure(listeners, response, failure);
521
522 if (terminate)
523 {
524
525
526 Result result = exchange.terminateResponse();
527 terminateResponse(exchange, result);
528 }
529 else
530 {
531 if (LOG.isDebugEnabled())
532 LOG.debug("Concurrent failure: response termination skipped, performed by helpers");
533 }
534
535 return true;
536 }
537
538 private boolean updateResponseState(ResponseState from, ResponseState to)
539 {
540 boolean updated = responseState.compareAndSet(from, to);
541 if (!updated)
542 {
543 if (LOG.isDebugEnabled())
544 LOG.debug("State update failed: {} -> {}: {}", from, to, responseState.get());
545 }
546 return updated;
547 }
548
549 @Override
550 public String toString()
551 {
552 return String.format("%s@%x(rsp=%s,failure=%s)",
553 getClass().getSimpleName(),
554 hashCode(),
555 responseState,
556 failure);
557 }
558
559
560
561
562 private enum ResponseState
563 {
564
565
566
567 TRANSIENT,
568
569
570
571 IDLE,
572
573
574
575 BEGIN,
576
577
578
579 HEADER,
580
581
582
583 HEADERS,
584
585
586
587 CONTENT,
588
589
590
591 FAILURE
592 }
593 }