1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.nio.ByteBuffer;
22 import java.util.concurrent.atomic.AtomicReference;
23
24 import org.eclipse.jetty.client.api.ContentProvider;
25 import org.eclipse.jetty.client.api.Request;
26 import org.eclipse.jetty.client.api.Result;
27 import org.eclipse.jetty.http.HttpHeader;
28 import org.eclipse.jetty.http.HttpHeaderValue;
29 import org.eclipse.jetty.util.BufferUtil;
30 import org.eclipse.jetty.util.Callback;
31 import org.eclipse.jetty.util.IteratingCallback;
32 import org.eclipse.jetty.util.log.Log;
33 import org.eclipse.jetty.util.log.Logger;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 public abstract class HttpSender implements AsyncContentProvider.Listener
59 {
60 protected static final Logger LOG = Log.getLogger(HttpSender.class);
61
62 private final AtomicReference<RequestState> requestState = new AtomicReference<>(RequestState.QUEUED);
63 private final AtomicReference<SenderState> senderState = new AtomicReference<>(SenderState.IDLE);
64 private final Callback commitCallback = new CommitCallback();
65 private final Callback contentCallback = new ContentCallback();
66 private final Callback lastCallback = new LastContentCallback();
67 private final HttpChannel channel;
68 private volatile HttpContent content;
69
/**
 * Creates a sender bound to the given channel.
 *
 * @param channel the channel returned by {@link #getHttpChannel()} and used to look up the current exchange
 */
protected HttpSender(HttpChannel channel)
{
    this.channel = channel;
}
74
75 protected HttpChannel getHttpChannel()
76 {
77 return channel;
78 }
79
80 protected HttpExchange getHttpExchange()
81 {
82 return channel.getHttpExchange();
83 }
84
85 @Override
86 public void onContent()
87 {
88 HttpExchange exchange = getHttpExchange();
89 if (exchange == null)
90 return;
91
92 while (true)
93 {
94 SenderState current = senderState.get();
95 switch (current)
96 {
97 case IDLE:
98 {
99 if (updateSenderState(current, SenderState.SENDING))
100 {
101 LOG.debug("Deferred content available, idle -> sending");
102 HttpContent content = this.content;
103 content.advance();
104 sendContent(exchange, content, contentCallback);
105 return;
106 }
107 break;
108 }
109 case SENDING:
110 {
111 if (updateSenderState(current, SenderState.SCHEDULED))
112 {
113 LOG.debug("Deferred content available, sending -> scheduled");
114 return;
115 }
116 break;
117 }
118 case EXPECTING:
119 {
120 if (updateSenderState(current, SenderState.SCHEDULED))
121 {
122 LOG.debug("Deferred content available, expecting -> scheduled");
123 return;
124 }
125 break;
126 }
127 case WAITING:
128 {
129 LOG.debug("Deferred content available, waiting");
130 return;
131 }
132 case SCHEDULED:
133 {
134 LOG.debug("Deferred content available, scheduled");
135 return;
136 }
137 default:
138 {
139 throw new IllegalStateException();
140 }
141 }
142 }
143 }
144
145 public void send(HttpExchange exchange)
146 {
147 Request request = exchange.getRequest();
148 Throwable cause = request.getAbortCause();
149 if (cause != null)
150 {
151 exchange.abort(cause);
152 }
153 else
154 {
155 if (!queuedToBegin(request))
156 throw new IllegalStateException();
157
158 if (!updateSenderState(SenderState.IDLE, expects100Continue(request) ? SenderState.EXPECTING : SenderState.SENDING))
159 throw new IllegalStateException();
160
161 ContentProvider contentProvider = request.getContent();
162 HttpContent content = this.content = new HttpContent(contentProvider);
163
164
165
166 if (contentProvider instanceof AsyncContentProvider)
167 ((AsyncContentProvider)contentProvider).setListener(this);
168
169 if (!beginToHeaders(request))
170 return;
171
172 sendHeaders(exchange, content, commitCallback);
173 }
174 }
175
176 protected boolean expects100Continue(Request request)
177 {
178 return request.getHeaders().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
179 }
180
181 protected boolean queuedToBegin(Request request)
182 {
183 if (!updateRequestState(RequestState.QUEUED, RequestState.BEGIN))
184 return false;
185 LOG.debug("Request begin {}", request);
186 RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
187 notifier.notifyBegin(request);
188 return true;
189 }
190
191 protected boolean beginToHeaders(Request request)
192 {
193 if (!updateRequestState(RequestState.BEGIN, RequestState.HEADERS))
194 return false;
195 if (LOG.isDebugEnabled())
196 LOG.debug("Request headers {}{}{}", request, System.getProperty("line.separator"), request.getHeaders().toString().trim());
197 RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
198 notifier.notifyHeaders(request);
199 return true;
200 }
201
202 protected boolean headersToCommit(Request request)
203 {
204 if (!updateRequestState(RequestState.HEADERS, RequestState.COMMIT))
205 return false;
206 LOG.debug("Request committed {}", request);
207 RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
208 notifier.notifyCommit(request);
209 return true;
210 }
211
212 protected boolean someToContent(Request request, ByteBuffer content)
213 {
214 RequestState current = requestState.get();
215 switch (current)
216 {
217 case COMMIT:
218 case CONTENT:
219 {
220 if (!updateRequestState(current, RequestState.CONTENT))
221 return false;
222 if (LOG.isDebugEnabled())
223 LOG.debug("Request content {}{}{}", request, System.getProperty("line.separator"), BufferUtil.toDetailString(content));
224 RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
225 notifier.notifyContent(request, content);
226 return true;
227 }
228 case FAILURE:
229 {
230 return false;
231 }
232 default:
233 {
234 throw new IllegalStateException();
235 }
236 }
237 }
238
239 protected boolean someToSuccess(HttpExchange exchange)
240 {
241 RequestState current = requestState.get();
242 switch (current)
243 {
244 case COMMIT:
245 case CONTENT:
246 {
247
248
249 boolean completed = exchange.requestComplete();
250 if (!completed)
251 return false;
252
253
254 reset();
255
256
257
258 Result result = exchange.terminateRequest(null);
259
260
261
262 Request request = exchange.getRequest();
263 LOG.debug("Request success {}", request);
264 HttpDestination destination = getHttpChannel().getHttpDestination();
265 destination.getRequestNotifier().notifySuccess(exchange.getRequest());
266
267 if (result != null)
268 {
269 boolean ordered = destination.getHttpClient().isStrictEventOrdering();
270 if (!ordered)
271 channel.exchangeTerminated(result);
272 LOG.debug("Request/Response succeded {}", request);
273 HttpConversation conversation = exchange.getConversation();
274 destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
275 if (ordered)
276 channel.exchangeTerminated(result);
277 }
278
279 return true;
280 }
281 case FAILURE:
282 {
283 return false;
284 }
285 default:
286 {
287 throw new IllegalStateException();
288 }
289 }
290 }
291
292 protected boolean anyToFailure(Throwable failure)
293 {
294 HttpExchange exchange = getHttpExchange();
295 if (exchange == null)
296 return false;
297
298
299
300 boolean completed = exchange.requestComplete();
301 if (!completed)
302 return false;
303
304
305 RequestState requestState = dispose();
306
307
308
309 Result result = exchange.terminateRequest(failure);
310
311 Request request = exchange.getRequest();
312 LOG.debug("Request failure {} {}", exchange, failure);
313 HttpDestination destination = getHttpChannel().getHttpDestination();
314 destination.getRequestNotifier().notifyFailure(request, failure);
315
316 boolean notCommitted = isBeforeCommit(requestState);
317 if (result == null && notCommitted && request.getAbortCause() == null)
318 {
319
320 if (exchange.responseComplete())
321 {
322 result = exchange.terminateResponse(failure);
323 LOG.debug("Failed response from request {}", exchange);
324 }
325 }
326
327 if (result != null)
328 {
329 boolean ordered = destination.getHttpClient().isStrictEventOrdering();
330 if (!ordered)
331 channel.exchangeTerminated(result);
332 LOG.debug("Request/Response failed {}", request);
333 HttpConversation conversation = exchange.getConversation();
334 destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
335 if (ordered)
336 channel.exchangeTerminated(result);
337 }
338
339 return true;
340 }
341
342
343
344
345
346
347
348
349
350
351
352
353 protected abstract void sendHeaders(HttpExchange exchange, HttpContent content, Callback callback);
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371 protected abstract void sendContent(HttpExchange exchange, HttpContent content, Callback callback);
372
373 protected void reset()
374 {
375 content = null;
376 requestState.set(RequestState.QUEUED);
377 senderState.set(SenderState.IDLE);
378 }
379
380 protected RequestState dispose()
381 {
382 while (true)
383 {
384 RequestState current = requestState.get();
385 if (updateRequestState(current, RequestState.FAILURE))
386 return current;
387 }
388 }
389
390 public void proceed(HttpExchange exchange, boolean proceed)
391 {
392 if (!expects100Continue(exchange.getRequest()))
393 return;
394
395 if (proceed)
396 {
397 while (true)
398 {
399 SenderState current = senderState.get();
400 switch (current)
401 {
402 case EXPECTING:
403 {
404
405
406 if (!updateSenderState(current, SenderState.SENDING))
407 break;
408 LOG.debug("Proceed while expecting");
409 return;
410 }
411 case WAITING:
412 {
413
414
415
416 if (!updateSenderState(current, SenderState.SENDING))
417 break;
418 HttpContent content = this.content;
419 if (content.advance())
420 {
421
422 LOG.debug("Proceed while waiting");
423 sendContent(exchange, content, contentCallback);
424 }
425 else
426 {
427
428
429 if (!updateSenderState(SenderState.SENDING, SenderState.IDLE))
430 break;
431 LOG.debug("Proceed deferred");
432 }
433 return;
434 }
435 case SCHEDULED:
436 {
437
438 if (!updateSenderState(current, SenderState.WAITING))
439 throw new IllegalStateException();
440 LOG.debug("Proceed while scheduled");
441 break;
442 }
443 default:
444 {
445 throw new IllegalStateException();
446 }
447 }
448 }
449 }
450 else
451 {
452 anyToFailure(new HttpRequestException("Expectation failed", exchange.getRequest()));
453 }
454 }
455
456 public boolean abort(Throwable failure)
457 {
458 RequestState current = requestState.get();
459 boolean abortable = isBeforeCommit(current) ||
460 isSending(current) && !content.isLast();
461 return abortable && anyToFailure(failure);
462 }
463
464 protected boolean updateRequestState(RequestState from, RequestState to)
465 {
466 boolean updated = requestState.compareAndSet(from, to);
467 if (!updated)
468 LOG.debug("RequestState update failed: {} -> {}: {}", from, to, requestState.get());
469 return updated;
470 }
471
472 private boolean updateSenderState(SenderState from, SenderState to)
473 {
474 boolean updated = senderState.compareAndSet(from, to);
475 if (!updated)
476 LOG.debug("SenderState update failed: {} -> {}: {}", from, to, senderState.get());
477 return updated;
478 }
479
480 private boolean isBeforeCommit(RequestState requestState)
481 {
482 switch (requestState)
483 {
484 case QUEUED:
485 case BEGIN:
486 case HEADERS:
487 return true;
488 default:
489 return false;
490 }
491 }
492
493 private boolean isSending(RequestState requestState)
494 {
495 switch (requestState)
496 {
497 case COMMIT:
498 case CONTENT:
499 return true;
500 default:
501 return false;
502 }
503 }
504
505
506
507
508 protected enum RequestState
509 {
510
511
512
513 QUEUED,
514
515
516
517 BEGIN,
518
519
520
521 HEADERS,
522
523
524
525 COMMIT,
526
527
528
529 CONTENT,
530
531
532
533 FAILURE
534 }
535
536
537
538
539 private enum SenderState
540 {
541
542
543
544 IDLE,
545
546
547
548 SENDING,
549
550
551
552 EXPECTING,
553
554
555
556 WAITING,
557
558
559
560 SCHEDULED
561 }
562
563 private class CommitCallback implements Callback
564 {
565 @Override
566 public void succeeded()
567 {
568 try
569 {
570 process();
571 }
572
573 catch (Exception x)
574 {
575 anyToFailure(x);
576 }
577 }
578
579 private void process() throws Exception
580 {
581 HttpExchange exchange = getHttpExchange();
582 if (exchange == null)
583 return;
584
585 Request request = exchange.getRequest();
586 if (!headersToCommit(request))
587 return;
588
589 HttpContent content = HttpSender.this.content;
590
591 if (!content.hasContent())
592 {
593
594 someToSuccess(exchange);
595 }
596 else
597 {
598
599 ByteBuffer contentBuffer = content.getContent();
600 if (contentBuffer != null)
601 {
602 if (!someToContent(request, contentBuffer))
603 return;
604 }
605
606 while (true)
607 {
608 SenderState current = senderState.get();
609 switch (current)
610 {
611 case SENDING:
612 {
613
614 if (content.advance())
615 {
616 sendContent(exchange, content, contentCallback);
617 }
618 else
619 {
620 if (content.isConsumed())
621 {
622 sendContent(exchange, content, lastCallback);
623 }
624 else
625 {
626 if (!updateSenderState(current, SenderState.IDLE))
627 break;
628 LOG.debug("Waiting for deferred content for {}", request);
629 }
630 }
631 return;
632 }
633 case EXPECTING:
634 {
635
636 if (!updateSenderState(current, SenderState.WAITING))
637 break;
638 return;
639 }
640 case SCHEDULED:
641 {
642 if (expects100Continue(request))
643 return;
644
645 updateSenderState(current, SenderState.SENDING);
646 break;
647 }
648 default:
649 {
650 throw new IllegalStateException();
651 }
652 }
653 }
654 }
655 }
656
657 @Override
658 public void failed(Throwable failure)
659 {
660 anyToFailure(failure);
661 }
662 }
663
664 private class ContentCallback extends IteratingCallback
665 {
666 @Override
667 protected boolean process() throws Exception
668 {
669 HttpExchange exchange = getHttpExchange();
670 if (exchange == null)
671 return false;
672
673 Request request = exchange.getRequest();
674 HttpContent content = HttpSender.this.content;
675
676 ByteBuffer contentBuffer = content.getContent();
677 if (contentBuffer != null)
678 {
679 if (!someToContent(request, contentBuffer))
680 return false;
681 }
682
683 if (content.advance())
684 {
685
686 sendContent(exchange, content, this);
687 }
688 else
689 {
690 if (content.isConsumed())
691 {
692 sendContent(exchange, content, lastCallback);
693 }
694 else
695 {
696 while (true)
697 {
698 SenderState current = senderState.get();
699 switch (current)
700 {
701 case SENDING:
702 {
703 if (updateSenderState(current, SenderState.IDLE))
704 {
705 LOG.debug("Waiting for deferred content for {}", request);
706 return false;
707 }
708 break;
709 }
710 case SCHEDULED:
711 {
712 if (updateSenderState(current, SenderState.SENDING))
713 {
714 LOG.debug("Deferred content available for {}", request);
715
716 sendContent(exchange, content, this);
717 return false;
718 }
719 break;
720 }
721 default:
722 {
723 throw new IllegalStateException();
724 }
725 }
726 }
727 }
728 }
729 return false;
730 }
731
732 @Override
733 protected void completed()
734 {
735
736
737 }
738
739 @Override
740 public void failed(Throwable failure)
741 {
742 super.failed(failure);
743 anyToFailure(failure);
744 }
745 }
746
747 private class LastContentCallback implements Callback
748 {
749 @Override
750 public void succeeded()
751 {
752 HttpExchange exchange = getHttpExchange();
753 if (exchange == null)
754 return;
755 someToSuccess(exchange);
756 }
757
758 @Override
759 public void failed(Throwable failure)
760 {
761 anyToFailure(failure);
762 }
763 }
764 }