1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.nio.ByteBuffer;
22 import java.util.Collections;
23 import java.util.Iterator;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.Executor;
26 import java.util.concurrent.atomic.AtomicMarkableReference;
27 import java.util.concurrent.atomic.AtomicReference;
28
29 import org.eclipse.jetty.client.api.ContentProvider;
30 import org.eclipse.jetty.client.api.Request;
31 import org.eclipse.jetty.client.api.Result;
32 import org.eclipse.jetty.http.HttpGenerator;
33 import org.eclipse.jetty.http.HttpHeader;
34 import org.eclipse.jetty.http.HttpHeaderValue;
35 import org.eclipse.jetty.io.ByteBufferPool;
36 import org.eclipse.jetty.io.EndPoint;
37 import org.eclipse.jetty.util.BufferUtil;
38 import org.eclipse.jetty.util.Callback;
39 import org.eclipse.jetty.util.log.Log;
40 import org.eclipse.jetty.util.log.Logger;
41
42 public class HttpSender implements AsyncContentProvider.Listener
43 {
44 private static final Logger LOG = Log.getLogger(HttpSender.class);
45 private static final String EXPECT_100_ATTRIBUTE = HttpSender.class.getName() + ".expect100";
46
47 private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
48 private final AtomicReference<SendState> sendState = new AtomicReference<>(SendState.IDLE);
49 private final HttpGenerator generator = new HttpGenerator();
50 private final HttpConnection connection;
51 private Iterator<ByteBuffer> contentIterator;
52 private ContinueContentChunk continueContentChunk;
53
54 public HttpSender(HttpConnection connection)
55 {
56 this.connection = connection;
57 }
58
59 @Override
60 public void onContent()
61 {
62 while (true)
63 {
64 SendState current = sendState.get();
65 switch (current)
66 {
67 case IDLE:
68 {
69 if (updateSendState(current, SendState.EXECUTE))
70 {
71 LOG.debug("Deferred content available, sending");
72 send();
73 return;
74 }
75 break;
76 }
77 case EXECUTE:
78 {
79 if (updateSendState(current, SendState.SCHEDULE))
80 {
81 LOG.debug("Deferred content available, scheduling");
82 return;
83 }
84 break;
85 }
86 case SCHEDULE:
87 {
88 LOG.debug("Deferred content available, queueing");
89 return;
90 }
91 default:
92 {
93 throw new IllegalStateException();
94 }
95 }
96 }
97 }
98
99 public void send(HttpExchange exchange)
100 {
101 if (!updateState(State.IDLE, State.BEGIN))
102 throw new IllegalStateException();
103
104 Request request = exchange.getRequest();
105 Throwable cause = request.getAbortCause();
106 if (cause != null)
107 {
108 exchange.abort(cause);
109 }
110 else
111 {
112 LOG.debug("Sending {}", request);
113 RequestNotifier notifier = connection.getDestination().getRequestNotifier();
114 notifier.notifyBegin(request);
115
116 ContentProvider content = request.getContent();
117 this.contentIterator = content == null ? Collections.<ByteBuffer>emptyIterator() : content.iterator();
118
119 boolean updated = updateSendState(SendState.IDLE, SendState.EXECUTE);
120 assert updated;
121
122
123
124 if (content instanceof AsyncContentProvider)
125 ((AsyncContentProvider)content).setListener(this);
126
127 send();
128 }
129 }
130
131 private void send()
132 {
133 SendState currentSendState = sendState.get();
134 assert currentSendState != SendState.IDLE : currentSendState;
135
136 HttpClient client = connection.getHttpClient();
137 ByteBufferPool bufferPool = client.getByteBufferPool();
138 ByteBuffer header = null;
139 ByteBuffer chunk = null;
140 try
141 {
142 HttpExchange exchange = connection.getExchange();
143
144 if (exchange == null)
145 return;
146
147 final Request request = exchange.getRequest();
148 HttpConversation conversation = exchange.getConversation();
149 HttpGenerator.RequestInfo requestInfo = null;
150
151
152
153 boolean expect100HeaderPresent = request.getHeaders().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
154 final boolean expecting100ContinueResponse = expect100HeaderPresent && conversation.getAttribute(EXPECT_100_ATTRIBUTE) == null;
155 if (expecting100ContinueResponse)
156 conversation.setAttribute(EXPECT_100_ATTRIBUTE, Boolean.TRUE);
157
158 ContentChunk contentChunk = continueContentChunk;
159 continueContentChunk = null;
160 if (contentChunk == null)
161 contentChunk = new ContentChunk(contentIterator);
162
163 while (true)
164 {
165 ByteBuffer content = contentChunk.content;
166 final ByteBuffer contentBuffer = content == null ? null : content.slice();
167
168 HttpGenerator.Result result = generator.generateRequest(requestInfo, header, chunk, content, contentChunk.lastContent);
169 switch (result)
170 {
171 case NEED_INFO:
172 {
173 ContentProvider requestContent = request.getContent();
174 long contentLength = requestContent == null ? -1 : requestContent.getLength();
175 String path = request.getPath();
176 String query = request.getQuery();
177 if (query != null)
178 path += "?" + query;
179 requestInfo = new HttpGenerator.RequestInfo(request.getVersion(), request.getHeaders(), contentLength, request.method(), path);
180 break;
181 }
182 case NEED_HEADER:
183 {
184 header = bufferPool.acquire(client.getRequestBufferSize(), false);
185 break;
186 }
187 case NEED_CHUNK:
188 {
189 chunk = bufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
190 break;
191 }
192 case FLUSH:
193 {
194 out:
195 while (true)
196 {
197 State currentState = state.get();
198 switch (currentState)
199 {
200 case BEGIN:
201 {
202 if (!updateState(currentState, State.HEADERS))
203 continue;
204 RequestNotifier notifier = connection.getDestination().getRequestNotifier();
205 notifier.notifyHeaders(request);
206 break out;
207 }
208 case HEADERS:
209 case COMMIT:
210 {
211
212 break out;
213 }
214 case FAILURE:
215 {
216
217
218 return;
219 }
220 default:
221 {
222 throw new IllegalStateException();
223 }
224 }
225 }
226
227 StatefulExecutorCallback callback = new StatefulExecutorCallback(client.getExecutor())
228 {
229 @Override
230 protected void onSucceeded()
231 {
232 LOG.debug("Write succeeded for {}", request);
233
234 if (!processWrite(request, contentBuffer, expecting100ContinueResponse))
235 return;
236
237 send();
238 }
239
240 @Override
241 protected void onFailed(Throwable x)
242 {
243 fail(x);
244 }
245 };
246
247 if (expecting100ContinueResponse)
248 {
249
250 continueContentChunk = new ContinueContentChunk(contentChunk);
251 }
252
253 write(callback, header, chunk, expecting100ContinueResponse ? null : content);
254
255 if (callback.process())
256 {
257 LOG.debug("Write pending for {}", request);
258 return;
259 }
260
261 if (callback.isSucceeded())
262 {
263 if (!processWrite(request, contentBuffer, expecting100ContinueResponse))
264 return;
265
266
267 contentChunk = new ContentChunk(contentIterator);
268
269 if (contentChunk.isDeferred())
270 {
271 out:
272 while (true)
273 {
274 currentSendState = sendState.get();
275 switch (currentSendState)
276 {
277 case EXECUTE:
278 {
279 if (updateSendState(currentSendState, SendState.IDLE))
280 {
281 LOG.debug("Waiting for deferred content for {}", request);
282 return;
283 }
284 break;
285 }
286 case SCHEDULE:
287 {
288 if (updateSendState(currentSendState, SendState.EXECUTE))
289 {
290 LOG.debug("Deferred content available for {}", request);
291 break out;
292 }
293 break;
294 }
295 default:
296 {
297 throw new IllegalStateException();
298 }
299 }
300 }
301 }
302 }
303 break;
304 }
305 case SHUTDOWN_OUT:
306 {
307 shutdownOutput();
308 break;
309 }
310 case CONTINUE:
311 {
312 break;
313 }
314 case DONE:
315 {
316 if (generator.isEnd())
317 {
318 out: while (true)
319 {
320 currentSendState = sendState.get();
321 switch (currentSendState)
322 {
323 case EXECUTE:
324 case SCHEDULE:
325 {
326 if (!updateSendState(currentSendState, SendState.IDLE))
327 throw new IllegalStateException();
328 break out;
329 }
330 default:
331 {
332 throw new IllegalStateException();
333 }
334 }
335 }
336 success();
337 }
338 return;
339 }
340 default:
341 {
342 throw new IllegalStateException("Unknown result " + result);
343 }
344 }
345 }
346 }
347 catch (Exception x)
348 {
349 LOG.debug(x);
350 fail(x);
351 }
352 finally
353 {
354 releaseBuffers(bufferPool, header, chunk);
355 }
356 }
357
358 private boolean processWrite(Request request, ByteBuffer content, boolean expecting100ContinueResponse)
359 {
360 if (!commit(request))
361 return false;
362
363 if (content != null && content.hasRemaining())
364 {
365 RequestNotifier notifier = connection.getDestination().getRequestNotifier();
366 notifier.notifyContent(request, content);
367 }
368
369 if (expecting100ContinueResponse)
370 {
371 LOG.debug("Expecting 100 Continue for {}", request);
372 continueContentChunk.signal();
373 return false;
374 }
375
376 return true;
377 }
378
379 public void proceed(boolean proceed)
380 {
381 ContinueContentChunk contentChunk = continueContentChunk;
382 if (contentChunk != null)
383 {
384 if (proceed)
385 {
386
387
388
389
390
391
392 LOG.debug("Proceeding {}", connection.getExchange());
393 contentChunk.await();
394 send();
395 }
396 else
397 {
398 HttpExchange exchange = connection.getExchange();
399 if (exchange != null)
400 fail(new HttpRequestException("Expectation failed", exchange.getRequest()));
401 }
402 }
403 }
404
405 private void write(Callback callback, ByteBuffer header, ByteBuffer chunk, ByteBuffer content)
406 {
407 int mask = 0;
408 if (header != null)
409 mask += 1;
410 if (chunk != null)
411 mask += 2;
412 if (content != null)
413 mask += 4;
414
415 EndPoint endPoint = connection.getEndPoint();
416 switch (mask)
417 {
418 case 0:
419 endPoint.write(callback, BufferUtil.EMPTY_BUFFER);
420 break;
421 case 1:
422 endPoint.write(callback, header);
423 break;
424 case 2:
425 endPoint.write(callback, chunk);
426 break;
427 case 3:
428 endPoint.write(callback, header, chunk);
429 break;
430 case 4:
431 endPoint.write(callback, content);
432 break;
433 case 5:
434 endPoint.write(callback, header, content);
435 break;
436 case 6:
437 endPoint.write(callback, chunk, content);
438 break;
439 case 7:
440 endPoint.write(callback, header, chunk, content);
441 break;
442 default:
443 throw new IllegalStateException();
444 }
445 }
446
447 protected boolean commit(Request request)
448 {
449 while (true)
450 {
451 State current = state.get();
452 switch (current)
453 {
454 case HEADERS:
455 if (!updateState(current, State.COMMIT))
456 continue;
457 LOG.debug("Committed {}", request);
458 RequestNotifier notifier = connection.getDestination().getRequestNotifier();
459 notifier.notifyCommit(request);
460 return true;
461 case COMMIT:
462 if (!updateState(current, State.COMMIT))
463 continue;
464 return true;
465 case FAILURE:
466 return false;
467 default:
468 throw new IllegalStateException();
469 }
470 }
471 }
472
473 protected boolean success()
474 {
475 HttpExchange exchange = connection.getExchange();
476 if (exchange == null)
477 return false;
478
479 AtomicMarkableReference<Result> completion = exchange.requestComplete(null);
480 if (!completion.isMarked())
481 return false;
482
483 generator.reset();
484
485 if (!updateState(State.COMMIT, State.IDLE))
486 throw new IllegalStateException();
487
488 exchange.terminateRequest();
489
490
491
492
493 HttpDestination destination = connection.getDestination();
494 Request request = exchange.getRequest();
495 destination.getRequestNotifier().notifySuccess(request);
496 LOG.debug("Sent {}", request);
497
498 Result result = completion.getReference();
499 if (result != null)
500 {
501 connection.complete(exchange, !result.isFailed());
502
503 HttpConversation conversation = exchange.getConversation();
504 destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
505 }
506
507 return true;
508 }
509
510 protected boolean fail(Throwable failure)
511 {
512 HttpExchange exchange = connection.getExchange();
513 if (exchange == null)
514 return false;
515
516 AtomicMarkableReference<Result> completion = exchange.requestComplete(failure);
517 if (!completion.isMarked())
518 return false;
519
520 generator.abort();
521
522 State current;
523 while (true)
524 {
525 current = state.get();
526 if (updateState(current, State.FAILURE))
527 break;
528 }
529
530 shutdownOutput();
531
532 exchange.terminateRequest();
533
534 HttpDestination destination = connection.getDestination();
535 Request request = exchange.getRequest();
536 destination.getRequestNotifier().notifyFailure(request, failure);
537 LOG.debug("Failed {} {}", request, failure);
538
539 Result result = completion.getReference();
540 boolean notCommitted = isBeforeCommit(current);
541 if (result == null && notCommitted && request.getAbortCause() == null)
542 {
543 completion = exchange.responseComplete(failure);
544 if (completion.isMarked())
545 {
546 result = completion.getReference();
547 exchange.terminateResponse();
548 LOG.debug("Failed on behalf {}", exchange);
549 }
550 }
551
552 if (result != null)
553 {
554 connection.complete(exchange, false);
555
556 HttpConversation conversation = exchange.getConversation();
557 destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
558 }
559
560 return true;
561 }
562
563 private void shutdownOutput()
564 {
565 connection.getEndPoint().shutdownOutput();
566 }
567
568 public boolean abort(Throwable cause)
569 {
570 State current = state.get();
571 boolean abortable = isBeforeCommit(current) ||
572 current == State.COMMIT && contentIterator.hasNext();
573 return abortable && fail(cause);
574 }
575
576 private boolean isBeforeCommit(State state)
577 {
578 return state == State.IDLE || state == State.BEGIN || state == State.HEADERS;
579 }
580
581 private void releaseBuffers(ByteBufferPool bufferPool, ByteBuffer header, ByteBuffer chunk)
582 {
583 if (!BufferUtil.hasContent(header))
584 bufferPool.release(header);
585 if (!BufferUtil.hasContent(chunk))
586 bufferPool.release(chunk);
587 }
588
589 private boolean updateState(State from, State to)
590 {
591 boolean updated = state.compareAndSet(from, to);
592 if (!updated)
593 LOG.debug("State update failed: {} -> {}: {}", from, to, state.get());
594 return updated;
595 }
596
597 private boolean updateSendState(SendState from, SendState to)
598 {
599 boolean updated = sendState.compareAndSet(from, to);
600 if (!updated)
601 LOG.debug("Send state update failed: {} -> {}: {}", from, to, sendState.get());
602 return updated;
603 }
604
605 private enum State
606 {
607 IDLE, BEGIN, HEADERS, COMMIT, FAILURE
608 }
609
610 private enum SendState
611 {
612 IDLE, EXECUTE, SCHEDULE
613 }
614
615 private static abstract class StatefulExecutorCallback implements Callback, Runnable
616 {
617 private final AtomicReference<State> state = new AtomicReference<>(State.INCOMPLETE);
618 private final Executor executor;
619
620 private StatefulExecutorCallback(Executor executor)
621 {
622 this.executor = executor;
623 }
624
625 @Override
626 public final void succeeded()
627 {
628 State previous = state.get();
629 while (true)
630 {
631 if (state.compareAndSet(previous, State.SUCCEEDED))
632 break;
633 previous = state.get();
634 }
635 if (previous == State.PENDING)
636 executor.execute(this);
637 }
638
639 @Override
640 public final void run()
641 {
642 onSucceeded();
643 }
644
645 protected abstract void onSucceeded();
646
647 @Override
648 public final void failed(final Throwable x)
649 {
650 State previous = state.get();
651 while (true)
652 {
653 if (state.compareAndSet(previous, State.FAILED))
654 break;
655 previous = state.get();
656 }
657 if (previous == State.PENDING)
658 {
659 executor.execute(new Runnable()
660 {
661 @Override
662 public void run()
663 {
664 onFailed(x);
665 }
666 });
667 }
668 else
669 {
670 onFailed(x);
671 }
672 }
673
674 protected abstract void onFailed(Throwable x);
675
676 public boolean process()
677 {
678 return state.compareAndSet(State.INCOMPLETE, State.PENDING);
679 }
680
681 public boolean isSucceeded()
682 {
683 return state.get() == State.SUCCEEDED;
684 }
685
686 public boolean isFailed()
687 {
688 return state.get() == State.FAILED;
689 }
690
691 private enum State
692 {
693 INCOMPLETE, PENDING, SUCCEEDED, FAILED
694 }
695 }
696
697 private class ContentChunk
698 {
699 private final boolean lastContent;
700 private final ByteBuffer content;
701
702 private ContentChunk(ContentChunk chunk)
703 {
704 lastContent = chunk.lastContent;
705 content = chunk.content;
706 }
707
708 private ContentChunk(Iterator<ByteBuffer> contentIterator)
709 {
710 lastContent = !contentIterator.hasNext();
711 content = lastContent ? BufferUtil.EMPTY_BUFFER : contentIterator.next();
712 }
713
714 private boolean isDeferred()
715 {
716 return content == null && !lastContent;
717 }
718 }
719
720 private class ContinueContentChunk extends ContentChunk
721 {
722 private final CountDownLatch latch = new CountDownLatch(1);
723
724 private ContinueContentChunk(ContentChunk chunk)
725 {
726 super(chunk);
727 }
728
729 private void signal()
730 {
731 latch.countDown();
732 }
733
734 private void await()
735 {
736 try
737 {
738 latch.await();
739 }
740 catch (InterruptedException x)
741 {
742 LOG.ignore(x);
743 }
744 }
745 }
746 }