1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.IOException;
22 import java.net.URI;
23 import java.nio.ByteBuffer;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.Enumeration;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.atomic.AtomicReference;
31
32 import org.eclipse.jetty.client.api.Response;
33 import org.eclipse.jetty.client.api.Result;
34 import org.eclipse.jetty.http.HttpField;
35 import org.eclipse.jetty.http.HttpHeader;
36 import org.eclipse.jetty.http.HttpStatus;
37 import org.eclipse.jetty.util.BufferUtil;
38 import org.eclipse.jetty.util.Callback;
39 import org.eclipse.jetty.util.CountingCallback;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 public abstract class HttpReceiver
69 {
// Logger shared with subclasses (protected), named after this class.
protected static final Logger LOG = Log.getLogger(HttpReceiver.class);

// Current position in the response lifecycle; starts at IDLE.
private final AtomicReference<ResponseState> responseState = new AtomicReference<>(ResponseState.IDLE);
// The channel this receiver was created for; used to look up the current exchange and destination.
private final HttpChannel channel;
// Decoder chosen in responseHeaders(); null means response content is forwarded undecoded.
private ContentDecoder decoder;
// Failure recorded by abort(); shown by toString().
private Throwable failure;
76
/**
 * Creates a receiver bound to the given channel.
 *
 * @param channel the channel this receiver reads its exchange and destination from
 */
protected HttpReceiver(HttpChannel channel)
{
    this.channel = channel;
}
81
82 protected HttpChannel getHttpChannel()
83 {
84 return channel;
85 }
86
87 protected HttpExchange getHttpExchange()
88 {
89 return channel.getHttpExchange();
90 }
91
92 protected HttpDestination getHttpDestination()
93 {
94 return channel.getHttpDestination();
95 }
96
97
98
99
100
101
102
103
104
105
106
107
108 protected boolean responseBegin(HttpExchange exchange)
109 {
110 if (!updateResponseState(ResponseState.IDLE, ResponseState.TRANSIENT))
111 return false;
112
113 HttpConversation conversation = exchange.getConversation();
114 HttpResponse response = exchange.getResponse();
115
116 HttpDestination destination = getHttpDestination();
117 HttpClient client = destination.getHttpClient();
118 ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.getRequest(), response);
119 Response.Listener handlerListener = null;
120 if (protocolHandler != null)
121 {
122 handlerListener = protocolHandler.getResponseListener();
123 if (LOG.isDebugEnabled())
124 LOG.debug("Found protocol handler {}", protocolHandler);
125 }
126 exchange.getConversation().updateResponseListeners(handlerListener);
127
128 if (LOG.isDebugEnabled())
129 LOG.debug("Response begin {}", response);
130 ResponseNotifier notifier = destination.getResponseNotifier();
131 notifier.notifyBegin(conversation.getResponseListeners(), response);
132
133 if (updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN))
134 return true;
135
136 terminateResponse(exchange);
137 return false;
138 }
139
140
141
142
143
144
145
146
147
148
149
150
151
152 protected boolean responseHeader(HttpExchange exchange, HttpField field)
153 {
154 out: while (true)
155 {
156 ResponseState current = responseState.get();
157 switch (current)
158 {
159 case BEGIN:
160 case HEADER:
161 {
162 if (updateResponseState(current, ResponseState.TRANSIENT))
163 break out;
164 break;
165 }
166 default:
167 {
168 return false;
169 }
170 }
171 }
172
173 HttpResponse response = exchange.getResponse();
174 ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
175 boolean process = notifier.notifyHeader(exchange.getConversation().getResponseListeners(), response, field);
176 if (process)
177 {
178 response.getHeaders().add(field);
179 HttpHeader fieldHeader = field.getHeader();
180 if (fieldHeader != null)
181 {
182 switch (fieldHeader)
183 {
184 case SET_COOKIE:
185 case SET_COOKIE2:
186 {
187 storeCookie(exchange.getRequest().getURI(), field);
188 break;
189 }
190 default:
191 {
192 break;
193 }
194 }
195 }
196 }
197
198 if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER))
199 return true;
200
201 terminateResponse(exchange);
202 return false;
203 }
204
205 protected void storeCookie(URI uri, HttpField field)
206 {
207 try
208 {
209 String value = field.getValue();
210 if (value != null)
211 {
212 Map<String, List<String>> header = new HashMap<>(1);
213 header.put(field.getHeader().asString(), Collections.singletonList(value));
214 getHttpDestination().getHttpClient().getCookieManager().put(uri, header);
215 }
216 }
217 catch (IOException x)
218 {
219 if (LOG.isDebugEnabled())
220 LOG.debug(x);
221 }
222 }
223
224
225
226
227
228
229
230
231
232 protected boolean responseHeaders(HttpExchange exchange)
233 {
234 out: while (true)
235 {
236 ResponseState current = responseState.get();
237 switch (current)
238 {
239 case BEGIN:
240 case HEADER:
241 {
242 if (updateResponseState(current, ResponseState.TRANSIENT))
243 break out;
244 break;
245 }
246 default:
247 {
248 return false;
249 }
250 }
251 }
252
253 HttpResponse response = exchange.getResponse();
254 if (LOG.isDebugEnabled())
255 LOG.debug("Response headers {}{}{}", response, System.lineSeparator(), response.getHeaders().toString().trim());
256 ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
257 notifier.notifyHeaders(exchange.getConversation().getResponseListeners(), response);
258
259 Enumeration<String> contentEncodings = response.getHeaders().getValues(HttpHeader.CONTENT_ENCODING.asString(), ",");
260 if (contentEncodings != null)
261 {
262 for (ContentDecoder.Factory factory : getHttpDestination().getHttpClient().getContentDecoderFactories())
263 {
264 while (contentEncodings.hasMoreElements())
265 {
266 if (factory.getEncoding().equalsIgnoreCase(contentEncodings.nextElement()))
267 {
268 this.decoder = factory.newContentDecoder();
269 break;
270 }
271 }
272 }
273 }
274
275 if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADERS))
276 return true;
277
278 terminateResponse(exchange);
279 return false;
280 }
281
282
283
284
285
286
287
288
289
290
291
292 protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, final Callback callback)
293 {
294 out: while (true)
295 {
296 ResponseState current = responseState.get();
297 switch (current)
298 {
299 case HEADERS:
300 case CONTENT:
301 {
302 if (updateResponseState(current, ResponseState.TRANSIENT))
303 break out;
304 break;
305 }
306 default:
307 {
308 return false;
309 }
310 }
311 }
312
313 HttpResponse response = exchange.getResponse();
314 if (LOG.isDebugEnabled())
315 LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
316
317 ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
318 List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
319
320 ContentDecoder decoder = this.decoder;
321 if (decoder == null)
322 {
323 notifier.notifyContent(listeners, response, buffer, callback);
324 }
325 else
326 {
327 List<ByteBuffer> decodeds = new ArrayList<>(2);
328 while (buffer.hasRemaining())
329 {
330 ByteBuffer decoded = decoder.decode(buffer);
331 if (!decoded.hasRemaining())
332 continue;
333 decodeds.add(decoded);
334 if (LOG.isDebugEnabled())
335 LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded));
336 }
337
338 if (decodeds.isEmpty())
339 {
340 callback.succeeded();
341 }
342 else
343 {
344 int size = decodeds.size();
345 CountingCallback counter = new CountingCallback(callback, size);
346 for (int i = 0; i < size; ++i)
347 notifier.notifyContent(listeners, response, decodeds.get(i), counter);
348 }
349 }
350
351 if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
352 return true;
353
354 terminateResponse(exchange);
355 return false;
356 }
357
358
359
360
361
362
363
364
365
366
367 protected boolean responseSuccess(HttpExchange exchange)
368 {
369
370
371 if (!exchange.responseComplete(null))
372 return false;
373
374 responseState.set(ResponseState.IDLE);
375
376
377 reset();
378
379 HttpResponse response = exchange.getResponse();
380 if (LOG.isDebugEnabled())
381 LOG.debug("Response success {}", response);
382 List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
383 ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
384 notifier.notifySuccess(listeners, response);
385
386
387
388 if (exchange.getResponse().getStatus() == HttpStatus.CONTINUE_100)
389 return true;
390
391
392
393 Result result = exchange.terminateResponse();
394 terminateResponse(exchange, result);
395
396 return true;
397 }
398
399
400
401
402
403
404
405
406
407 protected boolean responseFailure(Throwable failure)
408 {
409 HttpExchange exchange = getHttpExchange();
410
411
412
413
414 if (exchange == null)
415 return false;
416
417
418
419 if (exchange.responseComplete(failure))
420 return abort(exchange, failure);
421
422 return false;
423 }
424
425 private void terminateResponse(HttpExchange exchange)
426 {
427 Result result = exchange.terminateResponse();
428 terminateResponse(exchange, result);
429 }
430
431 private void terminateResponse(HttpExchange exchange, Result result)
432 {
433 HttpResponse response = exchange.getResponse();
434
435 if (LOG.isDebugEnabled())
436 LOG.debug("Response complete {}", response);
437
438 if (result != null)
439 {
440 boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
441 if (!ordered)
442 channel.exchangeTerminated(exchange, result);
443 if (LOG.isDebugEnabled())
444 LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result);
445 List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
446 ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
447 notifier.notifyComplete(listeners, result);
448 if (ordered)
449 channel.exchangeTerminated(exchange, result);
450 }
451 }
452
453
454
455
456
457
458
459
460 protected void reset()
461 {
462 decoder = null;
463 }
464
465
466
467
468
469
470
471
472 protected void dispose()
473 {
474 decoder = null;
475 }
476
477 public boolean abort(HttpExchange exchange, Throwable failure)
478 {
479
480 boolean terminate;
481 out: while (true)
482 {
483 ResponseState current = responseState.get();
484 switch (current)
485 {
486 case FAILURE:
487 {
488 return false;
489 }
490 default:
491 {
492 if (updateResponseState(current, ResponseState.FAILURE))
493 {
494 terminate = current != ResponseState.TRANSIENT;
495 break out;
496 }
497 break;
498 }
499 }
500 }
501
502 this.failure = failure;
503
504 dispose();
505
506 HttpResponse response = exchange.getResponse();
507 if (LOG.isDebugEnabled())
508 LOG.debug("Response failure {} {} on {}: {}", response, exchange, getHttpChannel(), failure);
509 List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
510 ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
511 notifier.notifyFailure(listeners, response, failure);
512
513 if (terminate)
514 {
515
516
517 Result result = exchange.terminateResponse();
518 terminateResponse(exchange, result);
519 }
520 else
521 {
522 if (LOG.isDebugEnabled())
523 LOG.debug("Concurrent failure: response termination skipped, performed by helpers");
524 }
525
526 return true;
527 }
528
529 private boolean updateResponseState(ResponseState from, ResponseState to)
530 {
531 boolean updated = responseState.compareAndSet(from, to);
532 if (!updated)
533 {
534 if (LOG.isDebugEnabled())
535 LOG.debug("State update failed: {} -> {}: {}", from, to, responseState.get());
536 }
537 return updated;
538 }
539
540 @Override
541 public String toString()
542 {
543 return String.format("%s@%x(rsp=%s,failure=%s)",
544 getClass().getSimpleName(),
545 hashCode(),
546 responseState,
547 failure);
548 }
549
550
551
552
/**
 * The states a response goes through inside this receiver.
 */
private enum ResponseState
{
    /**
     * An event method is processing; entered from the previous stable state and left
     * for the next one when the method finishes.
     */
    TRANSIENT,
    /**
     * No response in progress: the initial state, restored by responseSuccess().
     */
    IDLE,
    /**
     * responseBegin() has completed.
     */
    BEGIN,
    /**
     * At least one responseHeader() call has completed.
     */
    HEADER,
    /**
     * responseHeaders() has completed.
     */
    HEADERS,
    /**
     * At least one responseContent() call has completed.
     */
    CONTENT,
    /**
     * abort() has failed the response.
     */
    FAILURE
}
584 }