1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.io.IOException;
22 import java.net.URI;
23 import java.nio.ByteBuffer;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.Enumeration;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.atomic.AtomicReference;
31
32 import org.eclipse.jetty.client.api.Response;
33 import org.eclipse.jetty.client.api.Result;
34 import org.eclipse.jetty.http.HttpField;
35 import org.eclipse.jetty.http.HttpHeader;
36 import org.eclipse.jetty.util.BufferUtil;
37 import org.eclipse.jetty.util.Callback;
38 import org.eclipse.jetty.util.log.Log;
39 import org.eclipse.jetty.util.log.Logger;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 public abstract class HttpReceiver
67 {
68 protected static final Logger LOG = Log.getLogger(HttpReceiver.class);
69
70 private final AtomicReference<ResponseState> responseState = new AtomicReference<>(ResponseState.IDLE);
71 private final HttpChannel channel;
72 private ContentDecoder decoder;
73 private Throwable failure;
74
75 protected HttpReceiver(HttpChannel channel)
76 {
77 this.channel = channel;
78 }
79
80 protected HttpChannel getHttpChannel()
81 {
82 return channel;
83 }
84
85 protected HttpExchange getHttpExchange()
86 {
87 return channel.getHttpExchange();
88 }
89
90 protected HttpDestination getHttpDestination()
91 {
92 return channel.getHttpDestination();
93 }
94
95
96
97
98
99
100
101
102
103
104
105
106 protected boolean responseBegin(HttpExchange exchange)
107 {
108 if (!updateResponseState(ResponseState.IDLE, ResponseState.TRANSIENT))
109 return false;
110
111 HttpConversation conversation = exchange.getConversation();
112 HttpResponse response = exchange.getResponse();
113
114 HttpDestination destination = getHttpDestination();
115 HttpClient client = destination.getHttpClient();
116 ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.getRequest(), response);
117 Response.Listener handlerListener = null;
118 if (protocolHandler != null)
119 {
120 handlerListener = protocolHandler.getResponseListener();
121 if (LOG.isDebugEnabled())
122 LOG.debug("Found protocol handler {}", protocolHandler);
123 }
124 exchange.getConversation().updateResponseListeners(handlerListener);
125
126 if (LOG.isDebugEnabled())
127 LOG.debug("Response begin {}", response);
128 ResponseNotifier notifier = destination.getResponseNotifier();
129 notifier.notifyBegin(conversation.getResponseListeners(), response);
130
131 if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN))
132 terminateResponse(exchange, failure);
133
134 return true;
135 }
136
137
138
139
140
141
142
143
144
145
146
147
148
149 protected boolean responseHeader(HttpExchange exchange, HttpField field)
150 {
151 out: while (true)
152 {
153 ResponseState current = responseState.get();
154 switch (current)
155 {
156 case BEGIN:
157 case HEADER:
158 {
159 if (updateResponseState(current, ResponseState.TRANSIENT))
160 break out;
161 break;
162 }
163 default:
164 {
165 return false;
166 }
167 }
168 }
169
170 HttpResponse response = exchange.getResponse();
171 ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
172 boolean process = notifier.notifyHeader(exchange.getConversation().getResponseListeners(), response, field);
173 if (process)
174 {
175 response.getHeaders().add(field);
176 HttpHeader fieldHeader = field.getHeader();
177 if (fieldHeader != null)
178 {
179 switch (fieldHeader)
180 {
181 case SET_COOKIE:
182 case SET_COOKIE2:
183 {
184 storeCookie(exchange.getRequest().getURI(), field);
185 break;
186 }
187 default:
188 {
189 break;
190 }
191 }
192 }
193 }
194
195 if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER))
196 terminateResponse(exchange, failure);
197
198 return true;
199 }
200
201 protected void storeCookie(URI uri, HttpField field)
202 {
203 try
204 {
205 String value = field.getValue();
206 if (value != null)
207 {
208 Map<String, List<String>> header = new HashMap<>(1);
209 header.put(field.getHeader().asString(), Collections.singletonList(value));
210 getHttpDestination().getHttpClient().getCookieManager().put(uri, header);
211 }
212 }
213 catch (IOException x)
214 {
215 if (LOG.isDebugEnabled())
216 LOG.debug(x);
217 }
218 }
219
220
221
222
223
224
225
226
227
228 protected boolean responseHeaders(HttpExchange exchange)
229 {
230 out: while (true)
231 {
232 ResponseState current = responseState.get();
233 switch (current)
234 {
235 case BEGIN:
236 case HEADER:
237 {
238 if (updateResponseState(current, ResponseState.TRANSIENT))
239 break out;
240 break;
241 }
242 default:
243 {
244 return false;
245 }
246 }
247 }
248
249 HttpResponse response = exchange.getResponse();
250 if (LOG.isDebugEnabled())
251 LOG.debug("Response headers {}{}{}", response, System.lineSeparator(), response.getHeaders().toString().trim());
252 ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
253 notifier.notifyHeaders(exchange.getConversation().getResponseListeners(), response);
254
255 Enumeration<String> contentEncodings = response.getHeaders().getValues(HttpHeader.CONTENT_ENCODING.asString(), ",");
256 if (contentEncodings != null)
257 {
258 for (ContentDecoder.Factory factory : getHttpDestination().getHttpClient().getContentDecoderFactories())
259 {
260 while (contentEncodings.hasMoreElements())
261 {
262 if (factory.getEncoding().equalsIgnoreCase(contentEncodings.nextElement()))
263 {
264 this.decoder = factory.newContentDecoder();
265 break;
266 }
267 }
268 }
269 }
270
271 if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADERS))
272 terminateResponse(exchange, failure);
273
274 return true;
275 }
276
277
278
279
280
281
282
283
284
285
286 protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, final Callback callback)
287 {
288 out: while (true)
289 {
290 ResponseState current = responseState.get();
291 switch (current)
292 {
293 case HEADERS:
294 case CONTENT:
295 {
296 if (updateResponseState(current, ResponseState.TRANSIENT))
297 break out;
298 break;
299 }
300 default:
301 {
302 return false;
303 }
304 }
305 }
306
307 HttpResponse response = exchange.getResponse();
308 if (LOG.isDebugEnabled())
309 LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
310
311 ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
312 List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
313
314 ContentDecoder decoder = this.decoder;
315 if (decoder == null)
316 {
317 notifier.notifyContent(listeners, response, buffer, callback);
318 }
319 else
320 {
321 List<ByteBuffer> decodeds = new ArrayList<>(2);
322 while (buffer.hasRemaining())
323 {
324 ByteBuffer decoded = decoder.decode(buffer);
325 if (!decoded.hasRemaining())
326 continue;
327 decodeds.add(decoded);
328 if (LOG.isDebugEnabled())
329 LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded));
330 }
331
332 if (decodeds.isEmpty())
333 {
334 callback.succeeded();
335 }
336 else
337 {
338 Callback partial = new Callback.Adapter()
339 {
340 @Override
341 public void failed(Throwable x)
342 {
343 callback.failed(x);
344 }
345 };
346
347 for (int i = 1, size = decodeds.size(); i <= size; ++i)
348 notifier.notifyContent(listeners, response, decodeds.get(i - 1), i < size ? partial : callback);
349 }
350 }
351
352 if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
353 terminateResponse(exchange, failure);
354
355 return true;
356 }
357
358
359
360
361
362
363
364
365
366
367 protected boolean responseSuccess(HttpExchange exchange)
368 {
369
370
371 boolean completed = exchange.responseComplete();
372 if (!completed)
373 return false;
374
375 responseState.set(ResponseState.IDLE);
376
377
378 reset();
379
380
381
382 Result result = exchange.terminateResponse(null);
383
384
385
386 HttpResponse response = exchange.getResponse();
387 if (LOG.isDebugEnabled())
388 LOG.debug("Response success {}", response);
389 List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
390 ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
391 notifier.notifySuccess(listeners, response);
392
393 terminateResponse(exchange, result);
394
395 return true;
396 }
397
398
399
400
401
402
403
404
405
406 protected boolean responseFailure(Throwable failure)
407 {
408 HttpExchange exchange = getHttpExchange();
409
410
411
412
413 if (exchange == null)
414 return false;
415
416
417
418 boolean completed = exchange.responseComplete();
419 if (!completed)
420 return false;
421
422 this.failure = failure;
423
424
425 boolean fail;
426 while (true)
427 {
428 ResponseState current = responseState.get();
429 if (updateResponseState(current, ResponseState.FAILURE))
430 {
431 fail = current != ResponseState.TRANSIENT;
432 break;
433 }
434 }
435
436 dispose();
437
438
439
440 Result result = exchange.terminateResponse(failure);
441
442 HttpResponse response = exchange.getResponse();
443 if (LOG.isDebugEnabled())
444 LOG.debug("Response failure {} {}", response, failure);
445 List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
446 ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
447 notifier.notifyFailure(listeners, response, failure);
448
449 if (fail)
450 {
451 terminateResponse(exchange, result);
452 }
453 else
454 {
455 if (LOG.isDebugEnabled())
456 LOG.debug("Concurrent failure: response termination skipped, performed by helpers");
457 }
458
459 return true;
460 }
461
462 private void terminateResponse(HttpExchange exchange, Throwable failure)
463 {
464 Result result = exchange.terminateResponse(failure);
465 terminateResponse(exchange, result);
466 }
467
468 private void terminateResponse(HttpExchange exchange, Result result)
469 {
470 HttpResponse response = exchange.getResponse();
471
472 if (LOG.isDebugEnabled())
473 LOG.debug("Response complete {}", response);
474
475 if (result != null)
476 {
477 boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
478 if (!ordered)
479 channel.exchangeTerminated(result);
480 if (LOG.isDebugEnabled())
481 LOG.debug("Request/Response {} {}", failure == null ? "succeeded" : "failed", response);
482 List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
483 ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
484 notifier.notifyComplete(listeners, result);
485 if (ordered)
486 channel.exchangeTerminated(result);
487 }
488 }
489
490
491
492
493
494
495
496
497 protected void reset()
498 {
499 decoder = null;
500 }
501
502
503
504
505
506
507
508
509 protected void dispose()
510 {
511 decoder = null;
512 }
513
514 public boolean abort(Throwable cause)
515 {
516 return responseFailure(cause);
517 }
518
519 private boolean updateResponseState(ResponseState from, ResponseState to)
520 {
521 boolean updated = responseState.compareAndSet(from, to);
522 if (!updated)
523 {
524 if (LOG.isDebugEnabled())
525 LOG.debug("State update failed: {} -> {}: {}", from, to, responseState.get());
526 }
527 return updated;
528 }
529
530
531
532
533 private enum ResponseState
534 {
535
536
537
538 TRANSIENT,
539
540
541
542 IDLE,
543
544
545
546 BEGIN,
547
548
549
550 HEADER,
551
552
553
554 HEADERS,
555
556
557
558 CONTENT,
559
560
561
562 FAILURE
563 }
564 }