1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.nio.ByteBuffer;
22 import java.util.concurrent.atomic.AtomicReference;
23
24 import org.eclipse.jetty.client.api.ContentProvider;
25 import org.eclipse.jetty.client.api.Request;
26 import org.eclipse.jetty.client.api.Result;
27 import org.eclipse.jetty.http.HttpHeader;
28 import org.eclipse.jetty.http.HttpHeaderValue;
29 import org.eclipse.jetty.util.BufferUtil;
30 import org.eclipse.jetty.util.Callback;
31 import org.eclipse.jetty.util.IteratingCallback;
32 import org.eclipse.jetty.util.log.Log;
33 import org.eclipse.jetty.util.log.Logger;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 public abstract class HttpSender implements AsyncContentProvider.Listener
59 {
60 protected static final Logger LOG = Log.getLogger(HttpSender.class);
61
62 private final AtomicReference<RequestState> requestState = new AtomicReference<>(RequestState.QUEUED);
63 private final AtomicReference<SenderState> senderState = new AtomicReference<>(SenderState.IDLE);
64 private final Callback commitCallback = new CommitCallback();
65 private final IteratingCallback contentCallback = new ContentCallback();
66 private final Callback lastCallback = new LastContentCallback();
67 private final HttpChannel channel;
68 private HttpContent content;
69 private Throwable failure;
70
71 protected HttpSender(HttpChannel channel)
72 {
73 this.channel = channel;
74 }
75
76 protected HttpChannel getHttpChannel()
77 {
78 return channel;
79 }
80
81 protected HttpExchange getHttpExchange()
82 {
83 return channel.getHttpExchange();
84 }
85
86 @Override
87 public void onContent()
88 {
89 HttpExchange exchange = getHttpExchange();
90 if (exchange == null)
91 return;
92
93 while (true)
94 {
95 SenderState current = senderState.get();
96 switch (current)
97 {
98 case IDLE:
99 {
100 SenderState newSenderState = SenderState.SENDING;
101 if (updateSenderState(current, newSenderState))
102 {
103 if (LOG.isDebugEnabled())
104 LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
105 contentCallback.iterate();
106 return;
107 }
108 break;
109 }
110 case SENDING:
111 {
112 SenderState newSenderState = SenderState.SENDING_WITH_CONTENT;
113 if (updateSenderState(current, newSenderState))
114 {
115 if (LOG.isDebugEnabled())
116 LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
117 return;
118 }
119 break;
120 }
121 case EXPECTING:
122 {
123 SenderState newSenderState = SenderState.EXPECTING_WITH_CONTENT;
124 if (updateSenderState(current, newSenderState))
125 {
126 if (LOG.isDebugEnabled())
127 LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
128 return;
129 }
130 break;
131 }
132 case PROCEEDING:
133 {
134 SenderState newSenderState = SenderState.PROCEEDING_WITH_CONTENT;
135 if (updateSenderState(current, newSenderState))
136 {
137 if (LOG.isDebugEnabled())
138 LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
139 return;
140 }
141 break;
142 }
143 case SENDING_WITH_CONTENT:
144 case EXPECTING_WITH_CONTENT:
145 case PROCEEDING_WITH_CONTENT:
146 case WAITING:
147 {
148 if (LOG.isDebugEnabled())
149 LOG.debug("Deferred content available, {}", current);
150 return;
151 }
152 default:
153 {
154 throw illegalSenderState(current);
155 }
156 }
157 }
158 }
159
160 public void send(HttpExchange exchange)
161 {
162 Request request = exchange.getRequest();
163 Throwable cause = request.getAbortCause();
164 if (cause != null)
165 {
166 exchange.abort(cause);
167 }
168 else
169 {
170 if (!queuedToBegin(request))
171 throw new IllegalStateException();
172
173 ContentProvider contentProvider = request.getContent();
174 HttpContent content = this.content = new HttpContent(contentProvider);
175
176 SenderState newSenderState = SenderState.SENDING;
177 if (expects100Continue(request))
178 newSenderState = content.hasContent() ? SenderState.EXPECTING_WITH_CONTENT : SenderState.EXPECTING;
179 if (!updateSenderState(SenderState.IDLE, newSenderState))
180 throw illegalSenderState(SenderState.IDLE);
181
182
183
184 if (contentProvider instanceof AsyncContentProvider)
185 ((AsyncContentProvider)contentProvider).setListener(this);
186
187 if (!beginToHeaders(request))
188 return;
189
190 sendHeaders(exchange, content, commitCallback);
191 }
192 }
193
194 protected boolean expects100Continue(Request request)
195 {
196 return request.getHeaders().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
197 }
198
199 protected boolean queuedToBegin(Request request)
200 {
201 if (!updateRequestState(RequestState.QUEUED, RequestState.TRANSIENT))
202 return false;
203 if (LOG.isDebugEnabled())
204 LOG.debug("Request begin {}", request);
205 RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
206 notifier.notifyBegin(request);
207 if (!updateRequestState(RequestState.TRANSIENT, RequestState.BEGIN))
208 terminateRequest(getHttpExchange(), failure, false);
209 return true;
210 }
211
212 protected boolean beginToHeaders(Request request)
213 {
214 if (!updateRequestState(RequestState.BEGIN, RequestState.TRANSIENT))
215 return false;
216 if (LOG.isDebugEnabled())
217 LOG.debug("Request headers {}{}{}", request, System.getProperty("line.separator"), request.getHeaders().toString().trim());
218 RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
219 notifier.notifyHeaders(request);
220 if (!updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS))
221 terminateRequest(getHttpExchange(), failure, false);
222 return true;
223 }
224
225 protected boolean headersToCommit(Request request)
226 {
227 if (!updateRequestState(RequestState.HEADERS, RequestState.TRANSIENT))
228 return false;
229 if (LOG.isDebugEnabled())
230 LOG.debug("Request committed {}", request);
231 RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
232 notifier.notifyCommit(request);
233 if (!updateRequestState(RequestState.TRANSIENT, RequestState.COMMIT))
234 terminateRequest(getHttpExchange(), failure, true);
235 return true;
236 }
237
238 protected boolean someToContent(Request request, ByteBuffer content)
239 {
240 RequestState current = requestState.get();
241 switch (current)
242 {
243 case COMMIT:
244 case CONTENT:
245 {
246 if (!updateRequestState(current, RequestState.TRANSIENT_CONTENT))
247 return false;
248 if (LOG.isDebugEnabled())
249 LOG.debug("Request content {}{}{}", request, System.getProperty("line.separator"), BufferUtil.toDetailString(content));
250 RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
251 notifier.notifyContent(request, content);
252 if (!updateRequestState(RequestState.TRANSIENT_CONTENT, RequestState.CONTENT))
253 terminateRequest(getHttpExchange(), failure, true);
254 return true;
255 }
256 default:
257 {
258 return false;
259 }
260 }
261 }
262
263 protected boolean someToSuccess(HttpExchange exchange)
264 {
265 RequestState current = requestState.get();
266 switch (current)
267 {
268 case COMMIT:
269 case CONTENT:
270 {
271
272
273 boolean completed = exchange.requestComplete();
274 if (!completed)
275 return false;
276
277 requestState.set(RequestState.QUEUED);
278
279
280 reset();
281
282
283
284 Result result = exchange.terminateRequest(null);
285
286 Request request = exchange.getRequest();
287 if (LOG.isDebugEnabled())
288 LOG.debug("Request success {}", request);
289 HttpDestination destination = getHttpChannel().getHttpDestination();
290 destination.getRequestNotifier().notifySuccess(exchange.getRequest());
291
292 terminateRequest(exchange, null, true, result);
293
294 return true;
295 }
296 default:
297 {
298 return false;
299 }
300 }
301 }
302
303 protected boolean anyToFailure(Throwable failure)
304 {
305 HttpExchange exchange = getHttpExchange();
306 if (exchange == null)
307 return false;
308
309
310
311 boolean completed = exchange.requestComplete();
312 if (!completed)
313 return false;
314
315 this.failure = failure;
316
317
318 RequestState current;
319 boolean fail;
320 while (true)
321 {
322 current = requestState.get();
323 if (updateRequestState(current, RequestState.FAILURE))
324 {
325 fail = current != RequestState.TRANSIENT && current != RequestState.TRANSIENT_CONTENT;
326 break;
327 }
328 }
329
330 dispose();
331
332
333
334 Result result = exchange.terminateRequest(failure);
335
336 Request request = exchange.getRequest();
337 if (LOG.isDebugEnabled())
338 LOG.debug("Request failure {} {}", exchange, failure);
339 HttpDestination destination = getHttpChannel().getHttpDestination();
340 destination.getRequestNotifier().notifyFailure(request, failure);
341
342 if (fail)
343 {
344 terminateRequest(exchange, failure, !isBeforeCommit(current), result);
345 }
346 else
347 {
348 if (LOG.isDebugEnabled())
349 LOG.debug("Concurrent failure: request termination skipped, performed by helpers");
350 }
351
352 return true;
353 }
354
355 private void terminateRequest(HttpExchange exchange, Throwable failure, boolean committed)
356 {
357 if (exchange != null)
358 {
359 Result result = exchange.terminateRequest(failure);
360 terminateRequest(exchange, failure, committed, result);
361 }
362 }
363
364 private void terminateRequest(HttpExchange exchange, Throwable failure, boolean committed, Result result)
365 {
366 Request request = exchange.getRequest();
367
368 if (LOG.isDebugEnabled())
369 LOG.debug("Terminating request {}", request);
370
371 if (failure != null && !committed && result == null && request.getAbortCause() == null)
372 {
373
374 if (exchange.responseComplete())
375 {
376 result = exchange.terminateResponse(failure);
377 if (LOG.isDebugEnabled())
378 LOG.debug("Failed response from request {}", exchange);
379 }
380 }
381
382 if (result != null)
383 {
384 HttpDestination destination = getHttpChannel().getHttpDestination();
385 boolean ordered = destination.getHttpClient().isStrictEventOrdering();
386 if (!ordered)
387 channel.exchangeTerminated(result);
388 if (LOG.isDebugEnabled())
389 LOG.debug("Request/Response {} {}", failure == null ? "succeeded" : "failed", request);
390 HttpConversation conversation = exchange.getConversation();
391 destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
392 if (ordered)
393 channel.exchangeTerminated(result);
394 }
395 }
396
397
398
399
400
401
402
403
404
405
406
407
408 protected abstract void sendHeaders(HttpExchange exchange, HttpContent content, Callback callback);
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426 protected abstract void sendContent(HttpExchange exchange, HttpContent content, Callback callback);
427
428 protected void reset()
429 {
430 content.close();
431 content = null;
432 senderState.set(SenderState.IDLE);
433 }
434
435 protected void dispose()
436 {
437 HttpContent content = this.content;
438 if (content != null)
439 content.close();
440 }
441
442 public void proceed(HttpExchange exchange, Throwable failure)
443 {
444 if (!expects100Continue(exchange.getRequest()))
445 return;
446
447 if (failure != null)
448 {
449 anyToFailure(failure);
450 return;
451 }
452
453 while (true)
454 {
455 SenderState current = senderState.get();
456 switch (current)
457 {
458 case EXPECTING:
459 {
460
461 if (updateSenderState(current, SenderState.PROCEEDING))
462 {
463 if (LOG.isDebugEnabled())
464 LOG.debug("Proceeding while expecting");
465 return;
466 }
467 break;
468 }
469 case EXPECTING_WITH_CONTENT:
470 {
471
472
473
474
475
476
477 if (updateSenderState(current, SenderState.PROCEEDING_WITH_CONTENT))
478 {
479 if (LOG.isDebugEnabled())
480 LOG.debug("Proceeding while scheduled");
481 return;
482 }
483 break;
484 }
485 case WAITING:
486 {
487
488 if (!updateSenderState(current, SenderState.SENDING))
489 throw illegalSenderState(current);
490 if (LOG.isDebugEnabled())
491 LOG.debug("Proceeding while waiting");
492 contentCallback.iterate();
493 return;
494 }
495 default:
496 {
497 throw illegalSenderState(current);
498 }
499 }
500 }
501 }
502
503 public boolean abort(Throwable failure)
504 {
505 RequestState current = requestState.get();
506 boolean abortable = isBeforeCommit(current) || isSending(current);
507 return abortable && anyToFailure(failure);
508 }
509
510 private boolean updateRequestState(RequestState from, RequestState to)
511 {
512 boolean updated = requestState.compareAndSet(from, to);
513 if (!updated)
514 LOG.debug("RequestState update failed: {} -> {}: {}", from, to, requestState.get());
515 return updated;
516 }
517
518 private boolean updateSenderState(SenderState from, SenderState to)
519 {
520 boolean updated = senderState.compareAndSet(from, to);
521 if (!updated)
522 LOG.debug("SenderState update failed: {} -> {}: {}", from, to, senderState.get());
523 return updated;
524 }
525
526 private boolean isBeforeCommit(RequestState requestState)
527 {
528 switch (requestState)
529 {
530 case TRANSIENT:
531 case QUEUED:
532 case BEGIN:
533 case HEADERS:
534 return true;
535 default:
536 return false;
537 }
538 }
539
540 private boolean isSending(RequestState requestState)
541 {
542 switch (requestState)
543 {
544 case TRANSIENT_CONTENT:
545 case COMMIT:
546 case CONTENT:
547 return true;
548 default:
549 return false;
550 }
551 }
552
553 private RuntimeException illegalSenderState(SenderState current)
554 {
555 return new IllegalStateException("Expected " + current + " found " + senderState.get() + " instead");
556 }
557
558
559
560
561 private enum RequestState
562 {
563
564
565
566 TRANSIENT,
567
568
569
570 TRANSIENT_CONTENT,
571
572
573
574 QUEUED,
575
576
577
578 BEGIN,
579
580
581
582 HEADERS,
583
584
585
586 COMMIT,
587
588
589
590 CONTENT,
591
592
593
594 FAILURE
595 }
596
597
598
599
600 private enum SenderState
601 {
602
603
604
605 IDLE,
606
607
608
609 SENDING,
610
611
612
613 SENDING_WITH_CONTENT,
614
615
616
617 EXPECTING,
618
619
620
621 EXPECTING_WITH_CONTENT,
622
623
624
625 WAITING,
626
627
628
629 PROCEEDING,
630
631
632
633 PROCEEDING_WITH_CONTENT
634 }
635
636 private class CommitCallback implements Callback
637 {
638 @Override
639 public void succeeded()
640 {
641 try
642 {
643 content.succeeded();
644 process();
645 }
646
647 catch (Exception x)
648 {
649 anyToFailure(x);
650 }
651 }
652
653 @Override
654 public void failed(Throwable failure)
655 {
656 content.failed(failure);
657 anyToFailure(failure);
658 }
659
660 private void process() throws Exception
661 {
662 HttpExchange exchange = getHttpExchange();
663 if (exchange == null)
664 return;
665
666 Request request = exchange.getRequest();
667 if (!headersToCommit(request))
668 return;
669
670 HttpContent content = HttpSender.this.content;
671
672 if (!content.hasContent())
673 {
674
675 someToSuccess(exchange);
676 }
677 else
678 {
679
680 ByteBuffer contentBuffer = content.getContent();
681 if (contentBuffer != null)
682 {
683 if (!someToContent(request, contentBuffer))
684 return;
685 }
686
687 while (true)
688 {
689 SenderState current = senderState.get();
690 switch (current)
691 {
692 case SENDING:
693 {
694 contentCallback.iterate();
695 return;
696 }
697 case SENDING_WITH_CONTENT:
698 {
699
700 updateSenderState(current, SenderState.SENDING);
701 break;
702 }
703 case EXPECTING:
704 {
705
706 if (updateSenderState(current, SenderState.WAITING))
707 return;
708 break;
709 }
710 case EXPECTING_WITH_CONTENT:
711 {
712
713
714 if (updateSenderState(current, SenderState.WAITING))
715 return;
716 break;
717 }
718 case PROCEEDING:
719 {
720
721
722 if (updateSenderState(current, SenderState.IDLE))
723 return;
724 break;
725 }
726 case PROCEEDING_WITH_CONTENT:
727 {
728
729
730 updateSenderState(current, SenderState.SENDING);
731 break;
732 }
733 default:
734 {
735 throw illegalSenderState(current);
736 }
737 }
738 }
739 }
740 }
741 }
742
743 private class ContentCallback extends IteratingCallback
744 {
745 @Override
746 protected Action process() throws Exception
747 {
748 HttpExchange exchange = getHttpExchange();
749 if (exchange == null)
750 return Action.IDLE;
751
752 HttpContent content = HttpSender.this.content;
753 while (true)
754 {
755 boolean advanced = content.advance();
756 boolean consumed = content.isConsumed();
757 if (LOG.isDebugEnabled())
758 LOG.debug("Content {} consumed {} for {}", advanced, consumed, exchange.getRequest());
759
760 if (advanced)
761 {
762 sendContent(exchange, content, this);
763 return Action.SCHEDULED;
764 }
765
766 if (consumed)
767 {
768 sendContent(exchange, content, lastCallback);
769 return Action.IDLE;
770 }
771
772 SenderState current = HttpSender.this.senderState.get();
773 switch (current)
774 {
775 case SENDING:
776 {
777 if (updateSenderState(current, SenderState.IDLE))
778 {
779 if (LOG.isDebugEnabled())
780 LOG.debug("Content is deferred for {}", exchange.getRequest());
781 return Action.IDLE;
782 }
783 break;
784 }
785 case SENDING_WITH_CONTENT:
786 {
787 updateSenderState(current, SenderState.SENDING);
788 break;
789 }
790 default:
791 {
792 throw illegalSenderState(current);
793 }
794 }
795 }
796 }
797
798 @Override
799 public void succeeded()
800 {
801 ByteBuffer buffer = content.getContent();
802 someToContent(getHttpExchange().getRequest(), buffer);
803 content.succeeded();
804 super.succeeded();
805 }
806
807 @Override
808 public void onCompleteFailure(Throwable failure)
809 {
810 content.failed(failure);
811 anyToFailure(failure);
812 }
813
814 @Override
815 protected void onCompleteSuccess()
816 {
817
818
819 }
820 }
821
822 private class LastContentCallback implements Callback
823 {
824 @Override
825 public void succeeded()
826 {
827 content.succeeded();
828 HttpExchange exchange = getHttpExchange();
829 if (exchange == null)
830 return;
831 someToSuccess(exchange);
832 }
833
834 @Override
835 public void failed(Throwable failure)
836 {
837 content.failed(failure);
838 anyToFailure(failure);
839 }
840 }
841 }