1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.nio.ByteBuffer;
22 import java.util.concurrent.atomic.AtomicReference;
23
24 import org.eclipse.jetty.client.api.ContentProvider;
25 import org.eclipse.jetty.client.api.Request;
26 import org.eclipse.jetty.client.api.Result;
27 import org.eclipse.jetty.http.HttpHeader;
28 import org.eclipse.jetty.http.HttpHeaderValue;
29 import org.eclipse.jetty.util.BufferUtil;
30 import org.eclipse.jetty.util.Callback;
31 import org.eclipse.jetty.util.IteratingCallback;
32 import org.eclipse.jetty.util.log.Log;
33 import org.eclipse.jetty.util.log.Logger;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 public abstract class HttpSender implements AsyncContentProvider.Listener
59 {
60 protected static final Logger LOG = Log.getLogger(HttpSender.class);
61
62 private final AtomicReference<RequestState> requestState = new AtomicReference<>(RequestState.QUEUED);
63 private final AtomicReference<SenderState> senderState = new AtomicReference<>(SenderState.IDLE);
64 private final Callback commitCallback = new CommitCallback();
65 private final IteratingCallback contentCallback = new ContentCallback();
66 private final Callback lastCallback = new LastContentCallback();
67 private final HttpChannel channel;
68 private HttpContent content;
69 private Throwable failure;
70
71 protected HttpSender(HttpChannel channel)
72 {
73 this.channel = channel;
74 }
75
76 protected HttpChannel getHttpChannel()
77 {
78 return channel;
79 }
80
81 protected HttpExchange getHttpExchange()
82 {
83 return channel.getHttpExchange();
84 }
85
86 @Override
87 public void onContent()
88 {
89 HttpExchange exchange = getHttpExchange();
90 if (exchange == null)
91 return;
92
93 while (true)
94 {
95 SenderState current = senderState.get();
96 switch (current)
97 {
98 case IDLE:
99 {
100 SenderState newSenderState = SenderState.SENDING;
101 if (updateSenderState(current, newSenderState))
102 {
103 if (LOG.isDebugEnabled())
104 LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
105 contentCallback.iterate();
106 return;
107 }
108 break;
109 }
110 case SENDING:
111 {
112 SenderState newSenderState = SenderState.SENDING_WITH_CONTENT;
113 if (updateSenderState(current, newSenderState))
114 {
115 if (LOG.isDebugEnabled())
116 LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
117 return;
118 }
119 break;
120 }
121 case EXPECTING:
122 {
123 SenderState newSenderState = SenderState.EXPECTING_WITH_CONTENT;
124 if (updateSenderState(current, newSenderState))
125 {
126 if (LOG.isDebugEnabled())
127 LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
128 return;
129 }
130 break;
131 }
132 case PROCEEDING:
133 {
134 SenderState newSenderState = SenderState.PROCEEDING_WITH_CONTENT;
135 if (updateSenderState(current, newSenderState))
136 {
137 if (LOG.isDebugEnabled())
138 LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
139 return;
140 }
141 break;
142 }
143 case SENDING_WITH_CONTENT:
144 case EXPECTING_WITH_CONTENT:
145 case PROCEEDING_WITH_CONTENT:
146 case WAITING:
147 case COMPLETED:
148 case FAILED:
149 {
150 if (LOG.isDebugEnabled())
151 LOG.debug("Deferred content available, {}", current);
152 return;
153 }
154 default:
155 {
156 illegalSenderState(current);
157 return;
158 }
159 }
160 }
161 }
162
163 public void send(HttpExchange exchange)
164 {
165 if (!queuedToBegin(exchange))
166 return;
167
168 Request request = exchange.getRequest();
169 ContentProvider contentProvider = request.getContent();
170 HttpContent content = this.content = new HttpContent(contentProvider);
171
172 SenderState newSenderState = SenderState.SENDING;
173 if (expects100Continue(request))
174 newSenderState = content.hasContent() ? SenderState.EXPECTING_WITH_CONTENT : SenderState.EXPECTING;
175
176 out: while (true)
177 {
178 SenderState current = senderState.get();
179 switch (current)
180 {
181 case IDLE:
182 case COMPLETED:
183 {
184 if (updateSenderState(current, newSenderState))
185 break out;
186 break;
187 }
188 default:
189 {
190 illegalSenderState(current);
191 return;
192 }
193 }
194 }
195
196
197
198 if (contentProvider instanceof AsyncContentProvider)
199 ((AsyncContentProvider)contentProvider).setListener(this);
200
201 if (!beginToHeaders(exchange))
202 return;
203
204 sendHeaders(exchange, content, commitCallback);
205 }
206
207 protected boolean expects100Continue(Request request)
208 {
209 return request.getHeaders().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
210 }
211
212 protected boolean queuedToBegin(HttpExchange exchange)
213 {
214 if (!updateRequestState(RequestState.QUEUED, RequestState.TRANSIENT))
215 return false;
216
217 Request request = exchange.getRequest();
218 if (LOG.isDebugEnabled())
219 LOG.debug("Request begin {}", request);
220 RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
221 notifier.notifyBegin(request);
222
223 if (updateRequestState(RequestState.TRANSIENT, RequestState.BEGIN))
224 return true;
225
226 terminateRequest(exchange);
227 return false;
228 }
229
230 protected boolean beginToHeaders(HttpExchange exchange)
231 {
232 if (!updateRequestState(RequestState.BEGIN, RequestState.TRANSIENT))
233 return false;
234
235 Request request = exchange.getRequest();
236 if (LOG.isDebugEnabled())
237 LOG.debug("Request headers {}{}{}", request, System.lineSeparator(), request.getHeaders().toString().trim());
238 RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
239 notifier.notifyHeaders(request);
240
241 if (updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS))
242 return true;
243
244 terminateRequest(exchange);
245 return false;
246 }
247
248 protected boolean headersToCommit(HttpExchange exchange)
249 {
250 if (!updateRequestState(RequestState.HEADERS, RequestState.TRANSIENT))
251 return false;
252
253 Request request = exchange.getRequest();
254 if (LOG.isDebugEnabled())
255 LOG.debug("Request committed {}", request);
256 RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
257 notifier.notifyCommit(request);
258
259 if (updateRequestState(RequestState.TRANSIENT, RequestState.COMMIT))
260 return true;
261
262 terminateRequest(exchange);
263 return false;
264 }
265
266 protected boolean someToContent(HttpExchange exchange, ByteBuffer content)
267 {
268 RequestState current = requestState.get();
269 switch (current)
270 {
271 case COMMIT:
272 case CONTENT:
273 {
274 if (!updateRequestState(current, RequestState.TRANSIENT))
275 return false;
276
277 Request request = exchange.getRequest();
278 if (LOG.isDebugEnabled())
279 LOG.debug("Request content {}{}{}", request, System.lineSeparator(), BufferUtil.toDetailString(content));
280 RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
281 notifier.notifyContent(request, content);
282
283 if (updateRequestState(RequestState.TRANSIENT, RequestState.CONTENT))
284 return true;
285
286 terminateRequest(exchange);
287 return false;
288 }
289 default:
290 {
291 return false;
292 }
293 }
294 }
295
296 protected boolean someToSuccess(HttpExchange exchange)
297 {
298 RequestState current = requestState.get();
299 switch (current)
300 {
301 case COMMIT:
302 case CONTENT:
303 {
304
305
306 if (!exchange.requestComplete(null))
307 return false;
308
309 requestState.set(RequestState.QUEUED);
310
311
312 reset();
313
314 Request request = exchange.getRequest();
315 if (LOG.isDebugEnabled())
316 LOG.debug("Request success {}", request);
317 HttpDestination destination = getHttpChannel().getHttpDestination();
318 destination.getRequestNotifier().notifySuccess(exchange.getRequest());
319
320
321
322 Result result = exchange.terminateRequest();
323 terminateRequest(exchange, null, result);
324 return true;
325 }
326 default:
327 {
328 return false;
329 }
330 }
331 }
332
333 protected boolean anyToFailure(Throwable failure)
334 {
335 HttpExchange exchange = getHttpExchange();
336 if (exchange == null)
337 return false;
338
339
340
341 if (exchange.requestComplete(failure))
342 return abort(exchange, failure);
343
344 return false;
345 }
346
347 private void terminateRequest(HttpExchange exchange)
348 {
349
350
351 Throwable failure = this.failure;
352 if (failure == null)
353 failure = new HttpRequestException("Concurrent failure", exchange.getRequest());
354 Result result = exchange.terminateRequest();
355 terminateRequest(exchange, failure, result);
356 }
357
358 private void terminateRequest(HttpExchange exchange, Throwable failure, Result result)
359 {
360 Request request = exchange.getRequest();
361
362 if (LOG.isDebugEnabled())
363 LOG.debug("Terminating request {}", request);
364
365 if (result == null)
366 {
367 if (failure != null)
368 {
369 if (exchange.responseComplete(failure))
370 {
371 if (LOG.isDebugEnabled())
372 LOG.debug("Response failure from request {} {}", request, exchange);
373 getHttpChannel().abortResponse(exchange, failure);
374 }
375 }
376 }
377 else
378 {
379 HttpDestination destination = getHttpChannel().getHttpDestination();
380 boolean ordered = destination.getHttpClient().isStrictEventOrdering();
381 if (!ordered)
382 channel.exchangeTerminated(exchange, result);
383 if (LOG.isDebugEnabled())
384 LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result);
385 HttpConversation conversation = exchange.getConversation();
386 destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
387 if (ordered)
388 channel.exchangeTerminated(exchange, result);
389 }
390 }
391
392
393
394
395
396
397
398
399
400
401
402
403 protected abstract void sendHeaders(HttpExchange exchange, HttpContent content, Callback callback);
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421 protected abstract void sendContent(HttpExchange exchange, HttpContent content, Callback callback);
422
423 protected void reset()
424 {
425 HttpContent content = this.content;
426 this.content = null;
427 content.close();
428 senderState.set(SenderState.COMPLETED);
429 }
430
431 protected void dispose()
432 {
433 HttpContent content = this.content;
434 this.content = null;
435 if (content != null)
436 content.close();
437 senderState.set(SenderState.FAILED);
438 }
439
440 public void proceed(HttpExchange exchange, Throwable failure)
441 {
442 if (!expects100Continue(exchange.getRequest()))
443 return;
444
445 if (failure != null)
446 {
447 anyToFailure(failure);
448 return;
449 }
450
451 while (true)
452 {
453 SenderState current = senderState.get();
454 switch (current)
455 {
456 case EXPECTING:
457 {
458
459 if (updateSenderState(current, SenderState.PROCEEDING))
460 {
461 if (LOG.isDebugEnabled())
462 LOG.debug("Proceeding while expecting");
463 return;
464 }
465 break;
466 }
467 case EXPECTING_WITH_CONTENT:
468 {
469
470
471
472
473
474
475 if (updateSenderState(current, SenderState.PROCEEDING_WITH_CONTENT))
476 {
477 if (LOG.isDebugEnabled())
478 LOG.debug("Proceeding while scheduled");
479 return;
480 }
481 break;
482 }
483 case WAITING:
484 {
485
486 if (updateSenderState(current, SenderState.SENDING))
487 {
488 if (LOG.isDebugEnabled())
489 LOG.debug("Proceeding while waiting");
490 contentCallback.iterate();
491 return;
492 }
493 break;
494 }
495 case FAILED:
496 {
497 return;
498 }
499 default:
500 {
501 illegalSenderState(current);
502 return;
503 }
504 }
505 }
506 }
507
508 public boolean abort(HttpExchange exchange, Throwable failure)
509 {
510
511 boolean terminate;
512 out: while (true)
513 {
514 RequestState current = requestState.get();
515 switch (current)
516 {
517 case FAILURE:
518 {
519 return false;
520 }
521 default:
522 {
523 if (updateRequestState(current, RequestState.FAILURE))
524 {
525 terminate = current != RequestState.TRANSIENT;
526 break out;
527 }
528 break;
529 }
530 }
531 }
532
533 this.failure = failure;
534
535 dispose();
536
537 Request request = exchange.getRequest();
538 if (LOG.isDebugEnabled())
539 LOG.debug("Request failure {} {} on {}: {}", request, exchange, getHttpChannel(), failure);
540 HttpDestination destination = getHttpChannel().getHttpDestination();
541 destination.getRequestNotifier().notifyFailure(request, failure);
542
543 if (terminate)
544 {
545
546
547 Result result = exchange.terminateRequest();
548 terminateRequest(exchange, failure, result);
549 }
550 else
551 {
552 if (LOG.isDebugEnabled())
553 LOG.debug("Concurrent failure: request termination skipped, performed by helpers");
554 }
555
556 return true;
557 }
558
559 private boolean updateRequestState(RequestState from, RequestState to)
560 {
561 boolean updated = requestState.compareAndSet(from, to);
562 if (!updated && LOG.isDebugEnabled())
563 LOG.debug("RequestState update failed: {} -> {}: {}", from, to, requestState.get());
564 return updated;
565 }
566
567 private boolean updateSenderState(SenderState from, SenderState to)
568 {
569 boolean updated = senderState.compareAndSet(from, to);
570 if (!updated && LOG.isDebugEnabled())
571 LOG.debug("SenderState update failed: {} -> {}: {}", from, to, senderState.get());
572 return updated;
573 }
574
575 private void illegalSenderState(SenderState current)
576 {
577 anyToFailure(new IllegalStateException("Expected " + current + " found " + senderState.get() + " instead"));
578 }
579
580 @Override
581 public String toString()
582 {
583 return String.format("%s@%x(req=%s,snd=%s,failure=%s)",
584 getClass().getSimpleName(),
585 hashCode(),
586 requestState,
587 senderState,
588 failure);
589 }
590
591
592
593
594 private enum RequestState
595 {
596
597
598
599 TRANSIENT,
600
601
602
603 QUEUED,
604
605
606
607 BEGIN,
608
609
610
611 HEADERS,
612
613
614
615 COMMIT,
616
617
618
619 CONTENT,
620
621
622
623 FAILURE
624 }
625
626
627
628
629 private enum SenderState
630 {
631
632
633
634 IDLE,
635
636
637
638 SENDING,
639
640
641
642 SENDING_WITH_CONTENT,
643
644
645
646 EXPECTING,
647
648
649
650 EXPECTING_WITH_CONTENT,
651
652
653
654 WAITING,
655
656
657
658 PROCEEDING,
659
660
661
662 PROCEEDING_WITH_CONTENT,
663
664
665
666 COMPLETED,
667
668
669
670 FAILED
671 }
672
673 private class CommitCallback implements Callback
674 {
675
676 @Override
677 public boolean isNonBlocking()
678 {
679 return content.isNonBlocking();
680 }
681
682 @Override
683 public void succeeded()
684 {
685 try
686 {
687 HttpContent content = HttpSender.this.content;
688 if (content == null)
689 return;
690 content.succeeded();
691 process();
692 }
693 catch (Throwable x)
694 {
695 anyToFailure(x);
696 }
697 }
698
699 @Override
700 public void failed(Throwable failure)
701 {
702 HttpContent content = HttpSender.this.content;
703 if (content == null)
704 return;
705 content.failed(failure);
706 anyToFailure(failure);
707 }
708
709 private void process() throws Exception
710 {
711 HttpExchange exchange = getHttpExchange();
712 if (exchange == null)
713 return;
714
715 if (!headersToCommit(exchange))
716 return;
717
718 HttpContent content = HttpSender.this.content;
719 if (content == null)
720 return;
721
722 if (!content.hasContent())
723 {
724
725 someToSuccess(exchange);
726 }
727 else
728 {
729
730 ByteBuffer contentBuffer = content.getContent();
731 if (contentBuffer != null)
732 {
733 if (!someToContent(exchange, contentBuffer))
734 return;
735 }
736
737 while (true)
738 {
739 SenderState current = senderState.get();
740 switch (current)
741 {
742 case SENDING:
743 {
744 contentCallback.iterate();
745 return;
746 }
747 case SENDING_WITH_CONTENT:
748 {
749
750 updateSenderState(current, SenderState.SENDING);
751 break;
752 }
753 case EXPECTING:
754 {
755
756 if (updateSenderState(current, SenderState.WAITING))
757 return;
758 break;
759 }
760 case EXPECTING_WITH_CONTENT:
761 {
762
763
764 if (updateSenderState(current, SenderState.WAITING))
765 return;
766 break;
767 }
768 case PROCEEDING:
769 {
770
771
772 if (updateSenderState(current, SenderState.IDLE))
773 return;
774 break;
775 }
776 case PROCEEDING_WITH_CONTENT:
777 {
778
779
780 updateSenderState(current, SenderState.SENDING);
781 break;
782 }
783 case FAILED:
784 {
785 return;
786 }
787 default:
788 {
789 illegalSenderState(current);
790 return;
791 }
792 }
793 }
794 }
795 }
796 }
797
798 private class ContentCallback extends IteratingCallback
799 {
800 @Override
801 protected Action process() throws Exception
802 {
803 HttpExchange exchange = getHttpExchange();
804 if (exchange == null)
805 return Action.IDLE;
806
807 HttpContent content = HttpSender.this.content;
808 if (content == null)
809 return Action.IDLE;
810
811 while (true)
812 {
813 boolean advanced = content.advance();
814 boolean lastContent = content.isLast();
815 if (LOG.isDebugEnabled())
816 LOG.debug("Content present {}, last {}, consumed {} for {}", advanced, lastContent, content.isConsumed(), exchange.getRequest());
817
818 if (advanced)
819 {
820 sendContent(exchange, content, this);
821 return Action.SCHEDULED;
822 }
823
824 if (lastContent)
825 {
826 sendContent(exchange, content, lastCallback);
827 return Action.IDLE;
828 }
829
830 SenderState current = senderState.get();
831 switch (current)
832 {
833 case SENDING:
834 {
835 if (updateSenderState(current, SenderState.IDLE))
836 {
837 if (LOG.isDebugEnabled())
838 LOG.debug("Content is deferred for {}", exchange.getRequest());
839 return Action.IDLE;
840 }
841 break;
842 }
843 case SENDING_WITH_CONTENT:
844 {
845 updateSenderState(current, SenderState.SENDING);
846 break;
847 }
848 default:
849 {
850 illegalSenderState(current);
851 return Action.IDLE;
852 }
853 }
854 }
855 }
856
857 @Override
858 public void succeeded()
859 {
860 HttpExchange exchange = getHttpExchange();
861 if (exchange == null)
862 return;
863 HttpContent content = HttpSender.this.content;
864 if (content == null)
865 return;
866 content.succeeded();
867 ByteBuffer buffer = content.getContent();
868 someToContent(exchange, buffer);
869 super.succeeded();
870 }
871
872 @Override
873 public void onCompleteFailure(Throwable failure)
874 {
875 HttpContent content = HttpSender.this.content;
876 if (content == null)
877 return;
878 content.failed(failure);
879 anyToFailure(failure);
880 }
881
882 @Override
883 protected void onCompleteSuccess()
884 {
885
886
887 }
888 }
889
890 private class LastContentCallback implements Callback
891 {
892 @Override
893 public boolean isNonBlocking()
894 {
895 return content.isNonBlocking();
896 }
897
898 @Override
899 public void succeeded()
900 {
901 HttpExchange exchange = getHttpExchange();
902 if (exchange == null)
903 return;
904 HttpContent content = HttpSender.this.content;
905 if (content == null)
906 return;
907 content.succeeded();
908 someToSuccess(exchange);
909 }
910
911 @Override
912 public void failed(Throwable failure)
913 {
914 HttpContent content = HttpSender.this.content;
915 if (content == null)
916 return;
917 content.failed(failure);
918 anyToFailure(failure);
919 }
920 }
921 }