1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client;
20
21 import java.nio.ByteBuffer;
22 import java.util.Collections;
23 import java.util.Iterator;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.Executor;
26 import java.util.concurrent.atomic.AtomicMarkableReference;
27 import java.util.concurrent.atomic.AtomicReference;
28
29 import org.eclipse.jetty.client.api.ContentProvider;
30 import org.eclipse.jetty.client.api.Request;
31 import org.eclipse.jetty.client.api.Result;
32 import org.eclipse.jetty.http.HttpGenerator;
33 import org.eclipse.jetty.http.HttpHeader;
34 import org.eclipse.jetty.http.HttpHeaderValue;
35 import org.eclipse.jetty.io.ByteBufferPool;
36 import org.eclipse.jetty.io.EndPoint;
37 import org.eclipse.jetty.util.BufferUtil;
38 import org.eclipse.jetty.util.Callback;
39 import org.eclipse.jetty.util.log.Log;
40 import org.eclipse.jetty.util.log.Logger;
41
42 public class HttpSender implements AsyncContentProvider.Listener
43 {
44 private static final Logger LOG = Log.getLogger(HttpSender.class);
45 private static final String EXPECT_100_ATTRIBUTE = HttpSender.class.getName() + ".expect100";
46
47 private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
48 private final AtomicReference<SendState> sendState = new AtomicReference<>(SendState.IDLE);
49 private final HttpGenerator generator = new HttpGenerator();
50 private final HttpConnection connection;
51 private Iterator<ByteBuffer> contentIterator;
52 private ContinueContentChunk continueContentChunk;
53
54 public HttpSender(HttpConnection connection)
55 {
56 this.connection = connection;
57 }
58
59 @Override
60 public void onContent()
61 {
62 while (true)
63 {
64 SendState current = sendState.get();
65 switch (current)
66 {
67 case IDLE:
68 {
69 if (updateSendState(current, SendState.EXECUTE))
70 {
71 LOG.debug("Deferred content available, sending");
72 send();
73 return;
74 }
75 break;
76 }
77 case EXECUTE:
78 {
79 if (updateSendState(current, SendState.SCHEDULE))
80 {
81 LOG.debug("Deferred content available, scheduling");
82 return;
83 }
84 break;
85 }
86 case SCHEDULE:
87 {
88 LOG.debug("Deferred content available, queueing");
89 return;
90 }
91 default:
92 {
93 throw new IllegalStateException();
94 }
95 }
96 }
97 }
98
99 public void send(HttpExchange exchange)
100 {
101 if (!updateState(State.IDLE, State.BEGIN))
102 throw new IllegalStateException();
103
104 Request request = exchange.getRequest();
105 Throwable cause = request.getAbortCause();
106 if (cause != null)
107 {
108 exchange.abort(cause);
109 }
110 else
111 {
112 LOG.debug("Sending {}", request);
113 RequestNotifier notifier = connection.getDestination().getRequestNotifier();
114 notifier.notifyBegin(request);
115
116 ContentProvider content = request.getContent();
117 this.contentIterator = content == null ? Collections.<ByteBuffer>emptyIterator() : content.iterator();
118
119 boolean updated = updateSendState(SendState.IDLE, SendState.EXECUTE);
120 assert updated;
121
122
123
124 if (content instanceof AsyncContentProvider)
125 ((AsyncContentProvider)content).setListener(this);
126
127 send();
128 }
129 }
130
131 private void send()
132 {
133 SendState currentSendState = sendState.get();
134 assert currentSendState != SendState.IDLE : currentSendState;
135
136 HttpClient client = connection.getHttpClient();
137 ByteBufferPool bufferPool = client.getByteBufferPool();
138 ByteBuffer header = null;
139 ByteBuffer chunk = null;
140 try
141 {
142 HttpExchange exchange = connection.getExchange();
143
144 if (exchange == null)
145 return;
146
147 final Request request = exchange.getRequest();
148 HttpConversation conversation = exchange.getConversation();
149 HttpGenerator.RequestInfo requestInfo = null;
150
151
152
153 boolean expect100HeaderPresent = request.getHeaders().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
154 final boolean expecting100ContinueResponse = expect100HeaderPresent && conversation.getAttribute(EXPECT_100_ATTRIBUTE) == null;
155 if (expecting100ContinueResponse)
156 conversation.setAttribute(EXPECT_100_ATTRIBUTE, Boolean.TRUE);
157
158 ContentChunk contentChunk = continueContentChunk;
159 continueContentChunk = null;
160 if (contentChunk == null)
161 contentChunk = new ContentChunk(contentIterator);
162
163 while (true)
164 {
165 ByteBuffer content = contentChunk.content;
166 final ByteBuffer contentBuffer = content == null ? null : content.slice();
167
168 HttpGenerator.Result result = generator.generateRequest(requestInfo, header, chunk, content, contentChunk.lastContent);
169 switch (result)
170 {
171 case NEED_INFO:
172 {
173 ContentProvider requestContent = request.getContent();
174 long contentLength = requestContent == null ? -1 : requestContent.getLength();
175 requestInfo = new HttpGenerator.RequestInfo(request.getVersion(), request.getHeaders(), contentLength, request.getMethod().asString(), request.getPath());
176 break;
177 }
178 case NEED_HEADER:
179 {
180 header = bufferPool.acquire(client.getRequestBufferSize(), false);
181 break;
182 }
183 case NEED_CHUNK:
184 {
185 chunk = bufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
186 break;
187 }
188 case FLUSH:
189 {
190 out:
191 while (true)
192 {
193 State currentState = state.get();
194 switch (currentState)
195 {
196 case BEGIN:
197 {
198 if (!updateState(currentState, State.HEADERS))
199 continue;
200 RequestNotifier notifier = connection.getDestination().getRequestNotifier();
201 notifier.notifyHeaders(request);
202 break out;
203 }
204 case HEADERS:
205 case COMMIT:
206 {
207
208 break out;
209 }
210 case FAILURE:
211 {
212
213
214 return;
215 }
216 default:
217 {
218 throw new IllegalStateException();
219 }
220 }
221 }
222
223 StatefulExecutorCallback callback = new StatefulExecutorCallback(client.getExecutor())
224 {
225 @Override
226 protected void onSucceeded()
227 {
228 LOG.debug("Write succeeded for {}", request);
229
230 if (!processWrite(request, contentBuffer, expecting100ContinueResponse))
231 return;
232
233 send();
234 }
235
236 @Override
237 protected void onFailed(Throwable x)
238 {
239 fail(x);
240 }
241 };
242
243 if (expecting100ContinueResponse)
244 {
245
246 continueContentChunk = new ContinueContentChunk(contentChunk);
247 }
248
249 write(callback, header, chunk, expecting100ContinueResponse ? null : content);
250
251 if (callback.process())
252 {
253 LOG.debug("Write pending for {}", request);
254 return;
255 }
256
257 if (callback.isSucceeded())
258 {
259 if (!processWrite(request, contentBuffer, expecting100ContinueResponse))
260 return;
261
262
263 contentChunk = new ContentChunk(contentIterator);
264
265 if (contentChunk.isDeferred())
266 {
267 out:
268 while (true)
269 {
270 currentSendState = sendState.get();
271 switch (currentSendState)
272 {
273 case EXECUTE:
274 {
275 if (updateSendState(currentSendState, SendState.IDLE))
276 {
277 LOG.debug("Waiting for deferred content for {}", request);
278 return;
279 }
280 break;
281 }
282 case SCHEDULE:
283 {
284 if (updateSendState(currentSendState, SendState.EXECUTE))
285 {
286 LOG.debug("Deferred content available for {}", request);
287 break out;
288 }
289 break;
290 }
291 default:
292 {
293 throw new IllegalStateException();
294 }
295 }
296 }
297 }
298 }
299 break;
300 }
301 case SHUTDOWN_OUT:
302 {
303 EndPoint endPoint = connection.getEndPoint();
304 endPoint.shutdownOutput();
305 break;
306 }
307 case CONTINUE:
308 {
309 break;
310 }
311 case DONE:
312 {
313 if (generator.isEnd())
314 {
315 out: while (true)
316 {
317 currentSendState = sendState.get();
318 switch (currentSendState)
319 {
320 case EXECUTE:
321 case SCHEDULE:
322 {
323 if (!updateSendState(currentSendState, SendState.IDLE))
324 throw new IllegalStateException();
325 break out;
326 }
327 default:
328 {
329 throw new IllegalStateException();
330 }
331 }
332 }
333 success();
334 }
335 return;
336 }
337 default:
338 {
339 throw new IllegalStateException("Unknown result " + result);
340 }
341 }
342 }
343 }
344 catch (Exception x)
345 {
346 LOG.debug(x);
347 fail(x);
348 }
349 finally
350 {
351 releaseBuffers(bufferPool, header, chunk);
352 }
353 }
354
355 private boolean processWrite(Request request, ByteBuffer content, boolean expecting100ContinueResponse)
356 {
357 if (!commit(request))
358 return false;
359
360 if (content != null)
361 {
362 RequestNotifier notifier = connection.getDestination().getRequestNotifier();
363 notifier.notifyContent(request, content);
364 }
365
366 if (expecting100ContinueResponse)
367 {
368 LOG.debug("Expecting 100 Continue for {}", request);
369 continueContentChunk.signal();
370 return false;
371 }
372
373 return true;
374 }
375
376 public void proceed(boolean proceed)
377 {
378 ContinueContentChunk contentChunk = continueContentChunk;
379 if (contentChunk != null)
380 {
381 if (proceed)
382 {
383
384
385
386
387
388
389 LOG.debug("Proceeding {}", connection.getExchange());
390 contentChunk.await();
391 send();
392 }
393 else
394 {
395 HttpExchange exchange = connection.getExchange();
396 if (exchange != null)
397 fail(new HttpRequestException("Expectation failed", exchange.getRequest()));
398 }
399 }
400 }
401
402 private void write(Callback callback, ByteBuffer header, ByteBuffer chunk, ByteBuffer content)
403 {
404 int mask = 0;
405 if (header != null)
406 mask += 1;
407 if (chunk != null)
408 mask += 2;
409 if (content != null)
410 mask += 4;
411
412 EndPoint endPoint = connection.getEndPoint();
413 switch (mask)
414 {
415 case 0:
416 endPoint.write(callback, BufferUtil.EMPTY_BUFFER);
417 break;
418 case 1:
419 endPoint.write(callback, header);
420 break;
421 case 2:
422 endPoint.write(callback, chunk);
423 break;
424 case 3:
425 endPoint.write(callback, header, chunk);
426 break;
427 case 4:
428 endPoint.write(callback, content);
429 break;
430 case 5:
431 endPoint.write(callback, header, content);
432 break;
433 case 6:
434 endPoint.write(callback, chunk, content);
435 break;
436 case 7:
437 endPoint.write(callback, header, chunk, content);
438 break;
439 default:
440 throw new IllegalStateException();
441 }
442 }
443
444 protected boolean commit(Request request)
445 {
446 while (true)
447 {
448 State current = state.get();
449 switch (current)
450 {
451 case HEADERS:
452 if (!updateState(current, State.COMMIT))
453 continue;
454 LOG.debug("Committed {}", request);
455 RequestNotifier notifier = connection.getDestination().getRequestNotifier();
456 notifier.notifyCommit(request);
457 return true;
458 case COMMIT:
459 if (!updateState(current, State.COMMIT))
460 continue;
461 return true;
462 case FAILURE:
463 return false;
464 default:
465 throw new IllegalStateException();
466 }
467 }
468 }
469
470 protected boolean success()
471 {
472 HttpExchange exchange = connection.getExchange();
473 if (exchange == null)
474 return false;
475
476 AtomicMarkableReference<Result> completion = exchange.requestComplete(null);
477 if (!completion.isMarked())
478 return false;
479
480 generator.reset();
481
482 if (!updateState(State.COMMIT, State.IDLE))
483 throw new IllegalStateException();
484
485 exchange.terminateRequest();
486
487
488
489
490 HttpDestination destination = connection.getDestination();
491 Request request = exchange.getRequest();
492 destination.getRequestNotifier().notifySuccess(request);
493 LOG.debug("Sent {}", request);
494
495 Result result = completion.getReference();
496 if (result != null)
497 {
498 connection.complete(exchange, !result.isFailed());
499
500 HttpConversation conversation = exchange.getConversation();
501 destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
502 }
503
504 return true;
505 }
506
507 protected boolean fail(Throwable failure)
508 {
509 HttpExchange exchange = connection.getExchange();
510 if (exchange == null)
511 return false;
512
513 AtomicMarkableReference<Result> completion = exchange.requestComplete(failure);
514 if (!completion.isMarked())
515 return false;
516
517 generator.abort();
518
519 State current;
520 while (true)
521 {
522 current = state.get();
523 if (updateState(current, State.FAILURE))
524 break;
525 }
526
527 exchange.terminateRequest();
528
529 HttpDestination destination = connection.getDestination();
530 Request request = exchange.getRequest();
531 destination.getRequestNotifier().notifyFailure(request, failure);
532 LOG.debug("Failed {} {}", request, failure);
533
534 Result result = completion.getReference();
535 boolean notCommitted = isBeforeCommit(current);
536 if (result == null && notCommitted && request.getAbortCause() == null)
537 {
538 result = exchange.responseComplete(failure).getReference();
539 exchange.terminateResponse();
540 LOG.debug("Failed on behalf {}", exchange);
541 }
542
543 if (result != null)
544 {
545 connection.complete(exchange, false);
546
547 HttpConversation conversation = exchange.getConversation();
548 destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
549 }
550
551 return true;
552 }
553
554 public boolean abort(Throwable cause)
555 {
556 State current = state.get();
557 boolean abortable = isBeforeCommit(current) ||
558 current == State.COMMIT && contentIterator.hasNext();
559 return abortable && fail(cause);
560 }
561
562 private boolean isBeforeCommit(State state)
563 {
564 return state == State.IDLE || state == State.BEGIN || state == State.HEADERS;
565 }
566
567 private void releaseBuffers(ByteBufferPool bufferPool, ByteBuffer header, ByteBuffer chunk)
568 {
569 if (!BufferUtil.hasContent(header))
570 bufferPool.release(header);
571 if (!BufferUtil.hasContent(chunk))
572 bufferPool.release(chunk);
573 }
574
575 private boolean updateState(State from, State to)
576 {
577 boolean updated = state.compareAndSet(from, to);
578 if (!updated)
579 LOG.debug("State update failed: {} -> {}: {}", from, to, state.get());
580 return updated;
581 }
582
583 private boolean updateSendState(SendState from, SendState to)
584 {
585 boolean updated = sendState.compareAndSet(from, to);
586 if (!updated)
587 LOG.debug("Send state update failed: {} -> {}: {}", from, to, sendState.get());
588 return updated;
589 }
590
591 private enum State
592 {
593 IDLE, BEGIN, HEADERS, COMMIT, FAILURE
594 }
595
596 private enum SendState
597 {
598 IDLE, EXECUTE, SCHEDULE
599 }
600
601 private static abstract class StatefulExecutorCallback implements Callback, Runnable
602 {
603 private final AtomicReference<State> state = new AtomicReference<>(State.INCOMPLETE);
604 private final Executor executor;
605
606 private StatefulExecutorCallback(Executor executor)
607 {
608 this.executor = executor;
609 }
610
611 @Override
612 public final void succeeded()
613 {
614 State previous = state.get();
615 while (true)
616 {
617 if (state.compareAndSet(previous, State.SUCCEEDED))
618 break;
619 previous = state.get();
620 }
621 if (previous == State.PENDING)
622 executor.execute(this);
623 }
624
625 @Override
626 public final void run()
627 {
628 onSucceeded();
629 }
630
631 protected abstract void onSucceeded();
632
633 @Override
634 public final void failed(final Throwable x)
635 {
636 State previous = state.get();
637 while (true)
638 {
639 if (state.compareAndSet(previous, State.FAILED))
640 break;
641 previous = state.get();
642 }
643 if (previous == State.PENDING)
644 {
645 executor.execute(new Runnable()
646 {
647 @Override
648 public void run()
649 {
650 onFailed(x);
651 }
652 });
653 }
654 else
655 {
656 onFailed(x);
657 }
658 }
659
660 protected abstract void onFailed(Throwable x);
661
662 public boolean process()
663 {
664 return state.compareAndSet(State.INCOMPLETE, State.PENDING);
665 }
666
667 public boolean isSucceeded()
668 {
669 return state.get() == State.SUCCEEDED;
670 }
671
672 public boolean isFailed()
673 {
674 return state.get() == State.FAILED;
675 }
676
677 private enum State
678 {
679 INCOMPLETE, PENDING, SUCCEEDED, FAILED
680 }
681 }
682
683 private class ContentChunk
684 {
685 private final boolean lastContent;
686 private final ByteBuffer content;
687
688 private ContentChunk(ContentChunk chunk)
689 {
690 lastContent = chunk.lastContent;
691 content = chunk.content;
692 }
693
694 private ContentChunk(Iterator<ByteBuffer> contentIterator)
695 {
696 lastContent = !contentIterator.hasNext();
697 content = lastContent ? BufferUtil.EMPTY_BUFFER : contentIterator.next();
698 }
699
700 private boolean isDeferred()
701 {
702 return content == null && !lastContent;
703 }
704 }
705
706 private class ContinueContentChunk extends ContentChunk
707 {
708 private final CountDownLatch latch = new CountDownLatch(1);
709
710 private ContinueContentChunk(ContentChunk chunk)
711 {
712 super(chunk);
713 }
714
715 private void signal()
716 {
717 latch.countDown();
718 }
719
720 private void await()
721 {
722 try
723 {
724 latch.await();
725 }
726 catch (InterruptedException x)
727 {
728 LOG.ignore(x);
729 }
730 }
731 }
732 }